What does “Rust Warp data update loop” mean? This article describes how to create an infinite loop that sends data to clients connected to your WebSocket server, running in a task separate from the one that runs the server. This is useful when you want to build a system that periodically retrieves data from one or more sources and then actively pushes that data to a client application, for example a frontend dashboard that displays the data.
The completed code project can be found on my GitHub, click here.
This article uses the project from the following article as a base: Rust Warp WebSocket server: learn how to.
A follow up article uses the result of this article’s project to send cryptocurrency trade data to a connected client: Crypto triangle arbitrage dashboard: how to, part 1.
Prerequisites
The completed Rust Warp WebSocket server project is required to follow this article. Some knowledge of Rust programming is recommended to follow along easily, although the code being discussed is straightforward.
Creating and running a basic loop in a separate thread
First, we will start with a basic skeleton and create a loop that does not yet send data to the connected clients. The only thing the code will do is print the number of connected clients.
Creating the update loop in workers.rs
We will create a new module called workers.rs to hold the code for our data update loop. For now, the loop will simply check the number of connected clients being tracked by clients and print a different message accordingly. The code listing for workers.rs is shown below:
use crate::Clients;
use tokio;
use tokio::time::Duration;

pub async fn main_worker(clients: Clients) {
    loop {
        tokio::time::sleep(Duration::from_millis(2000)).await;

        let connected_client_count = clients.lock().await.len();
        if connected_client_count == 0 {
            println!("No clients connected, skip sending data");
            continue;
        }
        println!("{} connected client(s)", connected_client_count);
    }
}
Here we have created an async function main_worker that contains an infinite loop. Each iteration starts by sleeping for 2 seconds, so the loop runs roughly once every 2 seconds. We use a slow update interval for testing purposes. However, there can be situations where we want to update clients in real time with a much shorter delay or none at all.
We acquire a lock on the HashMap of clients and then get its length, or number of entries, using the len() function. If connected_client_count is zero there is nothing further to process, so we use continue; to restart the loop from the beginning. Otherwise we print a message showing the number of connected clients.
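To see why it is fine to take the lock just to read the length, here is a minimal sketch of the same pattern. It is a stand-in under stated assumptions, not the project's code: it uses std::sync::Mutex instead of tokio's async Mutex so that it runs without any crates, it stores plain String values instead of Client, and the helper names connected_client_count and is_unlocked are made up for illustration. The point it illustrates is that the guard returned by lock() is a temporary, so the lock is released at the end of the statement rather than being held while the loop sleeps.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Simplified stand-in for the article's `Clients` type (assumption:
// String values instead of `Client`, std Mutex instead of tokio's).
type Clients = Arc<Mutex<HashMap<String, String>>>;

/// Counts the clients the same way the update loop does. The guard
/// returned by `lock()` is a temporary, so it is dropped, and the lock
/// released, as soon as this expression has been evaluated.
fn connected_client_count(clients: &Clients) -> usize {
    clients.lock().unwrap().len()
}

/// Hypothetical helper: returns true if nobody currently holds the lock.
/// `try_lock` never blocks; it fails if the lock is already taken.
fn is_unlocked(clients: &Clients) -> bool {
    clients.try_lock().is_ok()
}
```

Calling is_unlocked right after connected_client_count returns true, while calling it with a guard still bound to a variable returns false. The same scoping applies to clients.lock().await.len() in main_worker.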
Next, we will update main.rs to call this code and run it in a separate task, which is an asynchronous green thread.
Updating main.rs
Now we are going to update main.rs from the previous Rust WebSocket server project to spawn a tokio task that calls the function main_worker from our workers.rs module containing the infinite loop. The new lines in the code listing below are the mod workers; declaration and the tokio::task::spawn block:
use std::{collections::HashMap, convert::Infallible, sync::Arc};
use tokio::sync::{mpsc, Mutex};
use warp::{ws::Message, Filter, Rejection};

mod handlers;
mod workers;
mod ws;

#[derive(Debug, Clone)]
pub struct Client {
    pub client_id: String,
    pub sender: Option<mpsc::UnboundedSender<std::result::Result<Message, warp::Error>>>,
}

type Clients = Arc<Mutex<HashMap<String, Client>>>;
type Result<T> = std::result::Result<T, Rejection>;

#[tokio::main]
async fn main() {
    let clients: Clients = Arc::new(Mutex::new(HashMap::new()));

    println!("Configuring websocket route");
    let ws_route = warp::path("ws")
        .and(warp::ws())
        .and(with_clients(clients.clone()))
        .and_then(handlers::ws_handler);

    let routes = ws_route.with(warp::cors().allow_any_origin());

    println!("Starting update loop");
    tokio::task::spawn(async move {
        workers::main_worker(clients.clone()).await;
    });

    println!("Starting server");
    warp::serve(routes).run(([127, 0, 0, 1], 8000)).await;
}

fn with_clients(clients: Clients) -> impl Filter<Extract = (Clients,), Error = Infallible> + Clone {
    warp::any().map(move || clients.clone())
}
As mentioned earlier, we spawn a task to create a new green thread using tokio::task::spawn. Furthermore, we use move so that the async block takes ownership of the variables it uses, in this case the clients variable. The task then calls main_worker with a clone of clients as its parameter.
We pass the async block to spawn because spawning lets the task run concurrently with other tasks. This means that the main_worker function, or whatever else is inside the async block, runs alongside the Warp server on the tokio runtime.
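The ownership side of this can be tried out in isolation. The sketch below is only an analogy: it uses std::thread::spawn and a blocking std::sync::Mutex rather than tokio::task::spawn and tokio's Mutex, it stores String values instead of Client, and spawn_worker is a hypothetical helper. What carries over is how move hands one Arc handle to the spawned work while the caller keeps another handle to the same map.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

// Simplified stand-in for the article's `Clients` type (assumption:
// String values instead of `Client`, std Mutex instead of tokio's).
type Clients = Arc<Mutex<HashMap<String, String>>>;

/// Hypothetical helper: spawns a worker that owns its own handle to the
/// shared map and reports how many clients it can see.
fn spawn_worker(clients: Clients) -> JoinHandle<usize> {
    // `move` transfers ownership of `clients` into the closure, so the
    // worker does not borrow anything from the caller's stack frame.
    thread::spawn(move || {
        let count = clients.lock().unwrap().len();
        count
    })
}
```

A caller would pass clients.clone() to spawn_worker and keep its own handle. Cloning an Arc only bumps a reference count, so both handles point at the same HashMap, which is also why the update loop sees clients that the WebSocket handler registers later.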
Running the Warp server with data update loop
To run the code we simply use the cargo run command. After the server starts we can connect using a WebSocket client, for example “WebSocket Client”.
Wait a few seconds and the terminal output will look something like this:
Configuring websocket route
Starting update loop
Starting server
No clients connected, skip sending data
No clients connected, skip sending data
No clients connected, skip sending data
And then after connecting with a WebSocket client:
Configuring websocket route
Starting update loop
Starting server
No clients connected, skip sending data
No clients connected, skip sending data
ws_handler
establishing client connection... WebSocket
1 connected client(s)
1 connected client(s)
1 connected client(s)
Now that we have a working loop, we will add some code to the main_worker function to generate some data and send it to all connected clients.
Generating and sending data to clients
In this section, we will add some crates and code for randomly generating some data. We will send that data to clients connected to our WebSocket server.
Updating dependencies
The crates we will add are:
- chrono: Date and time library for Rust.
- rand: Random number generators and other randomness functionality.
This results in the following lines being added to the dependencies in Cargo.toml:

chrono = { version = "0.4", features = ["serde"] }
rand = "0.8"
Updating workers.rs
First, we should update the imports to include the things we will use for generating the data (chrono and rand), and serde::Serialize for serializing it. Furthermore, we also need to import Message from the warp crate, which we use to convert the data to a format that can be sent to a connected client:
use crate::Clients;
use chrono::{DateTime, Utc};
use rand::prelude::*;
use serde::Serialize;
use tokio;
use tokio::time::Duration;
use warp::ws::Message;
Let’s add a struct to represent our test data:
#[derive(Serialize)]
struct TestData {
    widget_type: String,
    widget_count: u32,
    creation_date: DateTime<Utc>,
}
We add the #[derive(Serialize)] attribute here to let the serde library take care of serialization for us. The struct itself doesn’t matter. It is just for testing purposes and contains three arbitrary fields that we will generate values for on the fly using the following function:
fn generate_random_data() -> Vec<TestData> {
    let mut rng = rand::thread_rng();
    let mut test_data_batch = Vec::new();
    for i in 0..10 {
        test_data_batch.push(TestData {
            widget_type: format!("widget{}", i),
            widget_count: rng.gen_range(0..100),
            creation_date: Utc::now(),
        });
    }
    test_data_batch
}
Here we get a handle to the thread-local random number generator, which is seeded by the system, with: let mut rng = rand::thread_rng();. Next, we create a new vector and loop 10 times to add 10 TestData entries to it, which we will send to the clients.
Now we are ready to call this function and send the resulting list (vector) to the clients by updating the main_worker function. The new lines are the ones after the println! that reports the number of connected clients:
pub async fn main_worker(clients: Clients) {
    loop {
        tokio::time::sleep(Duration::from_millis(2000)).await;

        let connected_client_count = clients.lock().await.len();
        if connected_client_count == 0 {
            println!("No clients connected, skip sending data");
            continue;
        }
        println!("{} connected client(s)", connected_client_count);

        // Generate a fresh batch of test data for this iteration.
        let test_data_batch = generate_random_data();

        // Send the batch, serialized as JSON text, to every client that has a sender.
        clients.lock().await.iter().for_each(|(_, client)| {
            if let Some(sender) = &client.sender {
                let _ = sender.send(Ok(Message::text(
                    serde_json::to_string(&test_data_batch).unwrap(),
                )));
            }
        });
    }
}
After printing the client count we call the generate_random_data() function we just wrote. Then, we acquire a lock on the clients map and iterate through each entry. In the for_each closure each entry arrives as a (key, value) tuple, where the key is the String the client is stored under and the value is the Client. We are not using the key, so we put an underscore (_) in its place; the underscore is a pattern that ignores the value without binding it. Then, we get a reference to the sender from the client, and use that to send our serialized data as text.
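One possible refinement, sketched below under stated assumptions, is to serialize the batch once before the loop and to look at the result of send instead of discarding it. The sketch swaps tokio's UnboundedSender for std::sync::mpsc::Sender so that it runs without extra crates (both return an error from send once the receiving side is gone), reduces Client to the one field that matters here, and takes an already serialized payload. The function name broadcast is hypothetical.

```rust
use std::collections::HashMap;
use std::sync::mpsc::Sender;

/// Simplified stand-in for the article's `Client` (assumption: a std
/// channel carrying plain strings instead of warp `Message` results).
struct Client {
    sender: Option<Sender<String>>,
}

/// Hypothetical helper: sends `payload` to every client that has a
/// sender and returns how many sends succeeded. A send fails when the
/// receiving half of the channel has been dropped.
fn broadcast(clients: &HashMap<String, Client>, payload: &str) -> usize {
    let mut delivered = 0;
    // `values()` skips the keys entirely, so no `_` pattern is needed.
    for client in clients.values() {
        if let Some(sender) = &client.sender {
            if sender.send(payload.to_string()).is_ok() {
                delivered += 1;
            }
        }
    }
    delivered
}
```

In the article's loop the payload would be the output of serde_json::to_string(&test_data_batch), computed once per iteration, and the returned count could be logged to spot clients whose connection has gone away.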
Running the Warp data update loop to send data
When we run the server and connect now, we should see JSON data coming into our connected WebSocket client. For example, the JSON tab of the WebSocket Client software shows what the server is sending.
Conclusion: Warp data update loop completed
We have learned how to run an extra task alongside the one running the server. With it, we added a data update loop to our Warp server from the previous article, which gives us the ability to continuously update connected clients with new data. We also briefly went over how to create a serializable data structure to represent the data we are sending.
The full completed project can be found on my GitHub, here.
In the next article, we will gather data on cryptocurrency in a loop and then send that data to clients. If you’re interested the article can be found at this link: Crypto triangle arbitrage dashboard: how to, part 1.