Rust Warp WebSocket server: learn how to now

In this article we will build a WebSocket server with Rust, using the warp framework. We will slowly go through the code building a simple version at first, and then add a bit more functionality to it. At the end of this article, we will be able to receive messages from the client and send something back in response.

We will use this project as a base for future articles to build upon. We add a continuous data update loop to this project in the follow-up article Warp data update loop: easy how to.

The repository with the complete Rust Warp WebSocket server project can be found on my GitHub, here.

If you are interested in how to develop a REST API with CRUD operations I have an article for that too: How to implement a Rust REST API with warp.

Prerequisites

It helps to have experience with writing Rust code.

Rust Websocket server project set up

First, create a new project with cargo:

cargo new warp-websocket-server-tutorial

We are now going to add the required dependencies to the Cargo.toml file.

[dependencies]
tokio = { version= "1", features = ["full"] }
tokio-stream = "0.1.6"
warp = "0.3"
serde = { version = "1.0", features = ["derive"]}
serde_json = "1.0"
futures = { version = "0.3", default-features=false}
uuid = { version = "0.8", features = ["v4"] }

Let’s go through these dependencies one by one quickly:

  • tokio: A runtime for writing reliable, asynchronous, and slim applications with the Rust programming language.
  • tokio-stream: Utilities to work with `Stream` and `tokio`.
  • warp: A super-easy, composable, web server framework for warp speeds.
  • serde: Serde is a framework for serializing and deserializing Rust data structures efficiently and generically.
  • serde_json: A JSON serialization file format.
  • futures: An implementation of futures and streams featuring zero allocations, composability, and iterator-like interfaces.
  • uuid: A library to generate and parse UUIDs. We use it later in ws.rs to generate client ids.

Basic Rust Warp websocket server set up

We will first set up a basic WebSocket implementation for our project. The server will have a single endpoint for WebSocket connections, but it will not do anything yet when a client connects. Later in this tutorial, we will add more functionality: for instance, we will keep the session alive and allow for receiving a message from the client. We will be working on three files: main.rs, handlers.rs, and ws.rs. Let’s take a look at main.rs.

Main.rs

The entry point to any Rust application:

use warp::{Filter, Rejection};
mod handlers;
mod ws;

type Result<T> = std::result::Result<T, Rejection>;

#[tokio::main]
async fn main() {
    println!("Configuring websocket route");
    let ws_route = warp::path("ws")
        .and(warp::ws())
        .and_then(handlers::ws_handler);

    let routes = ws_route.with(warp::cors().allow_any_origin());
    println!("Starting server");
    warp::serve(routes).run(([127, 0, 0, 1], 8000)).await;
}

First, we create a type alias to make future code quicker to write and easier to read. Then we set up the asynchronous main function with the async keyword and the #[tokio::main] macro, which runs main on the Tokio runtime. This macro provides only basic configuration options.

Next, we create our WebSocket route for the server. We define the path as "ws", which means the full path will become: 127.0.0.1:8000/ws for the client to connect to. The next line adds a WebSocket filter that yields a Ws object that will be used to upgrade the connection to a WebSocket connection. Finally, we configure the handler function that is called to handle this route. We have not written that handler function yet, but we will do that right after this section.

We finish up the routes configuration by adding a CORS filter that allows any origin. The last line runs the warp server on IP 127.0.0.1 and port number 8000.

Handlers.rs

Open the handlers.rs file and add the following code:

use crate::{ws, Result};
use warp::Reply;

pub async fn ws_handler(ws: warp::ws::Ws) -> Result<impl Reply> {
    println!("ws_handler");

    Ok(ws.on_upgrade(move |socket| ws::client_connection(socket)))
}

This listing shows a simple public asynchronous function that prints some text, then calls another function and returns its result, a value that implements Reply. A Reply is a type that can be converted into an HTTP response; for more information go here. The last line calls ws.on_upgrade, which finishes the protocol upgrade and registers the function (client_connection, which we will write after this section) that will handle the upgraded connection.

The function doesn’t do much, but don’t worry: in a later section, we will add more functionality to it.

Ws.rs

This file will hold functions for handling the established connection and receiving messages from the client. For now, this will also be barebones:

use warp::ws::WebSocket;

pub async fn client_connection(ws: WebSocket) {
    println!("establishing client connection... {:?}", ws);
}

Here we print some text and the Debug format of the WebSocket. This is enough for now. At first, we just want to establish a working server we can connect to before getting into the details.

Running the barebones Rust Warp websocket server

We can now run the server and connect to it. Do this by running the cargo run command. Then, you can use your favorite WebSocket client to connect. For example, I like to use WebSocket client for mac. When the server is running simply connect to it like so:

WebSocket client connection example 1

After hitting connect we should see a message get printed in the terminal:

Configuring websocket route
Starting server
ws_handler
establishing client connection... WebSocket

You will notice that the client is immediately disconnected. This is because we are not doing anything with the incoming connection, yet. Now that we have established that we can connect to our server, we are well on our way to writing a useful WebSocket server in Rust. Let’s make it actually do something.

Adding functionality to the Rust Warp WebSocket server

In this section, we want clients to be able to send messages to our server and receive messages from it. To do that, we have to keep track of the connected clients and keep each connection open.

Updating main.rs

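We begin by updating main.rs with code for storing connected clients. The additions are the new imports, the Client struct, the Clients type alias, the clients instance, and the with_clients filter: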

use std::{collections::HashMap, convert::Infallible, sync::Arc};
use tokio::sync::{mpsc, Mutex};
use warp::{ws::Message, Filter, Rejection};

mod handlers;
mod ws;

#[derive(Debug, Clone)]
pub struct Client {
    pub client_id: String,
    pub sender: Option<mpsc::UnboundedSender<std::result::Result<Message, warp::Error>>>,
}

type Clients = Arc<Mutex<HashMap<String, Client>>>;
type Result<T> = std::result::Result<T, Rejection>;

#[tokio::main]
async fn main() {
    let clients: Clients = Arc::new(Mutex::new(HashMap::new()));

    println!("Configuring websocket route");
    let ws_route = warp::path("ws")
        .and(warp::ws())
        .and(with_clients(clients.clone()))
        .and_then(handlers::ws_handler);

    let routes = ws_route.with(warp::cors().allow_any_origin());
    println!("Starting server");
    warp::serve(routes).run(([127, 0, 0, 1], 8000)).await;
}

fn with_clients(clients: Clients) -> impl Filter<Extract = (Clients,), Error = Infallible> + Clone {
    warp::any().map(move || clients.clone())
}

We have added a struct definition, Client, for representing a connected client. The client has a client_id field, which will be a randomly generated UUID, and a sender field, which holds an mpsc::UnboundedSender. Messages sent through this sender arrive at the matching UnboundedReceiver, which we will later forward to the client’s WebSocket. For more information see the documentation.

On line 14 we define a new type alias: type Clients = Arc<Mutex<HashMap<String, Client>>>;. We use this for keeping track of connected clients. It is a HashMap wrapped in a Mutex from the tokio library, which is in turn wrapped in an Arc from the std (standard) library. The Mutex ensures that only one task at a time can access the HashMap, which prevents data races, and because it is tokio’s asynchronous Mutex, the lock can safely be held across await points. The Arc is a thread-safe reference-counting pointer; it allows us to share the data with multiple threads, which is very useful for an asynchronous web server.

On line 19 we create a new instance of the Clients type and pass it to the ws_handler function using the filter with_clients on line 24. This filter function is defined on lines 32-34.

The filter function with_clients makes the Clients data available to the handler. It returns a Filter that matches any route, combined with a closure that yields a clone of the clients handle for each request.
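To make the Arc and Mutex combination more concrete, here is a minimal sketch that uses only the standard library. It assumes std::sync::Mutex and OS threads as stand-ins for tokio’s Mutex and async tasks, and the Registry alias and register_from_threads function are hypothetical names, not part of the project.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

// Stand-in for the article's `Clients` alias, using std's blocking Mutex
// instead of tokio's async Mutex so the sketch needs no external crates.
type Registry = Arc<Mutex<HashMap<String, u32>>>;

/// Spawns `n` threads that each register one entry in the shared map,
/// then returns how many entries the map holds once all threads finish.
fn register_from_threads(n: u32) -> usize {
    let registry: Registry = Arc::new(Mutex::new(HashMap::new()));

    let handles: Vec<_> = (0..n)
        .map(|i| {
            // Each thread gets its own Arc handle to the same map.
            let registry = Arc::clone(&registry);
            thread::spawn(move || {
                // The lock guards the insert; it is released when the guard drops.
                registry.lock().unwrap().insert(format!("client-{}", i), i);
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    let count = registry.lock().unwrap().len();
    count
}
```

Calling register_from_threads(8) should return 8, since every thread inserts under a distinct key. In the server the same idea applies, except that the lock is taken with clients.lock().await.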
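The closure clones clients rather than moving it out because it may be called many times, once for every request that reaches the route. A standard-library sketch of the same pattern could look like this; handle_factory and count_after_two_requests are hypothetical names, and an Arc<String> stands in for Clients.

```rust
use std::sync::Arc;

/// Returns a closure that hands out a fresh handle to `shared` on every call,
/// mirroring the `move || clients.clone()` closure in `with_clients`.
fn handle_factory(shared: Arc<String>) -> impl Fn() -> Arc<String> + Clone {
    // `move` transfers ownership of `shared` into the closure; cloning inside
    // keeps the closure callable many times (`Fn`) instead of just once.
    move || Arc::clone(&shared)
}

/// Calls the factory twice and reports the reference count afterwards.
fn count_after_two_requests() -> usize {
    let state = Arc::new(String::from("clients"));
    let factory = handle_factory(Arc::clone(&state));

    let first = factory();
    let second = factory();

    // Handles alive: `state`, the one captured by `factory`, `first`, `second`.
    let count = Arc::strong_count(&state);
    drop((first, second));
    count
}
```

Cloning an Arc only increments a reference count, so every handle points at the same underlying data.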

Updating handlers.rs

We will update the handler function ws_handler to receive the HashMap of clients, and pass this information along to the client_connection function in the ws module. We have no logic in this function, yet, but in a future article that continues with this project as a base, we will implement some authentication logic here.

use crate::{ws, Clients, Result};
use warp::Reply;

pub async fn ws_handler(ws: warp::ws::Ws, clients: Clients) -> Result<impl Reply> {
    println!("ws_handler");

    Ok(ws.on_upgrade(move |socket| ws::client_connection(socket, clients)))
}

Updating ws.rs

In the client_connection function we will add code that creates an instance of the Client struct and adds it to the clients HashMap, to keep track of it. At the same time, we will add logic for receiving a “ping” message from the client and sending back a “pong” message. Let’s look at the full listing below:

use crate::{Client, Clients};
use futures::{FutureExt, StreamExt};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use uuid::Uuid;
use warp::ws::{Message, WebSocket};

pub async fn client_connection(ws: WebSocket, clients: Clients) {
    println!("establishing client connection... {:?}", ws);

    let (client_ws_sender, mut client_ws_rcv) = ws.split();
    let (client_sender, client_rcv) = mpsc::unbounded_channel();

    let client_rcv = UnboundedReceiverStream::new(client_rcv);

    tokio::task::spawn(client_rcv.forward(client_ws_sender).map(|result| {
        if let Err(e) = result {
            println!("error sending websocket msg: {}", e);
        }
    }));

    let uuid = Uuid::new_v4().to_simple().to_string();

    let new_client = Client {
        client_id: uuid.clone(),
        sender: Some(client_sender),
    };

    clients.lock().await.insert(uuid.clone(), new_client);

    while let Some(result) = client_ws_rcv.next().await {
        let msg = match result {
            Ok(msg) => msg,
            Err(e) => {
                println!("error receiving message for id {}: {}", uuid, e);
                break;
            }
        };
        client_msg(&uuid, msg, &clients).await;
    }

    clients.lock().await.remove(&uuid);
    println!("{} disconnected", uuid);
}

async fn client_msg(client_id: &str, msg: Message, clients: &Clients) {
    println!("received message from {}: {:?}", client_id, msg);

    let message = match msg.to_str() {
        Ok(v) => v,
        Err(_) => return,
    };

    if message == "ping" || message == "ping\n" {
        let locked = clients.lock().await;
        match locked.get(client_id) {
            Some(v) => {
                if let Some(sender) = &v.sender {
                    println!("sending pong");
                    let _ = sender.send(Ok(Message::text("pong")));
                }
            }
            None => return,
        }
        return;
    };
}

Almost all of the lines of code are new, so let’s go through them section by section.

Imports and function signature updates

use crate::{Client, Clients};
use futures::{FutureExt, StreamExt};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use uuid::Uuid;
use warp::ws::{Message, WebSocket};

pub async fn client_connection(ws: WebSocket, clients: Clients) {

Here we bring the required types and traits into scope. Then we update the client_connection function’s signature to accept Clients as a parameter. Of note is line 2: use futures::{FutureExt, StreamExt};, which brings into scope the traits required by ws.split(), client_rcv.forward(), and .map(), all used in the next section.

Stream objects and configuration

    let (client_ws_sender, mut client_ws_rcv) = ws.split();
    let (client_sender, client_rcv) = mpsc::unbounded_channel();

    let client_rcv = UnboundedReceiverStream::new(client_rcv);

    tokio::task::spawn(client_rcv.forward(client_ws_sender).map(|result| {
        if let Err(e) = result {
            println!("error sending websocket msg: {}", e);
        }
    }));

Here we split the WebSocket object into separate Sink and Stream halves. This is useful because we want to split ownership of the sending and receiving tasks. The Stream half (client_ws_rcv) will be used to receive messages from the client. The Sink half (client_ws_sender) is the destination for everything that is sent through the unbounded channel.

On line 12 we create an unbounded channel, which we will use to queue messages for the client. Next, we wrap client_rcv in a new UnboundedReceiverStream, which implements Stream, so that forward() can be used on it.

On lines 16-20 we spawn a new task that forwards every message from the channel to client_ws_sender. The task keeps running until the channel is closed or sending fails, for example because the client has disconnected.
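The channel and forwarding task arrangement can be illustrated without warp. The sketch below is an analogy only: it assumes std::sync::mpsc and a thread in place of tokio’s unbounded channel and spawned task, with a Vec playing the role of the WebSocket sink. The name forward_all is hypothetical.

```rust
use std::sync::mpsc;
use std::thread;

/// Forwards every message sent on a channel into a "sink" (a Vec here)
/// on a separate thread, and returns what the sink collected.
fn forward_all(messages: &[&str]) -> Vec<String> {
    let (sender, receiver) = mpsc::channel::<String>();

    // The forwarding thread plays the role of the spawned task: it drains
    // the receiver until every sender has been dropped.
    let forwarder = thread::spawn(move || {
        let mut sink = Vec::new();
        for msg in receiver {
            sink.push(msg);
        }
        sink
    });

    for msg in messages {
        // Any holder of `sender` can queue a message without touching the sink.
        sender.send(msg.to_string()).unwrap();
    }

    // Dropping the last sender closes the channel and ends the forwarder.
    drop(sender);
    forwarder.join().unwrap()
}
```

The point of the design is that code elsewhere only needs the sender to queue a message, while a single task owns the sink and writes to it.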

Client registration

In this section, we are going to create an instance of the Client struct and add it to the clients HashMap.

    let uuid = Uuid::new_v4().to_simple().to_string();

    let new_client = Client {
        client_id: uuid.clone(),
        sender: Some(client_sender),
    };

    clients.lock().await.insert(uuid.clone(), new_client);

The client_sender object is stored in the new_client object here, so that we can send messages to the connected client in other parts of the code.

On line 29, we obtain a lock on the client list and insert the new_client object into the clients HashMap using uuid as the key.
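As a rough illustration of why the sender is stored with the client, the following standard-library sketch keeps a sender in a map and uses it later to queue a message. It assumes std::sync::mpsc instead of tokio’s channel and a plain HashMap without Arc or Mutex. The simplified Client struct (without client_id, since the map key carries the id) and the functions send_to and demo_register_and_send are hypothetical.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Sender};

// Simplified stand-in for the article's `Client` struct.
struct Client {
    sender: Option<Sender<String>>,
}

/// Looks up a client by id and queues `text` for it.
/// Returns true only if the client exists, has a sender, and the send succeeded.
fn send_to(clients: &HashMap<String, Client>, id: &str, text: &str) -> bool {
    clients
        .get(id)
        .and_then(|client| client.sender.as_ref())
        .map(|sender| sender.send(text.to_string()).is_ok())
        .unwrap_or(false)
}

/// Registers one client, sends it a message through the registry,
/// and returns what arrived on the receiving end of the channel.
fn demo_register_and_send() -> Option<String> {
    let (sender, receiver) = mpsc::channel();
    let id = String::from("abc123");

    let mut clients = HashMap::new();
    clients.insert(id.clone(), Client { sender: Some(sender) });

    if send_to(&clients, &id, "hello") {
        receiver.try_recv().ok()
    } else {
        None
    }
}
```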

Note that in a later tutorial, we will move this client registration code to an endpoint specifically used for registering a client before connecting through a WebSocket.

Handling messages from the client

In this subsection, we will create a loop that handles incoming messages from the client. We will respond to a “ping” message with a message containing the text “pong”.

    while let Some(result) = client_ws_rcv.next().await {
        let msg = match result {
            Ok(msg) => msg,
            Err(e) => {
                println!("error receiving message for id {}: {}", uuid, e);
                break;
            }
        };
        client_msg(&uuid, msg, &clients).await;
    }

Here we have a loop that waits for the next item in the stream. It keeps running until the client disconnects or an error occurs. Each time the client sends a message, the loop body runs: the message is extracted and then processed by client_msg.

Below is the code listing for the client_msg function. Its purpose is handling the client’s messages.
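The control flow of this loop can be sketched with a plain iterator. This assumes an Iterator of Result values as a synchronous stand-in for the WebSocket stream; drain_until_error is a hypothetical name.

```rust
/// Consumes items until the stream ends or yields an error, mirroring the
/// `while let Some(result) = client_ws_rcv.next().await` loop.
/// Returns the successfully received messages.
fn drain_until_error<I>(mut stream: I) -> Vec<String>
where
    I: Iterator<Item = Result<String, String>>,
{
    let mut received = Vec::new();

    // `next()` returns `None` when the stream is exhausted, which ends the loop.
    while let Some(result) = stream.next() {
        let msg = match result {
            Ok(msg) => msg,
            Err(e) => {
                eprintln!("error receiving message: {}", e);
                break; // stop processing after the first error
            }
        };
        received.push(msg);
    }

    received
}
```

Both exits, the stream ending and the break on error, fall through to whatever follows the loop, which is where the server cleans up after a client.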

async fn client_msg(client_id: &str, msg: Message, clients: &Clients) {
    println!("received message from {}: {:?}", client_id, msg);

    let message = match msg.to_str() {
        Ok(v) => v,
        Err(_) => return,
    };

    if message == "ping" || message == "ping\n" {
        let locked = clients.lock().await;
        match locked.get(client_id) {
            Some(v) => {
                if let Some(sender) = &v.sender {
                    println!("sending pong");
                    let _ = sender.send(Ok(Message::text("pong")));
                }
            }
            None => return,
        }
        return;
    };
}

We pass the client id to this function so that we can retrieve the latest information about the client from the clients map if we need it.

First, on line 49 we attempt to convert the message to a str; if that fails, which happens for non-text messages, we return. Then we check if the message says “ping”. If so, we send “pong” back to the client: we first get the client by client_id from the clients map, and if we can find the client we use its sender object to send a Message::text containing “pong”. In any other case, we simply return from the function.
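If the handler grows beyond a single command, one option is to separate the decision of what to reply from the act of sending it. The helper below is a hypothetical sketch, not part of the project; it only covers the “ping” case described above.

```rust
/// Decides what to answer for an incoming text message.
/// Hypothetical helper, not part of the article's code.
fn reply_for(text: &str) -> Option<&'static str> {
    // Accept a trailing newline, as the original `"ping\n"` comparison does.
    match text {
        "ping" | "ping\n" => Some("pong"),
        _ => None,
    }
}
```

With such a helper, client_msg would only need to look up the sender and pass along whatever reply_for returns.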

Handling disconnects

In this final subsection, we will handle the situation where the program loses connection with the client. This happens in the last few lines of the client_connection function:

    clients.lock().await.remove(&uuid);
    println!("{} disconnected", uuid);

When the client can no longer be reached, because it has disconnected or for other reasons, the loop waiting for items will stop, and code execution will move to line 42, where we remove the client by id from the HashMap tracking the clients.

Completed ws.rs

The complete code listing looks like this:

use crate::{Client, Clients};
use futures::{FutureExt, StreamExt};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use uuid::Uuid;
use warp::ws::{Message, WebSocket};

pub async fn client_connection(ws: WebSocket, clients: Clients) {
    println!("establishing client connection... {:?}", ws);

    let (client_ws_sender, mut client_ws_rcv) = ws.split();
    let (client_sender, client_rcv) = mpsc::unbounded_channel();

    let client_rcv = UnboundedReceiverStream::new(client_rcv);

    tokio::task::spawn(client_rcv.forward(client_ws_sender).map(|result| {
        if let Err(e) = result {
            println!("error sending websocket msg: {}", e);
        }
    }));

    let uuid = Uuid::new_v4().to_simple().to_string();

    let new_client = Client {
        client_id: uuid.clone(),
        sender: Some(client_sender),
    };

    clients.lock().await.insert(uuid.clone(), new_client);

    while let Some(result) = client_ws_rcv.next().await {
        let msg = match result {
            Ok(msg) => msg,
            Err(e) => {
                println!("error receiving message for id {}: {}", uuid, e);
                break;
            }
        };
        client_msg(&uuid, msg, &clients).await;
    }

    clients.lock().await.remove(&uuid);
    println!("{} disconnected", uuid);
}

async fn client_msg(client_id: &str, msg: Message, clients: &Clients) {
    println!("received message from {}: {:?}", client_id, msg);

    let message = match msg.to_str() {
        Ok(v) => v,
        Err(_) => return,
    };

    if message == "ping" || message == "ping\n" {
        let locked = clients.lock().await;
        match locked.get(client_id) {
            Some(v) => {
                if let Some(sender) = &v.sender {
                    println!("sending pong");
                    let _ = sender.send(Ok(Message::text("pong")));
                }
            }
            None => return,
        }
        return;
    };
}

Sending a ping message

We can now run our server again and connect to it, this time the client will stay connected. We can also send a “ping” message and get a response:

Sending a WebSocket message

The terminal output should look like this:

Configuring websocket route
Starting server
ws_handler
establishing client connection... WebSocket
received message from 035f1ba1e89c4c54b0323cd09c6c2092: Text("ping")
sending pong

Conclusion

We have completed writing our basic WebSocket server in Rust using the Warp framework. We now know how to construct a WebSocket route in Warp, how to pass objects down the filter chain, and how to keep track of clients’ data in a thread-safe way. We can also receive messages from the clients and respond to them. In conclusion, we now have a good basis and understanding for building more complex WebSocket servers with Rust.

The completed code can be found on my GitHub, here.

Please check out my other article about building a REST API with warp: How to implement a Rust REST API with warp.

