Warp data update loop: easy how to

What does “Rust Warp data update loop” mean? This article will describe how to create an infinite loop that sends data to clients connected to your WebSocket server in a thread separate from the main thread. This is useful when you want to build a system that periodically retrieves data from one or more sources and then needs to actively push that data to your client application. For example, a frontend dashboard that displays the data.

The completed code project can be found on my GitHub, click here.

This article will use the project from the following article: Rust Warp WebSocket server: learn how to as a base.

A follow up article uses the result of this article’s project to send cryptocurrency trade data to a connected client: Crypto triangle arbitrage dashboard: how to, part 1.

Prerequisites

The completed Rust Warp WebSocket server project is a requirement to follow this article. Some knowledge of Rust programming is recommended to be able to follow along easily, though, the code being discussed is straightforward.

Creating and running a basic loop in a separate thread

First, we will start with a basic skeleton and create a loop that does not yet send data to the connected clients. The only thing the code will do is print the number of connected clients.

Creating the update loop in workers.rs

We will create a new module called workers.rs to hold the code for our data update loop. For now, the loop will simply check the number of connected clients being tracked by clients and print a different message accordingly. The code listing for workers.rs is shown below:

use crate::Clients;
use tokio;
use tokio::time::Duration;

pub async fn main_worker(clients: Clients) {
    loop {
        tokio::time::sleep(Duration::from_millis(2000)).await;

        let connected_client_count = clients.lock().await.len();
        if connected_client_count == 0 {
            println!("No clients connected, skip sending data");
            continue;
        }
        println!("{} connected client(s)", connected_client_count);
    }
}

Here we have created an async function main_worker that contains an infinite loop. The loop updates every 2 seconds. We do a slow update for testing purposes. However, there can be situations where we want to update clients in real-time without any delays.

We retrieve a lock on the HashMap of clients and then get the length, or number of entries, using the len() function. If the connected_client_count is zero we do not have to do any further processing so we continue; the loop from the beginning, otherwise we print a message showing the number of connected clients.

Next, we will update main.rs to call this code and run it in a separate task, which is an asynchronous green thread.

Updating main.rs

Finally, we are going to update the main.rs from the previous Rust WebSocket server project to spawn a tokio task that will call the function main_worker from our workers.rs module containing the infinite loop. The updated lines are highlighted in the code listing below:

use std::{collections::HashMap, convert::Infallible, sync::Arc};
use tokio::sync::{mpsc, Mutex};
use warp::{ws::Message, Filter, Rejection};

mod handlers;
mod workers;
mod ws;

#[derive(Debug, Clone)]
pub struct Client {
    pub client_id: String,
    pub sender: Option<mpsc::UnboundedSender<std::result::Result<Message, warp::Error>>>,
}

type Clients = Arc<Mutex<HashMap<String, Client>>>;
type Result<T> = std::result::Result<T, Rejection>;

#[tokio::main]
async fn main() {
    let clients: Clients = Arc::new(Mutex::new(HashMap::new()));

    println!("Configuring websocket route");
    let ws_route = warp::path("ws")
        .and(warp::ws())
        .and(with_clients(clients.clone()))
        .and_then(handlers::ws_handler);

    let routes = ws_route.with(warp::cors().allow_any_origin());

    println!("Starting update loop");
    tokio::task::spawn(async move {
        workers::main_worker(clients.clone()).await;
    });
    println!("Starting server");
    warp::serve(routes).run(([127, 0, 0, 1], 8000)).await;
}

fn with_clients(clients: Clients) -> impl Filter<Extract = (Clients,), Error = Infallible> + Clone {
    warp::any().map(move || clients.clone())
}

As mentioned earlier we spawn a task to create a new green thread using tokio::task::spawn on line 31. Furthermore, we use move to move variables into the closure, in this case, the clients variable. The task then calls main_worker with a clone of clients as parameter.

We call spawn on the task, because calling spawn enables the task to run concurrently with other tasks. This means that, the main_workers function, or whatever else is inside the closure, will run alongside the main warp server thread at the same time.

Running the Warp server with data update loop

To run the code we simply use the cargo run command like that. After the server starts we can connect using a WebSocket client. For example “WebSocket Client”.

