In this tutorial, we are going to look at how to get cryptocurrency symbol ticker data from the KuCoin WebSocket Futures API with Rust. We will be using the public WebSocket API and we will connect to it using Rust and the Tungstenite crate.
The Symbol ticker data stream sends real-time price data for cryptocurrency futures symbols. At the end of this tutorial we will have a program that can:
- connect to the KuCoin cryptocurrency futures WebSocket API.
- retrieve all available cryptocurrency futures symbols from the REST API.
- subscribe to streams for all the cryptocurrency futures symbols.
- process incoming WebSocket messages about cryptocurrency futures price data.
This project will be a good basis for cryptocurrency real-time price visualization or a trading bot for example.
The repository with the full project code can be found on my GitHub: here.
The KuCoin futures API documentation for the symbol ticker data can be found here.
I have an article specifically about using WebSockets with Rust: Rust Warp WebSocket server: learn how to now.
I also have a similar article for the Binance API: Easily connect to Binance WebSocket streams with Rust.
Contents
- 1 Prerequisites
- 2 KuCoin Futures API with Rust project architecture
- 3 KuCoin Futures API with Rust project setup
- 4 Generic REST API Client
- 5 REST API Data models
- 6 KuCoin Futures REST API Client
- 7 WebSocket connector implementation
- 8 Building the KuCoin Futures WebSocket Client
- 8.1 WebSocket related data models
- 8.2 Foundation for KuCoin Futures WebSocket Client
- 8.3 Run and eventloop outline
- 8.4 Establishing the connection
- 8.5 Receiving incoming WebSocket messages
- 8.6 Determine KuCoin Futures WebSocket message types
- 8.7 Handling incoming messages
- 8.8 Subscribing to WebSocket channels
- 8.9 Writing the subscribe_to_topics function
- 8.10 Handling symbol ticker messages
- 9 Running the completed “KuCoin Futures API with Rust” project
- 10 Conclusion
Prerequisites
Some Rust programming knowledge and some experience working with WebSockets are required to follow this tutorial.
KuCoin Futures API with Rust project architecture
Before we get started with writing code let’s first briefly look at the structure and components of the project we are going to build.
Program flow
As mentioned in the introduction our program will be able to connect to the KuCoin cryptocurrency futures API and receive futures price data. We are going to subscribe to the symbol ticker streams specifically. These streams give live updates on prices for any symbol channel we subscribe to. In this tutorial, we will not do anything with the incoming data, except for printing it to the terminal.
According to the KuCoin API documentation, these are the steps we have to take to connect to the WebSocket API and start receiving cryptocurrency futures price data:
- Get the list of servers and a connection token from the REST API endpoint: https://api-futures.kucoin.com/api/v1/bullet-public.
- Use the server URL(s) and token to connect to the Futures WebSocket API.
- If successful, receive a welcome message.
- Subscribe to the desired channels. In this case: /contractMarket/tickerV2:{symbol}
- We can retrieve the symbols we can subscribe to from the Futures REST API: https://api-futures.kucoin.com/api/v1/contracts/active.
This flow looks like this:
Project components
Next, let’s get a general idea of the components/modules we will write for our “KuCoin Futures API connect with Rust” project. In broad terms we need the following components:
- HTTP Client: we have to retrieve information from the REST API.
- KuCoin Futures REST API Client: this builds on the HTTP client and has knowledge about the KuCoin Futures REST API endpoints.
- KuCoin Futures WebSocket Client: can connect to the KuCoin Futures WebSocket server, receive and process incoming messages and data.
- Data structure models: for deserializing incoming data to make it easier to work with.
KuCoin Futures API with Rust project setup
We start, as always, by creating a project using cargo: cargo new rust-kucoin-futures-symbol-ticker.
Dependencies
Let’s look at the dependencies we are going to add to the project’s Cargo.toml:
- anyhow: Flexible concrete Error type built on std::error::Error.
- log: A lightweight logging facade for Rust.
- log4rs: A highly configurable multi-output logging implementation for the `log` facade.
- reqwest: higher level HTTP client library.
- serde: A generic serialization/deserialization framework.
- serde_json: A JSON serialization file format.
- tokio: An event-driven, non-blocking I/O platform for writing asynchronous I/O backed applications.
- tungstenite: Lightweight stream-based WebSocket implementation.
- url: URL library for Rust, based on the WHATWG URL Standard.
Some of these are not strictly required to complete the project. For example, log and log4rs, and anyhow. However, these crates make development a lot more comfortable.
Logging
Speaking of log4rs, let’s get the configuration of the logging out of the way while we are at it. Let’s add a file called logconfig.yml to the root directory of our project:
appenders:
  stdout_logger:
    kind: console
    encoder:
      pattern: "{h({d(%Y-%m-%d %H:%M:%S)(utc)} - {l}: {m}{n})}"
root:
  level: debug
  appenders:
    - stdout_logger
This will configure log output to stdout showing a timestamp, the log level, and the log message. For more information on how log4rs can be used please check out my tutorial on it: Basic how to log to a file in Rust with log4rs.
Then in our main.rs file we have to load the configuration file:
use log::{info};
use log4rs;

fn main() {
    log4rs::init_file("logconfig.yml", Default::default()).expect("Log config file not found.");
    info!("We now have nice logging!");
}
The log crate allows us to use macros like the one we see here with info. There is also error, debug, and trace.
Generic REST API Client
In this section, we are going to build the REST API Client that we will use to get information to be able to connect to the WebSocket server. Other than that this client will also retrieve information about futures symbols so that we can subscribe to the real-time price data channels.
For now, this client needs to be able to send HTTP GET and POST requests. Because we are aiming to build a foundation for future work let’s make it generic.
Basic Client
Let’s create a file called rest_client.rs:
pub struct Client {
    host: String,
    inner_client: reqwest::Client,
}

impl Client {
    pub fn new(host: String) -> Self {
        Client {
            host,
            inner_client: reqwest::Client::builder()
                .pool_idle_timeout(None)
                .build()
                .unwrap(),
        }
    }
}
Here we have the basic setup for our generic REST API client: a struct with fields for the host URL (host) and for an instance of the reqwest Client object (inner_client). Because we want to make use of connection pooling, we store one instance of the Client object and reuse it for each request.
The impl Client block implements a new function for our Client struct.
Next, we will add several functions for sending requests and handling the responses.
Bringing items into scope
Let’s make our lives easier by bringing a bunch of items into scope that we will be using:
use anyhow::{bail, Result};
use reqwest::header::{HeaderMap, HeaderValue, CONTENT_TYPE, USER_AGENT};
use reqwest::{Response, StatusCode};
use serde::de::DeserializeOwned;
use serde::Deserialize;
use std::fmt;
First, anyhow for error types. Then the reqwest related items, which we will talk more about later. Finally, we add serde::de::DeserializeOwned, which we will use as a trait bound on return types to signify that our functions return owned data that does not borrow from the response.
Response handler function
When we send requests to the KuCoin REST API we expect to get a response in the form of some data. However, there are a number of generic responses we can receive that are failures. For example, the BAD REQUEST response or SERVICE UNAVAILABLE and the like.
To deal with a number of these responses, let’s write a handler function. The functions that send requests will call it to handle the response that comes back from the REST API server.
Let’s add the handler function to the impl Client code block now:
async fn handler<T: DeserializeOwned>(&self, response: Response) -> Result<T> {
    match response.status() {
        StatusCode::OK => Ok(response.json::<T>().await?),
        StatusCode::INTERNAL_SERVER_ERROR => {
            bail!("Internal Server Error");
        }
        StatusCode::SERVICE_UNAVAILABLE => {
            bail!("Service Unavailable");
        }
        StatusCode::UNAUTHORIZED => {
            bail!("Unauthorized");
        }
        StatusCode::BAD_REQUEST => {
            let error: ContentError = response.json().await?;
            bail!(error)
        }
        s => {
            bail!(format!("Received response: {:?}", s));
        }
    }
}
Here we take in a Response object and pattern match on its StatusCode with match.
If we receive StatusCode::OK, we deserialize the response JSON and return it.
We check some specific error codes and return error messages using bail!, a macro from anyhow that returns early from the function with an error.
For bad request responses, in the StatusCode::BAD_REQUEST arm, we deserialize the response JSON into a ContentError data structure we have not written yet. So let’s do that now. Let’s add it to rest_client.rs, somewhere below the impl Client code block:
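#[derive(Debug, Deserialize)]
pub struct ContentError {
    // KuCoin sends its response codes as strings (for example "200000" in the
    // responses shown in this article), so we deserialize the code as a String.
    pub code: String,
    pub msg: String,
}

impl fmt::Display for ContentError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // user-facing output
        write!(f, "code: {} \nmsg: {}", self.code, self.msg)
    }
}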
We implement a Display formatter for good measure. This simply displays the error code and a message string.
If we go back to the match expression, at the end we have a generic catch-all arm:
s => {
    bail!(format!("Received response: {:?}", s));
}
This will catch any other status code and simply put the code in an error message.
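To see the shape of this match in isolation, here is a minimal sketch that uses plain u16 status codes and String errors instead of reqwest's StatusCode and anyhow. The function name classify_status is hypothetical and not part of the project; it only illustrates the "specific arms first, catch-all last" pattern.

```rust
/// Hypothetical stand-in for the status handling in `handler`:
/// maps a numeric HTTP status code to Ok or to a descriptive error.
fn classify_status(code: u16) -> Result<(), String> {
    match code {
        200 => Ok(()),
        500 => Err("Internal Server Error".to_string()),
        503 => Err("Service Unavailable".to_string()),
        401 => Err("Unauthorized".to_string()),
        400 => Err("Bad Request".to_string()),
        // Catch-all arm: any other code ends up in the error message.
        other => Err(format!("Received response: {}", other)),
    }
}

fn main() {
    for code in [200u16, 503, 418] {
        println!("{} -> {:?}", code, classify_status(code));
    }
}
```

Because the catch-all arm comes last, adding a new specific status later only requires one more arm above it.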
GET function
Now let’s add the function for making GET requests. A GET request sends no body; it only requests data. However, it can carry query parameters, which can be used to filter the requested data, for example.
Add this code to the impl Client code block:
pub async fn get<T: DeserializeOwned>(
    &self,
    endpoint: &str,
    request: Option<String>,
) -> Result<T> {
    let mut url: String = format!("{}{}", self.host, endpoint);
    if let Some(request) = request {
        if !request.is_empty() {
            url.push_str(format!("?{}", request).as_str());
        }
    }

    let client = &self.inner_client;
    let response = client.get(url.as_str()).send().await?;

    self.handler(response).await
}
This is a generic function that takes an endpoint string slice (&str) and, optionally, a string containing query parameters. We return an object of type T that implements DeserializeOwned. In short, this means we return deserialized data structures that are owned, not borrowed.
First we construct the target URL from the given endpoint and our internal host address. The query parameters are appended if there are any.
Then we get a reference to the client instance and call the get function followed by send to send the request. Since this is an asynchronous request we have to add an await here.
When we get a response we pass it off to our handler function and return the result from it.
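The URL construction can be tried on its own without sending a request. The sketch below pulls that step into a hypothetical free function called build_url (the name and the sample query string "example=1" are assumptions for illustration only).

```rust
/// Hypothetical helper mirroring the URL construction in `get`.
fn build_url(host: &str, endpoint: &str, query: Option<&str>) -> String {
    let mut url = format!("{}{}", host, endpoint);
    // Only append "?..." when a non-empty query string was supplied.
    if let Some(q) = query {
        if !q.is_empty() {
            url.push('?');
            url.push_str(q);
        }
    }
    url
}

fn main() {
    let host = "https://api-futures.kucoin.com/api/v1";
    // No query parameters: the URL is just host + endpoint.
    println!("{}", build_url(host, "/contracts/active", None));
    // "example=1" is a made-up parameter, used only to show the "?" handling.
    println!("{}", build_url(host, "/contracts/active", Some("example=1")));
}
```

Treating an empty string the same as None avoids sending a URL that ends in a dangling question mark.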
POST function
For the POST requests, we should construct a header containing our user agent name.
Let’s add a function called build_headers to the impl Client code block under the get function:
fn build_headers(&self) -> Result<HeaderMap> {
    let mut custom_headers = HeaderMap::new();

    custom_headers.insert(USER_AGENT, HeaderValue::from_static("crypto-connector"));

    Ok(custom_headers)
}
The user agent can be any name. This function is not much right now. However, in the future, we can expand it to add an api-key header for example. Unfortunately adding an api-key header for signed requests is outside the scope of this tutorial.
Let’s move on to write the function for POST requests. Add this code to the impl Client code block:
pub async fn post<T: DeserializeOwned>(&self, endpoint: &str) -> Result<T> {
    let url: String = format!("{}{}", self.host, endpoint);

    let client = &self.inner_client;
    let response = client
        .post(url.as_str())
        // build_headers takes no arguments besides &self.
        .headers(self.build_headers()?)
        .send()
        .await?;

    self.handler(response).await
}
This function is almost identical to our get function, except that there is no parameter for query parameters. We are calling post instead of get, of course, and we have added a call to headers().
Note: we are not actually sending data with this request. Typically we would send data with a POST request, but there are cases where that is not necessary. In fact, the KuCoin Futures REST API has such a case.
Trying out our REST Client
In this section, we will quickly try out our new REST Client to see if we can get data from the KuCoin REST API with our Rust code.
Let’s open the main.rs file and add the following code:
use log::{info};
use log4rs;

mod rest_client;

#[tokio::main]
async fn main() {
    log4rs::init_file("logconfig.yml", Default::default()).expect("Log config file not found.");
    info!("We now have nice logging!");

    let client = rest_client::Client::new("https://api-futures.kucoin.com/api/v1".to_string());
    let result: serde_json::Value = client.post("/bullet-public").await.unwrap();
    info!("KuCoin bullet-public result: {:?}", result);
}
Here we create an instance of our REST Client and pass the KuCoin futures REST API host address as a parameter. Then on the next line, we call our post function with the endpoint for retrieving the WebSocket endpoint and connection-token. See the KuCoin API documentation for more information.
We deserialize the data to a generic serde_json::Value for now and print it with info!.
If we run the program the result should look like this:
Finished dev [unoptimized + debuginfo] target(s) in 25.40s
Running `target/debug/rust-kucoin-futures-websocket`
2022-03-09 21:23:35 - INFO: We now have nice logging!
2022-03-09 21:23:35 - DEBUG: starting new connection: https://api-futures.kucoin.com/
2022-03-09 21:23:35 - DEBUG: response '200 OK' for https://api-futures.kucoin.com/api/v1/bullet-public
2022-03-09 21:23:35 - INFO: KuCoin bullet-public result: Object({"code": String("200000"), "data": Object({"instanceServers": Array([Object({"encrypt": Bool(true), "endpoint": String("wss://ws-api.kucoin.com/endpoint"), "pingInterval": Number(18000), "pingTimeout": Number(10000), "protocol": String("websocket")})]), "token": String("2neAiuYvAU61ZDXANAGAsiL4-iAExhsBXZxftpOeh_55i3Ysy2q2LEsEWU64mdzUOPusi34M_wGoSf7iNyEWJxvqRwQb3of_3eE5sq9oXsYDL0EWbh-iz9iYB9J6i9GjsxUuhPw3BlrzazF6ghq4L4nXitq7pjaun3DzuujPaag=.aRaeWiVwqo8mLyml8lNM9w==")})})
We also have to start writing some data structures to process the data we will receive.
REST API Data models
Before we start writing the Futures REST API Client we are going to write the structs that help us deserialize the data. We can see what the data looks like in the KuCoin Futures API documentation and base our data models on that.
So let’s add a file called models.rs for our data models.
Public Channels data
First the data structs for getting the server data and connection-token. In the documentation, this is called “the public channels”. The JSON data looks like this:
{
  "code": "200000",
  "data": {
    "instanceServers": [
      {
        "pingInterval": 50000,
        "endpoint": "wss://push.kucoin.com/endpoint",
        "protocol": "websocket",
        "encrypt": true,
        "pingTimeout": 10000
      }
    ],
    "token": "vYNlCtbz4XNJ1QncwWilJnBtmmfe4geLQDUA62kKJsDChc6I4bRDQc73JfIrlFaVYIAE0Gv2--MROnLAgjVsWkcDq_MuG7qV7EktfCEIphiqnlfpQn4Ybg==.IoORVxR2LmKV7_maOR9xOg=="
  }
}
This data can be split up into 3 separate structs:
use serde::Deserialize;

#[derive(Debug, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Channels {
    pub code: String,
    pub data: ChannelsData,
}

#[derive(Debug, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct ChannelsData {
    pub instance_servers: Vec<InstanceServer>,
    pub token: String,
}

#[derive(Debug, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct InstanceServer {
    pub ping_interval: i64,
    pub endpoint: String,
    pub protocol: String,
    pub encrypt: bool,
    pub ping_timeout: i64,
}
Most of it is pretty self-explanatory. Because the KuCoin API writes names in camel case and Rust prefers snake case, we annotate the structs with #[serde(rename_all = "camelCase")]. Doing this will rename the fields automatically.
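To make the renaming concrete, the sketch below converts a snake case field name to camel case by hand. This is only an illustration of the mapping the attribute asks for, not serde's actual implementation, and the function name snake_to_camel is hypothetical.

```rust
/// Illustrative only: converts a snake_case field name to camelCase,
/// which is the mapping that `rename_all = "camelCase"` asks serde to apply.
fn snake_to_camel(name: &str) -> String {
    let mut out = String::with_capacity(name.len());
    let mut upper_next = false;
    for ch in name.chars() {
        if ch == '_' {
            // Drop the underscore and capitalize the following character.
            upper_next = true;
        } else if upper_next {
            out.extend(ch.to_uppercase());
            upper_next = false;
        } else {
            out.push(ch);
        }
    }
    out
}

fn main() {
    for field in ["instance_servers", "ping_interval", "token"] {
        println!("{} -> {}", field, snake_to_camel(field));
    }
}
```

So the Rust field instance_servers lines up with the JSON key instanceServers, while a single-word field such as token is unchanged.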
Open contracts data
Next, we also want to retrieve all available symbols for cryptocurrency futures contracts. That is because we want to subscribe to WebSocket channels for all symbol ticker data (real-time price data). Therefore, let’s also add data structs for the open contracts list. The data is quite long but we can have Rust only deserialize the fields we are interested in.
Add the models for open contracts:
#[derive(Debug, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct OpenContractData {
    pub symbol: String,
    pub root_symbol: String,
    #[serde(rename = "type")]
    pub contract_type: String,
    pub base_currency: String,
    pub quote_currency: String,
    pub settle_currency: String,
    pub max_order_qty: f64,
    pub max_price: f64,
    pub lot_size: f64,
    pub tick_size: f64,
}

#[derive(Debug, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct OpenContracts {
    pub code: Option<String>,
    pub data: Vec<OpenContractData>,
}
For the scope of this project we only really need the symbol field. However, if we wanted to execute trades we would need to know things like valid price step increments (tick_size), order size step increments (lot_size), what currency we can buy this with (quote_currency), etc.
KuCoin Futures REST API Client
In this section, we will build a client that has knowledge of the KuCoin business logic/endpoints. This will wrap the generic client and make getting information from the KuCoin Futures REST API simpler in other parts of the code.
Let’s add a new file called futures_rest_client.rs to our project. And let’s start with a basic struct and new() function implementation:
use crate::rest_client::Client;
use anyhow::{bail, Result};
use log::{debug, error};

pub struct FuturesRESTClient {
    client: Client,
}

impl FuturesRESTClient {
    pub fn new(hostname: &str) -> FuturesRESTClient {
        FuturesRESTClient {
            client: Client::new(hostname.to_string()),
        }
    }
}
This struct simply has the generic REST API Client as a field. What it adds on top of this is business logic in the functions. Let’s write those now.
Get public channels function
First, we bring the models we want to use into scope:
use crate::{rest_client::Client, models::{Channels, OpenContracts}};
use anyhow::{bail, Result};
use log::{debug, error};

pub struct FuturesRESTClient {
    client: Client,
}
Then let’s add the function get_public_channels to the impl FuturesRESTClient code block. It will simply call the post() function on the client with the appropriate endpoint parameter:
pub async fn get_public_channels(&self) -> Result<Channels> {
    let result = self.client.post("/bullet-public").await;

    match result {
        Ok(channels) => Ok(channels),
        Err(e) => bail!(format!("Error retrieving channels: {:?}", e)),
    }
}
As mentioned, the /bullet-public endpoint returns data containing a server URL and a connection-token. This is represented by our Channels struct. Our request should be sent as a POST request.
Get open contracts and available symbols
Next, we need functions for retrieving the Futures contracts symbols data as these are used to subscribe to WebSocket streams.
Let’s add the following code to the impl FuturesRESTClient code block:
pub async fn get_open_contracts(&self) -> Result<OpenContracts> {
    let result = self.client.get("/contracts/active", None).await;

    match result {
        Ok(open_contracts) => Ok(open_contracts),
        Err(e) => bail!(format!("Error retrieving contracts: {:?}", e)),
    }
}

pub async fn get_available_symbols(&self) -> Result<Vec<String>> {
    let contracts = match self.get_open_contracts().await {
        Ok(contracts) => contracts,
        Err(e) => {
            bail!(e);
        }
    };

    // Keep only the symbol name of every open contract.
    let symbols_list: Vec<String> = contracts.data.iter().map(|f| f.symbol.clone()).collect();
    debug!("Found {} available symbols.", symbols_list.len());
    Ok(symbols_list)
}
Our function get_open_contracts, which retrieves the OpenContracts data, follows the same pattern as the get_public_channels function. Only this time we are using get to make the request.
Finally, we add a helper function get_available_symbols for extracting a list of symbol names as a Vec<String> from the OpenContracts data. This will make the code that needs this information more readable.
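The extraction step is a plain iterator chain, so it can be tried without any network access. The sketch below uses a hypothetical, trimmed-down Contract struct and hand-written sample values in place of real API data.

```rust
/// Hypothetical, trimmed-down version of `OpenContractData`.
struct Contract {
    symbol: String,
    quote_currency: String,
}

/// Collects owned symbol names, as `get_available_symbols` does.
fn symbol_names(contracts: &[Contract]) -> Vec<String> {
    contracts.iter().map(|c| c.symbol.clone()).collect()
}

fn main() {
    // Sample values written by hand for this example, not fetched from the API.
    let contracts = vec![
        Contract { symbol: "XBTUSDTM".to_string(), quote_currency: "USDT".to_string() },
        Contract { symbol: "ETHUSDTM".to_string(), quote_currency: "USDT".to_string() },
    ];
    for c in &contracts {
        println!("{} is quoted in {}", c.symbol, c.quote_currency);
    }
    println!("{:?}", symbol_names(&contracts));
}
```

Cloning each symbol gives the caller an owned Vec<String> that stays valid after the contract data is dropped.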
Updating the example in main
Now we can update the example in main.rs to be much cleaner:
use log::{info};
use log4rs;

mod rest_client;
mod futures_rest_client;
mod models;

#[tokio::main]
async fn main() {
    log4rs::init_file("logconfig.yml", Default::default()).expect("Log config file not found.");
    info!("We now have nice logging!");

    let client = futures_rest_client::FuturesRESTClient::new("https://api-futures.kucoin.com/api/v1");
    let result = client.get_public_channels().await.unwrap();
    info!("KuCoin bullet-public result: {:?}", result);
}
Now we are getting somewhere with our “KuCoin Futures API connector with Rust project”. If we run the program the output looks like this:
2022-03-10 21:49:57 - INFO: We now have nice logging!
2022-03-10 21:49:57 - DEBUG: starting new connection: https://api-futures.kucoin.com/
2022-03-10 21:49:57 - DEBUG: response '200 OK' for https://api-futures.kucoin.com/api/v1/bullet-public
2022-03-10 21:49:57 - INFO: KuCoin bullet-public result: Channels { code: "200000", data: ChannelsData { instance_servers: [InstanceServer { ping_interval: 18000, endpoint: "wss://ws-api.kucoin.com/endpoint", protocol: "websocket", encrypt: true, ping_timeout: 10000 }], token: "2neAiuYvAU61ZDXANAGAsiL4-iAExhsBXZxftpOeh_55i3Ysy2q2LEsEWU64mdzUOPusi34M_wGoSf7iNyEWJwChJVhr1aQcVr0g_mKH9Hyj7Z-j4YamgtiYB9J6i9GjsxUuhPw3BlrzazF6ghq4L3tcDDWbtK5Qif44T6DY8K8=.gpZKf7rAfzZtE4y3scBd8Q==" } }
Next, we are going to build the WebSocket connection logic.
WebSocket connector implementation
In this section, we are going to write some logic for connecting to a WebSocket server given a list of URLs. Typically we will only have one URL that can be used for a connection, but there are cases where there can be multiple different URLs.
Let’s add a file called websocket.rs to put in our generic WebSocket connection logic:
use anyhow::{bail, Result};
use log::{error, info};
use std::net::TcpStream;
use tungstenite::handshake::client::Response;
use tungstenite::protocol::WebSocket;
use tungstenite::stream::MaybeTlsStream;
use url::Url;

pub fn connect_wss(
    exchange: &str,
    websocket_urls: Vec<String>,
) -> Result<(WebSocket<MaybeTlsStream<TcpStream>>, Response)> {
    let max_retry = 5;
    // Outer loop: retry rounds. Inner loop: every known server URL.
    for i in 0..max_retry {
        for wss in &websocket_urls {
            info!("[{}] connecting to {} (try {})", exchange, wss, i);
            let url = Url::parse(wss)?;

            match tungstenite::connect(url) {
                Ok(answer) => {
                    return Ok(answer);
                }
                Err(e) => error!("Error during handshake {}", e),
            }
        }
    }
    bail!("Max connection retry reached");
}
We have just one function here that uses tungstenite to connect to a URL and return a WebSocket<MaybeTlsStream<TcpStream>>. This object represents a local-to-remote stream over TCP that might be protected with TLS. In our case, it will be a secure TLS connection.
Then we have two loops: an outer retry loop that runs a maximum number of times so that we try again if a connection fails at first, and an inner loop that cycles through all the available WebSocket URLs.
We attempt to connect to the URL with tungstenite::connect and return out of the function if successful.
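The nested retry pattern can be exercised without a network by passing the connect step in as a closure. The sketch below is an assumption-laden illustration: connect_with_retry and the fake connector are hypothetical, and it returns an Option instead of the anyhow Result used in the project.

```rust
/// Sketch of the retry pattern in `connect_wss`, generic over a
/// hypothetical `connect` closure so it can run without a network.
fn connect_with_retry<T, E, F>(urls: &[String], max_retry: usize, mut connect: F) -> Option<T>
where
    F: FnMut(&str) -> Result<T, E>,
    E: std::fmt::Display,
{
    for attempt in 0..max_retry {
        for url in urls {
            match connect(url) {
                // First successful connection wins and ends both loops.
                Ok(conn) => return Some(conn),
                Err(e) => eprintln!("attempt {} on {} failed: {}", attempt, url, e),
            }
        }
    }
    // Every URL failed in every retry round.
    None
}

fn main() {
    let urls = vec!["wss://a.example".to_string(), "wss://b.example".to_string()];
    let mut calls = 0;
    // Fake connector: fails three times, then succeeds.
    let result = connect_with_retry(&urls, 5, |url| {
        calls += 1;
        if calls <= 3 {
            Err("handshake failed")
        } else {
            Ok(url.to_string())
        }
    });
    println!("{:?} after {} calls", result, calls);
}
```

Note that the total number of connection attempts is at most max_retry multiplied by the number of URLs, since every round tries each URL once.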
Building the KuCoin Futures WebSocket Client
In this section, we will write logic code that brings all the pieces we have built so far together.
As mentioned in an earlier section, this logic will:
- Retrieve the WebSocket server URL(s) and connection token
- Open a WebSocket connection
- Retrieve available cryptocurrency futures contract symbols
- Subscribe to all the related streams for the symbol ticker price data
- Run an infinite loop to process incoming messages from the symbol ticker price data
Because we will be receiving and sending data we also need new data structs to represent that data. The following data structs should be added:
- GenericMessage: a simple message type for system messages and the like.
- SubscribeMessage: this is a message we send to the server to indicate what stream topic we want to subscribe to.
- SymbolTicker: represents the real-time price data for a cryptocurrency future.
Let’s open the models.rs file and add the code. Because some of these structs also derive Serialize, the import at the top of the file has to become use serde::{Deserialize, Serialize};:
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct GenericMessage {
    pub id: String,
    #[serde(rename = "type")]
    pub msg_type: String,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct SubscribeMessage {
    pub id: u64,
    #[serde(rename = "type")]
    pub msg_type: String,
    pub topic: String,
    pub private_channel: bool,
    pub response: bool,
}

#[derive(Debug, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct SymbolTicker {
    pub sequence: u64,
    pub symbol: String,
    pub best_bid_price: String,
    pub best_bid_size: u64,
    pub best_ask_price: String,
    pub best_ask_size: u64,
    pub ts: u64,
}
We will see how to use these data structures when we use them in the KuCoin Futures WebSocket Client.
Foundation for KuCoin Futures WebSocket Client
This is the last step in writing our “KuCoin Futures API with Rust” project. First, we will look at the basic FuturesWebSockets struct and the new() function implementation.
We’ll create a new file called futures_websocket.rs:
use crate::futures_rest_client::FuturesRESTClient;
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(untagged)]
pub enum SubscribeSymbols {
    All,
    Custom(Vec<String>),
}

pub struct FuturesWebSockets {
    exchange: String,
    topics: Vec<String>,
    subscribe_symbols: SubscribeSymbols,
    client: FuturesRESTClient,
}

impl FuturesWebSockets {
    pub fn new(topics: Vec<String>, subscribe_symbols: SubscribeSymbols) -> FuturesWebSockets {
        FuturesWebSockets {
            exchange: "kucoin-futures".to_string(),
            topics,
            subscribe_symbols,
            client: FuturesRESTClient::new("https://api-futures.kucoin.com/api/v1"),
        }
    }
}
The SubscribeSymbols enum is used for configuring the symbols that should be subscribed to when the program runs. We have the option to configure all symbols or a specific set of symbols.
Then, let’s go through the fields on the struct and what they mean:
- exchange: the name of the exchange we are connecting to. This can then be used in logging to distinguish activity from various exchanges if we have different ones to connect to. However, this tutorial only deals with one, KuCoin.
- topics: which topics to subscribe to after connecting to the WebSocket. Making this a Vec<String> leaves the door open for multiple different streams. In the case of this tutorial, the scope is only for symbol ticker.
- subscribe_symbols: configure specific symbol names to use when subscribing to topics, or use all available symbols.
- client: this is an instance of the REST API client we wrote in an earlier section.
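To give a feel for how topics and subscribe_symbols could work together, here is a rough sketch that resolves the symbol configuration and then combines every topic with every symbol in the topic:symbol form mentioned earlier (/contractMarket/tickerV2:{symbol}). The helpers resolve_symbols and build_topics are hypothetical and written only for this illustration; the sample symbols are hand-written values.

```rust
/// Local copy of the configuration enum from the article.
enum SubscribeSymbols {
    All,
    Custom(Vec<String>),
}

/// Hypothetical helper: picks the symbols to use based on the configuration.
fn resolve_symbols(cfg: &SubscribeSymbols, available: &[String]) -> Vec<String> {
    match cfg {
        // "All" means: use whatever the REST API reported as available.
        SubscribeSymbols::All => available.to_vec(),
        SubscribeSymbols::Custom(list) => list.clone(),
    }
}

/// Hypothetical helper: builds one "topic:symbol" string per combination.
fn build_topics(topics: &[String], symbols: &[String]) -> Vec<String> {
    topics
        .iter()
        .flat_map(|t| symbols.iter().map(move |s| format!("{}:{}", t, s)))
        .collect()
}

fn main() {
    // Sample values for illustration only.
    let available = vec!["XBTUSDTM".to_string(), "ETHUSDTM".to_string()];
    let topics = vec!["/contractMarket/tickerV2".to_string()];

    let all = resolve_symbols(&SubscribeSymbols::All, &available);
    println!("{:?}", build_topics(&topics, &all));

    let custom = SubscribeSymbols::Custom(vec!["XBTUSDTM".to_string()]);
    let only_one = resolve_symbols(&custom, &available);
    println!("{:?}", build_topics(&topics, &only_one));
}
```

Keeping the symbol selection separate from the topic formatting means a second topic can be added later without touching the selection logic.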
Run and eventloop outline
Next, we will write a run() function, which should be called to set things in motion. This function will call the event loop.
Add the following code to the impl FuturesWebSockets code block below the new() function:
pub async fn run(&mut self) -> Result<()> {
    let keep_running = AtomicBool::new(true);
    if let Err(e) = self.event_loop(&keep_running).await {
        error!("Error: {}", e);
    }
    info!("[{}] Loop stopped running.", &self.exchange);
    Ok(())
}

async fn event_loop(&mut self, running: &AtomicBool) -> Result<()> {
    info!("Start event loop...");
    // For now the loop body is empty; we fill it in over the next sections.
    while running.load(Ordering::Relaxed) {}
    Ok(())
}
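The AtomicBool acts as a stop flag for the loop. The sketch below shows that mechanism in isolation, using only the standard library. The run_loop function and its self-imposed iteration limit are assumptions made for the demonstration; in the project nothing clears the flag yet.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Sketch of a loop guarded by an AtomicBool, as in `event_loop`.
/// Here the loop clears the flag itself after `limit` iterations;
/// that stop condition is an assumption made for the demonstration.
fn run_loop(running: &AtomicBool, limit: u32) -> u32 {
    let mut iterations = 0;
    while running.load(Ordering::Relaxed) {
        iterations += 1;
        if iterations >= limit {
            // Any code holding a reference to the flag can stop the loop.
            running.store(false, Ordering::Relaxed);
        }
    }
    iterations
}

fn main() {
    let keep_running = AtomicBool::new(true);
    let n = run_loop(&keep_running, 3);
    println!("loop ran {} times", n);
}
```

Because the flag is atomic, it can be shared by reference and flipped from elsewhere (for example a signal handler or another task) without needing a mutable borrow.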
Establishing the connection
In this section, we will add a function that retrieves connection information from the REST API and then uses it to establish a WebSocket connection.
First, we should bring some new items into scope at the top of futures_websocket.rs:
use crate::{futures_rest_client::FuturesRESTClient, models, websocket};
use anyhow::{bail, Result};
use log::{debug, error, info};
use serde::{Deserialize, Serialize};
use std::net::TcpStream;
use std::sync::atomic::{AtomicBool, Ordering};
use tungstenite::handshake::client::Response;
use tungstenite::protocol::WebSocket;
use tungstenite::{stream::MaybeTlsStream, Message};
Next, let’s add a function called connect for this to the impl FuturesWebSockets code:
async fn connect(&mut self) -> Result<(WebSocket<MaybeTlsStream<TcpStream>>, Response)> {
    let channels: models::Channels = {
        debug!("Retrieving channels information from kucoin");
        let result = self.client.get_public_channels().await;
        match result {
            Ok(channels) => channels,
            Err(e) => bail!("Error occurred: {}", e),
        }
    };

    debug!(
        "Loop through {} server(s)",
        channels.data.instance_servers.len()
    );

    let mut websocket_urls = Vec::new();
    for server in channels.data.instance_servers {
        let connect_id = "_crypto-connector";
        // connectId is sent as a plain query parameter. The square brackets
        // around it in the KuCoin documentation appear to mark it as optional
        // and are not part of the URL.
        websocket_urls.push(format!(
            "{}?token={}&connectId={}",
            &server.endpoint, &channels.data.token, connect_id
        ));
    }

    if let Ok(con) = websocket::connect_wss(&self.exchange, websocket_urls) {
        return Ok(con);
    }

    bail!("Unable to connect.");
}
First, we retrieve the channels information and connection token. This is using the FuturesRESTClient object.
After having successfully retrieved server information, we move on to creating a list of connection URL(s) by looping through the server data. We append the connection parameters token and connectId to the server URL. The token is what authorizes the connection, and the connectId can be any value we choose.
Finally, we call our connect_wss function from the websocket module.
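The URL assembly is easy to check in isolation. The sketch below builds one connection URL per server endpoint; build_ws_urls is a hypothetical helper, the endpoint and token values are placeholders, and it assumes connectId is passed as an ordinary query parameter.

```rust
/// Hypothetical helper: builds one connection URL per server endpoint by
/// appending the token and connect id as query parameters.
fn build_ws_urls(endpoints: &[&str], token: &str, connect_id: &str) -> Vec<String> {
    endpoints
        .iter()
        .map(|endpoint| format!("{}?token={}&connectId={}", endpoint, token, connect_id))
        .collect()
}

fn main() {
    // Placeholder values; a real token comes from the bullet-public endpoint.
    let endpoints = ["wss://ws.example/endpoint"];
    let urls = build_ws_urls(&endpoints, "example-token", "_crypto-connector");
    for url in &urls {
        println!("{}", url);
    }
}
```

Since the same token is used for every server in the response, only the endpoint part differs between the generated URLs.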
Let’s call this function from our event_loop function:
async fn event_loop(&mut self, running: &AtomicBool) -> Result<()> {
    info!("Establishing connection...");
    let mut socket = match self.connect().await {
        Ok(socket_ok) => socket_ok,
        Err(error) => {
            bail!("error: {}", error)
        }
    };
    info!("Connected!");

    info!("Start event loop...");
    while running.load(Ordering::Relaxed) {}
    Ok(())
}
Receiving incoming WebSocket messages
We are slowly building out the event_loop function and completing its functionality. Next up we will write code that deals with receiving messages from the WebSocket connection.
Once again we will update our event_loop function:
async fn event_loop(&mut self, running: &AtomicBool) -> Result<()> {
    info!("Establishing connection...");
    let mut socket = match self.connect().await {
        Ok(socket_ok) => socket_ok,
        Err(error) => {
            bail!("error: {}", error)
        }
    };
    info!("Connected!");

    info!("Start event loop...");
    while running.load(Ordering::Relaxed) {
        let message = match socket.0.read_message() {
            Ok(msg) => msg,
            Err(err) => {
                error!("Error: {}", err);
                info!("[{}] Reconnecting WebSocket due to error.", &self.exchange);
                socket = match self.connect().await {
                    Ok(socket) => socket,
                    Err(error) => {
                        bail!("error: {}", error)
                    }
                };
                continue;
            }
        };
    }
    socket.0.close(None)?;
    Ok(())
}
Since socket is a tuple we have to access the 0th element to call read_message() on it. This will read a message from the stream. We use a match expression here because if the socket connection has disconnected for whatever reason we could get an error by trying to read from the stream.
If an error occurred we assume the connection stopped. In that case, we attempt to reconnect to the WebSocket server in the Err arm. We then put a continue; there to continue the loop and attempt to read from the stream again.
After the loop we make sure to close the WebSocket connection in case the message receiving loop is stopped.
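The read-then-reconnect control flow can be simulated without a real connection. In the sketch below, FakeSocket is a hypothetical stand-in that replays a scripted list of read results, and read_with_reconnect is an illustrative function, not part of the project. Unlike the real loop, it stops when the script runs out.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for a WebSocket: yields scripted read results.
struct FakeSocket {
    reads: VecDeque<Result<String, String>>,
}

impl FakeSocket {
    /// Returns the next scripted result, or None when the script is finished.
    fn read_message(&mut self) -> Option<Result<String, String>> {
        self.reads.pop_front()
    }
}

/// Reads until the script runs out, swapping in a fresh socket after an error.
/// Returns the received messages and how many reconnects happened.
fn read_with_reconnect(
    mut socket: FakeSocket,
    mut reconnect: impl FnMut() -> FakeSocket,
) -> (Vec<String>, u32) {
    let mut received = Vec::new();
    let mut reconnects = 0;
    while let Some(result) = socket.read_message() {
        match result {
            Ok(msg) => received.push(msg),
            Err(_) => {
                // Assume the connection is gone: replace it and keep looping.
                socket = reconnect();
                reconnects += 1;
                continue;
            }
        }
    }
    (received, reconnects)
}

fn main() {
    let first = FakeSocket {
        reads: VecDeque::from(vec![Ok("a".to_string()), Err("reset".to_string())]),
    };
    let (messages, reconnects) = read_with_reconnect(first, || FakeSocket {
        reads: VecDeque::from(vec![Ok("b".to_string())]),
    });
    println!("messages: {:?}, reconnects: {}", messages, reconnects);
}
```

The key point is that the failed read produces no message, so the loop skips straight to the next read on the replacement socket.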
Determine KuCoin Futures WebSocket message types
There are a number of types of messages we can receive. What we are interested in are messages of type Message::Text, basically string data. When we receive a message of type Text we will call a handle_msg function. We have not written the handle_msg function yet, but will do so later.
Other types are, for example, Message::Pong(_) and Message::Ping(_). We do not have to take action on these. In the case of a ping message, KuCoin wants to check if we are still alive. The tungstenite crate automatically sends a pong message back, so we don’t have to.
Finally, we can get a Message::Close(_) type of message. In such a case we simply log the event and then have our reconnect mechanism attempt to establish a new connection.
Let’s add this logic now to the “while loop” code block:
match message {
    Message::Text(msg) => {
        if let Err(e) = self.handle_msg(&msg, &mut socket.0).await {
            error!("Error on handling stream message: {}", e);
            continue;
        }
    }
    // We can ignore these messages because tungstenite takes care of them for us.
    Message::Ping(_) | Message::Pong(_) | Message::Binary(_) => (),
    Message::Close(e) => {
        error!("Disconnected {:?}", e);
        continue;
    }
}
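One way to make the outcome for each message type explicit is to map the message to an action first and let the loop act on that. The sketch below is a design alternative, not how the tutorial code is structured. Msg is a hypothetical stand-in that mirrors only the tungstenite Message variants matched above, and Action and classify are made-up names.

```rust
/// Hypothetical stand-in mirroring the `Message` variants matched above.
#[derive(Debug)]
enum Msg {
    Text(String),
    Binary(Vec<u8>),
    Ping(Vec<u8>),
    Pong(Vec<u8>),
    Close(Option<String>),
}

/// What the event loop should do next for a given message.
#[derive(Debug, PartialEq)]
enum Action {
    Handle(String),
    Ignore,
    Reconnect,
}

/// Maps each message type to the action described in the text.
fn classify(msg: Msg) -> Action {
    match msg {
        // Text messages carry the JSON payload we want to process.
        Msg::Text(text) => Action::Handle(text),
        // Nothing to do for these in this tutorial.
        Msg::Ping(_) | Msg::Pong(_) | Msg::Binary(_) => Action::Ignore,
        // The server closed the connection, so a new one is needed.
        Msg::Close(_) => Action::Reconnect,
    }
}
```

With an explicit Reconnect action the loop could reconnect right away instead of waiting for the next read to fail.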
Next, we will write our message handler logic.
Handling incoming messages
To handle the incoming messages we will recognize (at least) two types of message events:
- Generic message event: these are general messages that the server sends to us, for example as a response to one of our messages, or a “welcome message” when we first establish a connection.
- Book ticker event: real-time price data for cryptocurrency futures. These are the only ones we will subscribe to in this tutorial.
We will also have an Unknown type as a placeholder for “the rest”.
Let’s add an enum at the top of futures_websocket.rs now to distinguish these types of messages:
#[derive(Deserialize, Debug)]
#[serde(untagged)]
enum FuturesEvents {
    BookTickerEvent(models::SymbolTicker),
    GenericMessageEvent(models::GenericMessage),
    Unknown,
}
Now we can add the handle_msg function, below event_loop(). It is a long function so we will go through each part after showing the full code here:
async fn handle_msg(
    &mut self,
    msg: &str,
    socket: &mut WebSocket<MaybeTlsStream<TcpStream>>,
) -> Result<()> {
    let mut value: serde_json::Value = serde_json::from_str(msg)?;

    loop {
        value = match value.get("data") {
            Some(data) => serde_json::from_str(&data.to_string())?,
            None => break,
        };
    }

    if let Ok(events) = serde_json::from_value::<FuturesEvents>(value) {
        match events {
            FuturesEvents::BookTickerEvent(v) => {
                return self.handle_symbol_ticker_event(v).await;
            }
            FuturesEvents::GenericMessageEvent(v) => match v.msg_type.as_str() {
                "welcome" => {
                    info!("Welcome message received, Subscribing to bookticker...");
                    self.subscribe_to_topics(socket).await;
                }
                _ => {
                    info!("Generic message received: {}", &v.msg_type);
                }
            },
            _ => {
                info!(
                    "Generic event conversion not yet implemented for: FuturesEvents::{:?}",
                    events
                );
                return Ok(());
            }
        };
    } else {
        error!("Unknown message {}", msg);
    }
    Ok(())
}
handle_msg: signature and deserialization
First, the function signature and deserializing to a serde_json::Value:
async fn handle_msg(
    &mut self,
    msg: &str,
    socket: &mut WebSocket<MaybeTlsStream<TcpStream>>,
) -> Result<()> {
    let mut value: serde_json::Value = serde_json::from_str(msg)?;

    loop {
        value = match value.get("data") {
            Some(data) => serde_json::from_str(&data.to_string())?,
            None => break,
        };
    }
As parameters, we take a string slice msg, which is the message data from the WebSocket. We also take a mutable reference to the WebSocket connection object, because we want to be able to send messages over the connection, for example to subscribe to channels.
Next, we attempt to deserialize the data to a generic serde_json::Value so that we can get the "data" attribute if it is present. Often with KuCoin the data we are interested in will be wrapped in a message that has some other attributes and this data attribute.
We loop until there is no data attribute and then continue with that object. We could do this with a recursive call to handle_msg, however recursive async functions are tricky at the moment.
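The unwrapping loop can be illustrated without any JSON library. In the sketch below, Node is a hypothetical and heavily simplified stand-in for serde_json::Value, and unwrap_data and object are made-up helper names. It follows nested "data" attributes in a plain loop, so no recursion is needed.

```rust
use std::collections::HashMap;

/// Hypothetical, heavily simplified stand-in for `serde_json::Value`.
#[derive(Debug, PartialEq)]
enum Node {
    Text(String),
    Object(HashMap<String, Node>),
}

/// Follows nested "data" attributes until the current node has none left.
fn unwrap_data(mut node: Node) -> Node {
    loop {
        // Take the inner "data" value out of the current object, if there is one.
        let inner = match &mut node {
            Node::Object(map) => map.remove("data"),
            Node::Text(_) => None,
        };
        match inner {
            Some(next) => node = next,
            None => return node,
        }
    }
}

/// Small helper to build an object node from key/value pairs.
fn object(pairs: Vec<(&str, Node)>) -> Node {
    Node::Object(pairs.into_iter().map(|(k, v)| (k.to_string(), v)).collect())
}
```

Because the inner value is moved out of the map instead of being serialized and parsed again, this variant also avoids the string round trip that the tutorial code performs.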
handle_msg: handling event types
Let’s look at how we handle the different event types:
    if let Ok(events) = serde_json::from_value::<FuturesEvents>(value) {
        match events {
            FuturesEvents::BookTickerEvent(v) => {
                return self.handle_symbol_ticker_event(v).await;
            }
            FuturesEvents::GenericMessageEvent(v) => match v.msg_type.as_str() {
                "welcome" => {
                    info!("Welcome message received, Subscribing to bookticker...");
                    self.subscribe_to_topics(socket).await;
                }
                _ => {
                    info!("Generic message received: {}", &v.msg_type);
                }
            },
            _ => {
                info!(
                    "Generic event conversion not yet implemented for: FuturesEvents::{:?}",
                    events
                );
                return Ok(());
            }
        };
    } else {
        error!("Unknown message {}", msg);
    }
First, we try to convert the serde_json::Value to a FuturesEvents enum value with serde_json::from_value. If successful we pass this to the match expression that determines how to deal with each event type. If the conversion fails, we log the raw message as an unknown message.
If the event is of type BookTickerEvent we simply pass the data on to a function we will write later: handle_symbol_ticker_event.
If the event is of type GenericMessageEvent we check the msg_type attribute to see if it is a welcome message. If it is a welcome message this means we have just successfully connected to the WebSocket server. This is a good time to subscribe to topics (channels). So we call our subscribe_to_topics function here, passing the socket object.
In any other case, we simply log a generic text showing some information about the message we received.
Subscribing to WebSocket channels
In this section, we will write the logic for subscribing to channels. To subscribe to a channel we have to send a message through the WebSocket connection specifying the relevant details. In this case, we want to subscribe to the Symbol Ticker channel. According to the documentation the subscribe message should look like this:
{
    "id": 1545910660740,
    "type": "subscribe",
    "topic": "/contractMarket/tickerV2:XBTUSDM",
    "response": true
}
We already wrote a struct to represent this data earlier in models.rs:
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct SubscribeMessage {
    pub id: u64,
    #[serde(rename = "type")]
    pub msg_type: String,
    pub topic: String,
    pub private_channel: bool,
    pub response: bool,
}
So we are all set. We only have to construct the appropriate topic string in the form of /contractMarket/tickerV2:{symbol}.
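As a small illustration of what ends up on the wire, the sketch below builds the topic string and a subscribe message by hand. build_topic and subscribe_json are hypothetical helpers. The project itself serializes SubscribeMessage with serde, and the hand-written JSON here only assumes the field names implied by the struct and its camelCase renaming.

```rust
/// Builds the topic string, e.g. "/contractMarket/tickerV2:" + "XBTUSDM".
fn build_topic(prefix: &str, symbol: &str) -> String {
    format!("{}{}", prefix, symbol)
}

/// Hand-written JSON for illustration only.
/// Assumes `topic` contains no characters that need JSON escaping.
fn subscribe_json(id: u64, topic: &str) -> String {
    format!(
        r#"{{"id":{},"type":"subscribe","topic":"{}","privateChannel":false,"response":true}}"#,
        id, topic
    )
}
```

For example, subscribe_json(1, &build_topic("/contractMarket/tickerV2:", "XBTUSDM")) yields a compact JSON object with the id, type, topic, privateChannel, and response fields.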
Writing the subscribe_to_topics function
Let’s write the subscribe to topics function itself now:
async fn subscribe_to_topics(&self, socket: &mut WebSocket<MaybeTlsStream<TcpStream>>) {
    let symbols: Vec<String> = match &self.subscribe_symbols {
        SubscribeSymbols::All => self.client.get_available_symbols().await.unwrap(),
        SubscribeSymbols::Custom(symbols) => symbols.clone(),
    };

    let mut topics_clone = self.topics.clone();
    let mut current_topic = topics_clone.pop();
    let mut unique_id: u64 = 0;
    while let Some(topic) = current_topic.clone() {
        for symbol in &symbols {
            if socket.can_write() {
                let subscribe_topic = format!("{}{}", topic, symbol);
                info!(
                    "[{}] Subscribing to topic with symbol: {}",
                    &self.exchange, &subscribe_topic
                );
                unique_id += 1;
                let msg = models::SubscribeMessage {
                    id: unique_id,
                    msg_type: "subscribe".to_string(),
                    topic: subscribe_topic,
                    private_channel: false,
                    response: false,
                };
                let json = serde_json::to_string(&msg).unwrap();
                let message = Message::Text(json);

                match socket.write_message(message) {
                    Ok(_) => {
                        std::thread::sleep(std::time::Duration::from_millis(100));
                        continue;
                    }
                    Err(e) => {
                        error!("Error occurred for symbol: {}", symbol);
                        error!("Error: {}", e);
                        continue;
                    }
                }
            } else {
                error!("Cannot write to socket.");
            }
        }
        current_topic = topics_clone.pop();
    }
}
First, we get the list of symbols we should use to subscribe to the channels. If it was configured to be SubscribeSymbols::All we use the client.get_available_symbols() function to get all available symbols from the REST API client. Otherwise, we use the specified custom list.
We create a clone of the topics list so we have a list we can consume in our loop using pop().
For each topic in the list of topics, we run through the list of symbols: the while let loop walks the topics and the inner for loop walks the symbols. Before each write we check socket.can_write(), because we have to deal with a situation where we can’t write to the WebSocket. In that case we log an error.
Inside the loop we construct the appropriate message (SubscribeMessage). The value for the id field can be anything, so we use a simple counter, unique_id, that we increment for every message. The channel is public so we set private_channel to false.
We serialize it to a String with serde_json::to_string and then turn it into a Message::Text enum value.
Then we send the message by calling socket.write_message. If this action succeeds and we get an Ok response, we wait 100 milliseconds before moving on to the next symbol. This is a simple mechanism to not exceed the maximum number of messages we can send in a short period of time. Otherwise, we might get disconnected.
Finally, at the end of the while let loop we pop the next topic off the list so we can move on to it, if there is any. In this tutorial, we just have one topic: /contractMarket/tickerV2:{symbol}.
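The topic and symbol combinations can also be produced with iterators instead of cloning the list and calling pop(). The following sketch uses a hypothetical subscription_topics function. Note that pop() walks the topics from last to first, while this version keeps list order, which makes no difference with a single topic.

```rust
/// Builds every topic/symbol combination in list order.
fn subscription_topics(topics: &[String], symbols: &[String]) -> Vec<String> {
    topics
        .iter()
        .flat_map(|topic| {
            // Pair the current topic prefix with every symbol.
            symbols
                .iter()
                .map(move |symbol| format!("{}{}", topic, symbol))
        })
        .collect()
}
```

The resulting list could then be sent one entry at a time, with the same pause between writes as described above.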
Handling symbol ticker messages
In this section, we will write the function for handling the symbol ticker messages. We are just going to output the message data to the terminal/commandline.
Let’s add the following code below our handle_msg function:
async fn handle_symbol_ticker_event(&self, data: models::SymbolTicker) -> Result<()> {
    info!("[{}] {:?}", self.exchange, data);
    // add your own logic here
    Ok(())
}
As mentioned, there is not much to this function. We simply take in a models::SymbolTicker object and log it to the logger. Then we return an Ok(()) result.
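As an idea for the “add your own logic here” part, the sketch below derives a mid price and a spread from the best bid and ask. TickerPrices is a hypothetical subset of models::SymbolTicker. Its field names follow the SymbolTicker log output shown in the final section of this tutorial, where prices arrive as strings. mid_and_spread is a made-up helper name.

```rust
/// Hypothetical subset of `models::SymbolTicker`, with prices as strings.
struct TickerPrices {
    best_bid_price: String,
    best_ask_price: String,
}

/// Returns (mid price, spread), or None if either price fails to parse.
fn mid_and_spread(ticker: &TickerPrices) -> Option<(f64, f64)> {
    let bid: f64 = ticker.best_bid_price.parse().ok()?;
    let ask: f64 = ticker.best_ask_price.parse().ok()?;
    Some(((bid + ask) / 2.0, ask - bid))
}
```

Floating point values are fine for display or rough signals. For exact price arithmetic, a decimal type would be the safer choice.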
Running the completed “KuCoin Futures API with Rust” project
With this final piece completed, we can call the run() function from a FuturesWebSockets object and our program will connect to the KuCoin Futures WebSocket server and start receiving data.
Let’s update main.rs to configure and start the WebSocket connection and processing:
use futures_websocket::{FuturesWebSockets, SubscribeSymbols};
use log::info;
use log4rs;

mod futures_rest_client;
mod futures_websocket;
mod models;
mod rest_client;
mod websocket;

#[tokio::main]
async fn main() {
    log4rs::init_file("logconfig.yml", Default::default()).expect("Log config file not found.");
    info!("We now have nice logging!");

    let mut kucoin_futures = FuturesWebSockets::new(
        vec!["/contractMarket/tickerV2:".to_string()],
        SubscribeSymbols::All,
    );
    kucoin_futures.run().await.unwrap();
}
When we run the program now we should see output like this:
2022-03-12 21:32:21 - INFO: We now have nice logging!
2022-03-12 21:32:21 - INFO: Establishing connection...
2022-03-12 21:32:21 - DEBUG: Retrieving channels information from kucoin
2022-03-12 21:32:21 - DEBUG: starting new connection: https://api-futures.kucoin.com/
2022-03-12 21:32:22 - DEBUG: response '200 OK' for https://api-futures.kucoin.com/api/v1/bullet-public
2022-03-12 21:32:22 - DEBUG: Loop through 1 server(s)
2022-03-12 21:32:22 - INFO: [kucoin-futures] connecting to wss://ws-api.kucoin.com/endpoint?token=2neAiuYvAU61ZDXANAGAsiL4-iAExhsBXZxftpOeh_55i3Ysy2q2LEsEWU64mdzUOPusi34M_wGoSf7iNyEWJ13U2qwvLH7icgwf791yIZ84B1q6Ubz4LNiYB9J6i9GjsxUuhPw3BlrzazF6ghq4L3qtdiMSJwTc6nKmLIVneCM=.LslWAO7jiWhHUgLNlp8Itw==&[connectId=_crypto-connector] (try 0)
2022-03-12 21:32:22 - DEBUG: Trying to contact wss://ws-api.kucoin.com/endpoint?token=2neAiuYvAU61ZDXANAGAsiL4-iAExhsBXZxftpOeh_55i3Ysy2q2LEsEWU64mdzUOPusi34M_wGoSf7iNyEWJ13U2qwvLH7icgwf791yIZ84B1q6Ubz4LNiYB9J6i9GjsxUuhPw3BlrzazF6ghq4L3qtdiMSJwTc6nKmLIVneCM=.LslWAO7jiWhHUgLNlp8Itw==&[connectId=_crypto-connector] at [2600:9000:2104:6200:2:bc38:d480:93a1]:443...
2022-03-12 21:32:23 - DEBUG: Client handshake done.
2022-03-12 21:32:23 - INFO: Connected!
2022-03-12 21:32:23 - INFO: Start event loop...
2022-03-12 21:32:23 - INFO: Welcome message received, Subscribing to bookticker...
2022-03-12 21:32:23 - DEBUG: response '200 OK' for https://api-futures.kucoin.com/api/v1/contracts/active
2022-03-12 21:32:24 - DEBUG: Found 101 available symbols.
2022-03-12 21:32:24 - INFO: [kucoin-futures] Subscribing to topic with symbol: /contractMarket/tickerV2:XBTUSDTM
2022-03-12 21:32:24 - INFO: [kucoin-futures] Subscribing to topic with symbol: /contractMarket/tickerV2:XBTUSDM
2022-03-12 21:32:24 - INFO: [kucoin-futures] Subscribing to topic with symbol: /contractMarket/tickerV2:ETHUSDTM
2022-03-12 21:32:24 - INFO: [kucoin-futures] Subscribing to topic with symbol: /contractMarket/tickerV2:BCHUSDTM
2022-03-12 21:32:24 - INFO: [kucoin-futures] Subscribing to topic with symbol: /contractMarket/tickerV2:BSVUSDTM
2022-03-12 21:32:24 - INFO: [kucoin-futures] Subscribing to topic with symbol: /contractMarket/tickerV2:LINKUSDTM
2022-03-12 21:32:24 - INFO: [kucoin-futures] Subscribing to topic with symbol: /contractMarket/tickerV2:UNIUSDTM
....
2022-03-12 21:33:19 - INFO: [kucoin-futures] SymbolTicker { sequence: 1638492847965, symbol: "DENTUSDTM", best_bid_price: "0.002339", best_bid_size: 15661, best_ask_price: "0.002341", best_ask_size: 900, ts: 1647120794690064061 }
2022-03-12 21:33:19 - INFO: [kucoin-futures] SymbolTicker { sequence: 1638497113893, symbol: "LINKUSDTM", best_bid_price: "13.321", best_bid_size: 3792, best_ask_price: "13.323", best_ask_size: 5554, ts: 1647120794695992027 }
2022-03-12 21:33:19 - INFO: [kucoin-futures] SymbolTicker { sequence: 1643122241068, symbol: "ETHUSDTM", best_bid_price: "2590.2", best_bid_size: 8169, best_ask_price: "2590.25", best_ask_size: 4440, ts: 1647120794694203043 }
2022-03-12 21:33:19 - INFO: [kucoin-futures] SymbolTicker { sequence: 1638493063154, symbol: "BCHUSDTM", best_bid_price: "290.7", best_bid_size: 2583, best_ask_price: "290.85", best_ask_size: 4267, ts: 1647120794697201559 }
2022-03-12 21:33:19 - INFO: [kucoin-futures] SymbolTicker { sequence: 1638494728461, symbol: "XRPUSDTM", best_bid_price: "0.7919", best_bid_size: 6413, best_ask_price: "0.792", best_ask_size: 3802, ts: 1647120794701234872 }
2022-03-12 21:33:19 - INFO: [kucoin-futures] SymbolTicker { sequence: 1638492861551, symbol: "TRXUSDTM", best_bid_price: "0.05971", best_bid_size: 201, best_ask_price: "0.05972", best_ask_size: 414, ts: 1647120794700317169 }
2022-03-12 21:33:19 - INFO: [kucoin-futures] SymbolTicker { sequence: 1638493570747, symbol: "ZECUSDTM", best_bid_price: "153.12", best_bid_size: 3052, best_ask_price: "153.16", best_ask_size: 4889, ts: 1647120794706209324 }
Conclusion
We have created a nice foundation for an app that can make use of real-time cryptocurrency futures price data from KuCoin. Over the course of the tutorial, we have learned how to get information from the REST API and how to connect to the KuCoin Futures WebSocket server. Finally, we learned how to subscribe to topics or channels to receive data messages.
The full project can be found on my GitHub: here.
Please follow me on Twitter (@tmdev82) to get notified about new Rust programming and cryptocurrency-related articles.