In this article we will build a WebSocket server with Rust, using the warp framework. We will go through the code step by step, building a simple version first and then adding a bit more functionality to it. At the end of this article, we will be able to receive messages from the client and send something back in response.
We will use this project as a base for future articles to build upon. We add a continuous data update loop to this project in the follow-up article Warp data update loop: easy how to.
The repository with the complete Rust Warp WebSocket server project can be found on my GitHub, here.
If you are interested in how to develop a REST API with CRUD operations I have an article for that too: How to implement a Rust REST API with warp.
Prerequisites
It helps to have experience with writing Rust code.
Rust Websocket server project set up
First, create a new project with cargo:
cargo new warp-websocket-server-tutorial
We are now going to add the required dependencies to the Cargo.toml file.
[dependencies]
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1.6"
warp = "0.3"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
futures = { version = "0.3", default-features = false }
uuid = { version = "1.1.2", features = ["v4", "fast-rng", "macro-diagnostics"] }
Let’s go through these dependencies one by one quickly:
- tokio: A runtime for writing reliable, asynchronous, and slim applications with the Rust programming language.
- tokio-stream: Utilities to work with `Stream` and `tokio`.
- warp: A super-easy, composable, web server framework for warp speeds.
- serde: Serde is a framework for serializing and deserializing Rust data structures efficiently and generically.
- serde_json: A JSON serialization file format.
- futures: An implementation of futures and streams featuring zero allocations, composability, and iterator-like interfaces.
- uuid: A library to generate and parse UUIDs.
Basic Rust Warp websocket server set up
We will first set up a basic WebSocket server with a single endpoint for the WebSocket connection. It will not do anything when a client connects yet. Later in this tutorial, we will add more functionality. For instance, we will keep the session alive and allow for receiving a message from the client. We will be working on 3 files: main.rs, handlers.rs, and ws.rs. Let’s take a look at main.rs.
Main.rs
The entry point to any Rust application:
use warp::{Filter, Rejection};
mod handlers;
mod ws;
type Result<T> = std::result::Result<T, Rejection>;
#[tokio::main]
async fn main() {
    println!("Configuring websocket route");
    let ws_route = warp::path("ws")
        .and(warp::ws())
        .and_then(handlers::ws_handler);
    let routes = ws_route.with(warp::cors().allow_any_origin());
    println!("Starting server");
    warp::serve(routes).run(([127, 0, 0, 1], 8000)).await;
}
First, we create a type alias to make future code quicker to write and easier to read. Then we set up the asynchronous main function with the async keyword, and use the #[tokio::main] macro to run main on the Tokio runtime. This macro provides only basic configuration options.
Next, we create our WebSocket route for the server. We define the path as "ws", which means the full path will become 127.0.0.1:8000/ws for the client to connect to. The next line adds a WebSocket filter that yields a Ws object that will be used to upgrade the connection to a WebSocket connection. Finally, we configure the handler function that is called to handle this route. We have not written that handler function yet, but we will do that right after this section.
We finish up the routes configuration by adding a CORS filter that allows any origin. The last line runs the warp server on IP 127.0.0.1 and port number 8000.
Handlers.rs
Open the handlers.rs file and add the following code:
use crate::{ws, Result};
use warp::Reply;
pub async fn ws_handler(ws: warp::ws::Ws) -> Result<impl Reply> {
    println!("ws_handler");
    Ok(ws.on_upgrade(move |socket| ws::client_connection(socket)))
}
This listing shows a very simple public asynchronous function that prints some text, then calls another function and returns its result, which implements Reply. A Reply is a type that can be converted into an HTTP response; for more information go here. The last line calls ws.on_upgrade, which finishes the protocol upgrade and registers the function (client_connection, which we will write after this section) that handles the connection once it is established.
The function doesn’t do much, but don’t worry: in a later section, we will add more functionality to it.
Ws.rs
This file will hold functions for handling the established connection and receiving messages from the client. For now, this will also be barebones:
use warp::ws::WebSocket;
pub async fn client_connection(ws: WebSocket) {
    println!("establishing client connection... {:?}", ws);
}
Here we print some text and the Debug format of the WebSocket. This is enough for now. At first, we just want to establish a working server we can connect to before getting into the details.
Running the barebones Rust Warp websocket server
We can now run the server and connect to it. Do this by running the cargo run command. Then, you can use your favorite WebSocket client to connect. For example, I like to use WebSocket client for mac. When the server is running, simply connect to ws://127.0.0.1:8000/ws.
After hitting connect we should see a message get printed in the terminal:
Configuring websocket route
Starting server
ws_handler
establishing client connection... WebSocket
You will notice that the client is immediately disconnected. This is because we are not doing anything with the incoming connection, yet. Now that we have established that we can connect to our server, we are well on our way to writing a useful WebSocket server in Rust. Let’s make it actually do something.
Adding functionality to the Rust Warp WebSocket server
In this section we want to make it possible for clients to send messages to our server and to receive messages from it. To be able to do that, we have to keep track of the connected clients and keep the connection with each client open.
Updating main.rs
We begin by updating main.rs with code for storing connected clients:
use std::{collections::HashMap, convert::Infallible, sync::Arc};
use tokio::sync::{mpsc, Mutex};
use warp::{ws::Message, Filter, Rejection};
mod handlers;
mod ws;
#[derive(Debug, Clone)]
pub struct Client {
    pub client_id: String,
    pub sender: Option<mpsc::UnboundedSender<std::result::Result<Message, warp::Error>>>,
}
type Clients = Arc<Mutex<HashMap<String, Client>>>;
type Result<T> = std::result::Result<T, Rejection>;
#[tokio::main]
async fn main() {
    let clients: Clients = Arc::new(Mutex::new(HashMap::new()));
    println!("Configuring websocket route");
    let ws_route = warp::path("ws")
        .and(warp::ws())
        .and(with_clients(clients.clone()))
        .and_then(handlers::ws_handler);
    let routes = ws_route.with(warp::cors().allow_any_origin());
    println!("Starting server");
    warp::serve(routes).run(([127, 0, 0, 1], 8000)).await;
}
fn with_clients(clients: Clients) -> impl Filter<Extract = (Clients,), Error = Infallible> + Clone {
    warp::any().map(move || clients.clone())
}
We have added a struct definition, Client, for representing the connected client. The client has a client_id field, which will just be a randomly generated UUID, and a sender field of type mpsc::UnboundedSender. This is the sending half of a channel: messages we send through it arrive at the matching UnboundedReceiver, which we will forward to the client’s WebSocket. For more information see the documentation.
We also define a new type alias: type Clients = Arc<Mutex<HashMap<String, Client>>>;. We use this for keeping track of connected clients. This is a HashMap wrapped in a Mutex from the tokio library, which is then wrapped in an Arc from the std (standard) library. The Mutex makes sure only one task at a time can access the HashMap, which prevents data races when multiple threads try to use it, and the tokio version can stay locked across await points. The Arc is a thread-safe reference-counting pointer. It allows us to share the data within with multiple threads, which is very useful for an asynchronous web server.
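To get a feel for the Arc and Mutex combination without pulling in any dependencies, here is a minimal sketch that uses only the standard library. Note the assumptions: it uses std’s blocking Mutex and threads instead of tokio’s async Mutex and tasks, a plain String stands in for the Client struct, and register_concurrently is a made-up helper that is not part of the project.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

// Same shape as the article's `Clients` alias, but with std's blocking Mutex
// and a String standing in for the `Client` struct (both are assumptions).
type SharedClients = Arc<Mutex<HashMap<String, String>>>;

/// Spawns `n` threads that each insert one entry, then returns the map size.
/// `register_concurrently` is a hypothetical helper used only in this sketch.
pub fn register_concurrently(n: usize) -> usize {
    let clients: SharedClients = Arc::new(Mutex::new(HashMap::new()));

    let handles: Vec<_> = (0..n)
        .map(|i| {
            // Cloning the Arc only bumps a reference count; the map is shared.
            let clients = Arc::clone(&clients);
            thread::spawn(move || {
                // The guard gives exclusive access until it is dropped.
                let mut map = clients.lock().unwrap();
                map.insert(format!("client-{}", i), format!("connection {}", i));
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    let len = clients.lock().unwrap().len();
    len
}
```

Every thread gets its own Arc handle to the same map, and the Mutex serializes the inserts, so no entry is lost no matter how the threads are scheduled.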
At the start of main we create a new instance of the Clients type and pass it to the ws_handler function using the with_clients filter, which is added to the route with .and(with_clients(clients.clone())). The filter function itself is defined below main.
The filter function with_clients extracts the Clients data. It returns a Filter that matches any route and composes it with a function that yields the extracted data, in this case a clone of clients.
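The core of with_clients is the move closure that clones the shared pointer every time it is called. The following sketch isolates that idea with only the standard library. It is not warp code, and share_with_each_call is a hypothetical name chosen for this example.

```rust
use std::sync::Arc;

/// Returns a closure that hands out a fresh clone of the shared value on
/// every call, mirroring the `move || clients.clone()` closure in
/// `with_clients`. `share_with_each_call` is a hypothetical name.
pub fn share_with_each_call<T>(shared: Arc<T>) -> impl Fn() -> Arc<T> + Clone {
    // `move` transfers ownership of `shared` into the closure, so the closure
    // can keep producing clones for as long as it lives.
    move || Arc::clone(&shared)
}
```

Each call returns a new Arc pointing at the same data, which is roughly what happens for every incoming request: the handler receives its own handle to the one shared clients map.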
Updating handlers.rs
We will update the handler function ws_handler to receive the HashMap of clients, and pass this information along to the client_connection function in the ws module. We have no logic in this function yet, but in a future article that continues with this project as a base, we will implement some authentication logic here.
use crate::{ws, Clients, Result};
use warp::Reply;
pub async fn ws_handler(ws: warp::ws::Ws, clients: Clients) -> Result<impl Reply> {
    println!("ws_handler");
    Ok(ws.on_upgrade(move |socket| ws::client_connection(socket, clients)))
}
Updating ws.rs
In the client_connection function we will add code that creates an instance of the Client struct and adds it to the clients HashMap, to keep track of it. At the same time, we will add logic for receiving a “ping” message from the client and sending back a “pong” message. Let’s look at the full listing below:
use crate::{Client, Clients};
use futures::{FutureExt, StreamExt};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use uuid::Uuid;
use warp::ws::{Message, WebSocket};
pub async fn client_connection(ws: WebSocket, clients: Clients) {
    println!("establishing client connection... {:?}", ws);
    // Split the socket and set up the channel used to send to this client.
    let (client_ws_sender, mut client_ws_rcv) = ws.split();
    let (client_sender, client_rcv) = mpsc::unbounded_channel();
    let client_rcv = UnboundedReceiverStream::new(client_rcv);
    tokio::task::spawn(client_rcv.forward(client_ws_sender).map(|result| {
        if let Err(e) = result {
            println!("error sending websocket msg: {}", e);
        }
    }));
    // Register the client under a random id.
    let uuid = Uuid::new_v4().simple().to_string();
    let new_client = Client {
        client_id: uuid.clone(),
        sender: Some(client_sender),
    };
    clients.lock().await.insert(uuid.clone(), new_client);
    // Handle incoming messages until the client disconnects or an error occurs.
    while let Some(result) = client_ws_rcv.next().await {
        let msg = match result {
            Ok(msg) => msg,
            Err(e) => {
                println!("error receiving message for id {}: {}", uuid, e);
                break;
            }
        };
        client_msg(&uuid, msg, &clients).await;
    }
    // Clean up once the client is gone.
    clients.lock().await.remove(&uuid);
    println!("{} disconnected", uuid);
}
async fn client_msg(client_id: &str, msg: Message, clients: &Clients) {
    println!("received message from {}: {:?}", client_id, msg);
    let message = match msg.to_str() {
        Ok(v) => v,
        Err(_) => return,
    };
    if message == "ping" || message == "ping\n" {
        let locked = clients.lock().await;
        match locked.get(client_id) {
            Some(v) => {
                if let Some(sender) = &v.sender {
                    println!("sending pong");
                    let _ = sender.send(Ok(Message::text("pong")));
                }
            }
            None => return,
        }
        return;
    };
}
Almost all of the lines of code are new, so let’s go through them section by section.
Imports and function signature updates
use crate::{Client, Clients};
use futures::{FutureExt, StreamExt};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use uuid::Uuid;
use warp::ws::{Message, WebSocket};
pub async fn client_connection(ws: WebSocket, clients: Clients) {
Here we bring the required types, structs, etc., into scope. Then we update the client_connection function’s signature to accept Clients as a parameter. Of note is the second import, use futures::{FutureExt, StreamExt};, which brings certain traits into scope, required by ws.split() and client_rcv.forward() listed in the next section.
Stream objects and configuration
let (client_ws_sender, mut client_ws_rcv) = ws.split();
let (client_sender, client_rcv) = mpsc::unbounded_channel();
let client_rcv = UnboundedReceiverStream::new(client_rcv);
tokio::task::spawn(client_rcv.forward(client_ws_sender).map(|result| {
    if let Err(e) = result {
        println!("error sending websocket msg: {}", e);
    }
}));
Here we split the WebSocket stream object into separate Sink and Stream objects. This is useful because we want to split ownership of the sending and receiving tasks. The Stream part (client_ws_rcv) will be used to receive messages from the client. The Sink part (client_ws_sender) is used to send messages to the client.
Next, we create an unbounded channel, which the rest of the server will use to send messages to the client. We then wrap client_rcv in a new UnboundedReceiverStream, which implements Stream, so that forward() can be used on it.
Finally, we spawn a new task that forwards every message arriving on client_rcv to client_ws_sender. The task keeps running until the channel is closed or sending fails, for example because the client has disconnected.
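The forwarding pattern can be tried out in isolation. The sketch below is an analogy built only on the standard library: a std thread plays the role of the spawned tokio task, std::sync::mpsc replaces the tokio channel, and a Vec stands in for the WebSocket sink. The function name forward_all is made up for this example.

```rust
use std::sync::mpsc;
use std::thread;

/// Sends `messages` through a channel while a background thread forwards each
/// one into a Vec that stands in for the WebSocket sink. Returns what the
/// "sink" received. `forward_all` is a hypothetical helper for this sketch.
pub fn forward_all(messages: Vec<String>) -> Vec<String> {
    let (sender, receiver) = mpsc::channel::<String>();

    // Plays the role of the spawned task: drain the receiver until every
    // sender has been dropped, pushing each message into the sink.
    let forwarder = thread::spawn(move || {
        let mut sink = Vec::new();
        for msg in receiver {
            sink.push(msg);
        }
        sink
    });

    for msg in messages {
        // Sending only fails if the receiving side is gone.
        if let Err(e) = sender.send(msg) {
            println!("error sending msg: {}", e);
        }
    }

    // Dropping the last sender closes the channel, which ends the forwarder.
    drop(sender);
    forwarder.join().unwrap()
}
```

The point to take away is the lifetime of the forwarder: it ends on its own once the sending side is gone, so nothing has to stop it explicitly.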
Client registration
In this section, we are going to create an instance of the Client struct and add it to the clients HashMap.
let uuid = Uuid::new_v4().simple().to_string();
let new_client = Client {
    client_id: uuid.clone(),
    sender: Some(client_sender),
};
clients.lock().await.insert(uuid.clone(), new_client);
The client_sender object is stored in the new_client object here, so that we can send messages to the connected client in other parts of the code.
With the last line, we obtain a lock on the client list and insert the new_client object into the clients HashMap using uuid as the key.
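To illustrate why storing the sender next to the client id is useful, here is a small standard-library sketch of a registry that can address one specific client. Everything in it is an assumption made for illustration: Registry, register, and send_to are hypothetical names, and std::sync::mpsc with String messages replaces the tokio channel and warp Message type.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Receiver, Sender};

/// Maps a client id to the sending half of that client's channel.
/// `Registry` and its methods are hypothetical names for this sketch.
#[derive(Default)]
pub struct Registry {
    senders: HashMap<String, Sender<String>>,
}

impl Registry {
    /// Creates a channel for `id`, stores the sender, and returns the receiver.
    pub fn register(&mut self, id: &str) -> Receiver<String> {
        let (sender, receiver) = mpsc::channel();
        self.senders.insert(id.to_string(), sender);
        receiver
    }

    /// Sends `text` to the client with the given id.
    /// Returns false if the id is unknown or the receiver is gone.
    pub fn send_to(&self, id: &str, text: &str) -> bool {
        match self.senders.get(id) {
            Some(sender) => sender.send(text.to_string()).is_ok(),
            None => false,
        }
    }
}
```

Any code that can reach the registry can now push a message to a client by id, without needing access to that client’s socket.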
Note that in a later tutorial, we will move this client registration code to an endpoint specifically used for registering a client before connecting through a WebSocket.
Handling messages from the client
In this subsection, we will create a loop that handles incoming messages from the client. We will respond to a “ping” message with a message containing the text “pong”.
while let Some(result) = client_ws_rcv.next().await {
    let msg = match result {
        Ok(msg) => msg,
        Err(e) => {
            println!("error receiving message for id {}: {}", uuid, e);
            break;
        }
    };
    client_msg(&uuid, msg, &clients).await;
}
Here we have a loop that waits for the next item in the stream. It keeps running until the client disconnects or an error occurs. Each time the client sends a message, the message is extracted and then further processed by client_msg.
Below is the code listing for the client_msg function. Its purpose is handling the client’s messages.
async fn client_msg(client_id: &str, msg: Message, clients: &Clients) {
    println!("received message from {}: {:?}", client_id, msg);
    let message = match msg.to_str() {
        Ok(v) => v,
        Err(_) => return,
    };
    if message == "ping" || message == "ping\n" {
        let locked = clients.lock().await;
        match locked.get(client_id) {
            Some(v) => {
                if let Some(sender) = &v.sender {
                    println!("sending pong");
                    let _ = sender.send(Ok(Message::text("pong")));
                }
            }
            None => return,
        }
        return;
    };
}
We pass the client id to this function so that we can retrieve the latest information about the client from the clients map if we need it.
First, we attempt to convert the message to a str. If that fails, because the message is not a text message, we return. Then we check if the message says “ping”. If so, we go into the process of sending “pong” back to the client. We do this by first getting the client by client_id from the clients map. If we can find the client, we use the sender object from the client to send a Message::text containing “pong”. In any other case, we simply return from the function.
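The decision of what to answer can also be pulled out into a small pure function, which makes it easy to test without a running server. This is only a sketch of that idea: reply_for is a hypothetical helper that does not exist in the project.

```rust
/// Decides what to send back for a text message, if anything.
/// `reply_for` is a hypothetical helper, not part of the article's project.
pub fn reply_for(message: &str) -> Option<&'static str> {
    match message {
        // Some clients append a newline, so both spellings count as a ping.
        "ping" | "ping\n" => Some("pong"),
        _ => None,
    }
}
```

With a helper like this, client_msg would only need to look up the sender when reply_for returns Some.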
Handling disconnects
In this final subsection, we will handle the situation where the program loses connection with the client. This happens in the last few lines of the client_connection function:
clients.lock().await.remove(&uuid);
println!("{} disconnected", uuid);
When the client can no longer be reached due to having disconnected or for other reasons, the loop waiting for items will stop, and code execution will move on to the lines after it, where we remove the client by id from the HashMap tracking the clients.
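The whole lifecycle (register, handle messages until the stream ends or fails, then clean up) can be sketched synchronously with the standard library. The assumptions here are that a Vec of Result values stands in for the WebSocket stream, a plain HashMap stands in for the shared Clients type, and run_session is a made-up function name.

```rust
use std::collections::HashMap;

/// Registers `id`, handles incoming items until the stream ends or yields an
/// error, then removes `id` again. Returns how many messages were handled.
/// All names are hypothetical; the Vec stands in for the WebSocket stream.
pub fn run_session(
    clients: &mut HashMap<String, String>,
    id: &str,
    incoming: Vec<Result<String, String>>,
) -> usize {
    clients.insert(id.to_string(), "connected".to_string());

    let mut handled = 0;
    for item in incoming {
        match item {
            Ok(_msg) => handled += 1,
            Err(e) => {
                println!("error receiving message for id {}: {}", id, e);
                break;
            }
        }
    }

    // Reached on both a clean end of stream and an error, so the map never
    // keeps an entry for a client that is gone.
    clients.remove(id);
    println!("{} disconnected", id);
    handled
}
```

Because the cleanup sits directly after the loop, it runs whether the loop ended normally or through break, which is the same reason the removal in client_connection always executes.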
Sending a ping message
We can now run our server again and connect to it. This time the client will stay connected. We can also send a “ping” message and get a “pong” response.
The terminal output should look like this:
Configuring websocket route
Starting server
ws_handler
establishing client connection... WebSocket
received message from 035f1ba1e89c4c54b0323cd09c6c2092: Text("ping")
sending pong
Conclusion
We have completed writing our basic WebSocket server in Rust using the Warp framework. We now know how to construct a WebSocket route in Warp, how to pass objects down the filter chain, and how to keep track of clients’ data in a thread-safe way. We can also receive messages from the clients and respond to them. In conclusion, we now have a good basis and understanding for building more complex WebSocket servers with Rust.
The completed code can be found on my GitHub, here.
Please check out my other article about building a REST API with warp: How to implement a Rust REST API with warp.
Please follow me on Twitter for updates on upcoming Rust programming articles: @tmdev82