Wait a few seconds and the terminal output will look something like this:

Configuring websocket route
Starting update loop
Starting server
No clients connected, skip sending data
No clients connected, skip sending data
No clients connected, skip sending data

And then after connecting with a WebSocket client:

Configuring websocket route
Starting update loop
Starting server
No clients connected, skip sending data
No clients connected, skip sending data
ws_handler
establishing client connection... WebSocket
1 connected client(s)
1 connected client(s)
1 connected client(s)

Now that we have a working loop we will add some code to the main_worker function to generate some data and send it to all connected clients.

Generating and sending data to clients

In this section, we will add some crates and code for randomly generating some data. We will send that data to clients connected to our WebSocket server.

Updating dependencies

The crates we will add are:

  • chrono: Date and time library for Rust.
  • rand: Random number generators and other randomness functionality.

Resulting in the addition of the following lines to the dependencies in Cargo.toml:

chrono = { version = "0.4", features = ["serde"] }
rand = "0.8"

Updating workers.rs

First, we should update the imports to include the things we will be using for generating the data (chrono and rand) and for serializing the data we use serde::Serialize. Furthermore, we also need to import Message from the warp crate for when we want to convert data to a format for sending to a connected client:

use crate::Clients;
use chrono::{DateTime, Utc};
use rand::prelude::*;
use serde::Serialize;
use tokio;
use tokio::time::Duration;
use warp::ws::Message;

Let’s add a struct to represent our test data:

#[derive(Serialize)]
struct TestData {
    widget_type: String,
    widget_count: u32,
    creation_date: DateTime<Utc>,
}

We add the #[derive(Serialize)] attribute here to let the serde library take care of serialization for us. The struct itself doesn’t matter. This struct is just for testing purposes and contains three arbitrary fields we will generate values for on the fly using the following function:

fn generate_random_data() -> Vec<TestData> {
    let mut rng = rand::thread_rng();
    let mut test_data_batch = Vec::new();
    for i in 0..10 {
        test_data_batch.push(TestData {
            widget_type: String::from(format!("widget{}", i)),
            widget_count: rng.gen_range(0..100),
            creation_date: Utc::now(),
        });
    }
    return test_data_batch;
}

Here we create an instance of a system seeded random number generator with: let mut rng = rand::thread_rng();. Next, we create a new vector, and then loop 10 times to create 10 TestData entries in the vector to send to the clients.

Now we are ready to call this function and send the resulting list (vector) to the clients by updating the main_worker function. The updated lines are highlighted:

pub async fn main_worker(clients: Clients) {
    loop {
        tokio::time::sleep(Duration::from_millis(2000)).await;

        let connected_client_count = clients.lock().await.len();
        if connected_client_count == 0 {
            println!("No clients connected, skip sending data");
            continue;
        }
        println!("{} connected client(s)", connected_client_count);

        let test_data_batch = generate_random_data();
        clients.lock().await.iter().for_each(|(_, client)| {
            if let Some(sender) = &client.sender {
                let _ = sender.send(Ok(Message::text(
                    serde_json::to_string(&test_data_batch).unwrap(),
                )));
            }
        });
    }
}

On line 27 we call the generate_random_data() function we just wrote. Then, we gain a lock on the clients list and then iterate through each item. In the for each we get a tuple like (index, item), we’re not using the index so we put a underscore (_) there in the tuple, the underscore represents an ignored pattern binding. Then, we get a reference to the sender from the client, and use that to send our serialized data as text.

Running the Warp data update loop to send data

When we run the server and connect now, we should see JSON data coming into our connected WebSocket client. For example, in the JSON tab of the WebSocket Client software we can see what the server is sending:

warp data update loop result

Conclusion: Warp data update loop completed

We have learned how to run an extra thread next to the main thread running the server. Next to that, we added a data update loop to our Warp server from the previous article, which gives us the ability to continuously update connected clients with new data. We also briefly went over how to create a serializable data structure to represent the data we are sending.

The full completed project can be found on my GitHub, here.

In the next article, we will gather data on cryptocurrency in a loop and then send that data to clients. If you’re interested the article can be found at this link: Crypto triangle arbitrage dashboard: how to, part 1.

Leave a Reply

Your email address will not be published. Required fields are marked *