KuCoin API with Rust: how to get symbol ticker data

In this tutorial, we are going to look at how to get cryptocurrency symbol ticker data from the KuCoin WebSocket Futures API with Rust. We will be using the public WebSocket API and we will connect to it using Rust and the Tungstenite crate.

The Symbol ticker data stream sends real-time price data for cryptocurrency futures symbols. At the end of this tutorial we will have a program that can:

  • connect to the KuCoin cryptocurrency futures WebSocket API.
  • retrieve all available cryptocurrency futures symbols from the REST API.
  • subscribe to streams for all the cryptocurrency futures symbols.
  • process incoming WebSocket messages about cryptocurrency futures price data.

This project will be a good basis for cryptocurrency real-time price visualization or a trading bot for example.

The repository with the full project code can be found on my GitHub: here.

The KuCoin futures API documentation for the symbol ticker data can be found here.

I have an article specifically about using WebSockets with Rust: Rust Warp WebSocket server: learn how to now.

I also have a similar article for the Binance API: Easily connect to Binance WebSocket streams with Rust.

Prerequisites

Some Rust programming knowledge and some experience working with WebSockets are required to follow this tutorial.

KuCoin Futures API with Rust project architecture

Before we get started with writing code let’s first briefly look at the structure and components of the project we are going to build.

Program flow

As mentioned in the introduction our program will be able to connect to the KuCoin cryptocurrency futures API and receive futures price data. We are going to subscribe to the symbol ticker streams specifically. These streams give live updates on prices for any symbol channel we subscribe to. In this tutorial, we will not do anything with the incoming data, except for printing it to the terminal.

According to the KuCoin API documentation, these are the steps we have to take to connect to the WebSocket API and start receiving cryptocurrency futures price data:

  1. Get list of servers and connection token from REST API endpoint: https://api-futures.kucoin.com/api/v1/bullet-public.
  2. Use server URL(s) and token to connect to the Futures WebSocket API.
  3. If successful receive a welcome message.
  4. Subscribe to desired channels. In this case: /contractMarket/tickerV2:{symbol}
  5. We can retrieve the symbols we can subscribe to from the Futures REST API: https://api-futures.kucoin.com/api/v1/contracts/active.
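
Steps 4 and 5 combine into one topic string per symbol. A minimal sketch of that step, using only the standard library, could look like the following. The helper names ticker_topic and ticker_topics are hypothetical and not part of the project code, and XBTUSDTM is only used as an example symbol.

```rust
/// Topic prefix for the symbol ticker stream, taken from step 4 above.
const TICKER_TOPIC_PREFIX: &str = "/contractMarket/tickerV2";

/// Build the topic string for a single futures symbol.
/// (Hypothetical helper, for illustration only.)
fn ticker_topic(symbol: &str) -> String {
    format!("{}:{}", TICKER_TOPIC_PREFIX, symbol)
}

/// Build one topic string per symbol, keeping the input order.
fn ticker_topics(symbols: &[&str]) -> Vec<String> {
    symbols.iter().map(|symbol| ticker_topic(symbol)).collect()
}
```

Calling ticker_topic("XBTUSDTM") yields /contractMarket/tickerV2:XBTUSDTM, which is the value that would go into the topic field of a subscribe message.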

This flow looks like this:

KuCoin Futures API with Rust connection flow diagram.

Project components

Next, let’s get a general idea of the components/modules we will write for our “KuCoin Futures API connect with Rust” project. In broad terms we need the following components:

  • HTTP Client: we have to retrieve information from the REST API.
  • KuCoin Futures REST API Client: this builds on the HTTP client and has knowledge about the KuCoin Futures REST API endpoints.
  • KuCoin Futures WebSocket Client: can connect to the KuCoin Futures WebSocket server, receive and process incoming messages and data.
  • Data structure models: for deserializing incoming data to make it easier to work with.

KuCoin Futures API with Rust project setup

We start as always by creating a project using cargo: cargo new rust-kucoin-futures-symbol-ticker.

Dependencies

Let’s look at the dependencies we are going to add to the project’s Cargo.toml:

  • anyhow: Flexible concrete Error type built on std::error::Error.
  • log: A lightweight logging facade for Rust.
  • log4rs: A highly configurable multi-output logging implementation for the `log` facade.
  • reqwest: higher level HTTP client library.
  • serde: A generic serialization/deserialization framework.
  • serde_json: A JSON serialization file format.
  • tokio: An event-driven, non-blocking I/O platform for writing asynchronous I/O backed applications.
  • tungstenite: Lightweight stream-based WebSocket implementation.
  • url: URL library for Rust, based on the WHATWG URL Standard.

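Some of these are not strictly required to complete the project: log, log4rs, and anyhow, for example. However, these crates make development a lot more comfortable. Note that a few crates need non-default features for the code in this tutorial to compile: serde needs derive, reqwest needs json, and tokio needs its macros and a runtime feature (or simply full). Recent versions of tungstenite also need a TLS feature such as native-tls to connect to wss:// URLs.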

Logging

Speaking of log4rs, let’s get the configuration of the logging out of the way while we are at it. Let’s add a file called logconfig.yml to the root directory of our project:

appenders:
  stdout_logger:
    kind: console
    encoder:
      pattern: "{h({d(%Y-%m-%d %H:%M:%S)(utc)} - {l}: {m}{n})}"
root:
  level: debug
  appenders:
    - stdout_logger

This will configure log output to stdout showing a timestamp, the log level, and the log message. For more information on how log4rs can be used please check out my tutorial on it: Basic how to log to a file in Rust with log4rs.

Then in our main.rs file we have to load the configuration file:

use log::{info};
use log4rs;

fn main() {
    log4rs::init_file("logconfig.yml", Default::default()).expect("Log config file not found.");
    info!("We now have nice logging!");
}

The log crate allows us to use macros like the one we see here with info. There is also error, debug, and trace.

Generic REST API Client

In this section, we are going to build the REST API Client that we will use to get information to be able to connect to the WebSocket server. Other than that this client will also retrieve information about futures symbols so that we can subscribe to the real-time price data channels.

For now, this client needs to be able to send HTTP GET and POST requests. Because we are aiming to build a foundation for future work let’s make it generic.

Basic Client

Let’s create a file called rest_client.rs:

pub struct Client {
    host: String,
    inner_client: reqwest::Client,
}

impl Client {
    pub fn new(host: String) -> Self {
        Client {
            host,
            inner_client: reqwest::Client::builder()
                .pool_idle_timeout(None)
                .build()
                .unwrap(),
        }
    }
}

Here we have the basic setup for our generic REST API client. A struct with fields for the host URL (host) and for an instance of the reqwest Client object. Because we want to make use of connection pooling we store an instance of the Client object to reuse for each request.

The impl Client block implements a new function for our Client struct.

Next, we will add several functions for sending requests and handling the responses.

Bringing items into scope

Let’s make our lives easier by bringing a bunch of items into scope that we will be using:

use anyhow::{bail, Result};
use reqwest::header::{HeaderMap, HeaderValue, CONTENT_TYPE, USER_AGENT};
use reqwest::{Response, StatusCode};
use serde::de::DeserializeOwned;
use serde::Deserialize;
use std::fmt;

First, anyhow for error types. Then the reqwest related items we will talk more about later. Finally, we add serde::de::DeserializeOwned which we will use in return values to signify we are returning owned data from our functions.

Response handler function

When we send requests to the KuCoin REST API we expect to get a response in the form of some data. However, there are a number of generic responses we can receive that are failures. For example, the BAD REQUEST response or SERVICE UNAVAILABLE and the like.

To deal with a number of these responses let’s write a handler function. Our response handler function will be called by functions sending requests to handle the message that comes back from the REST API server.

Let’s add the handler function to the impl Client code block now:

    async fn handler<T: DeserializeOwned>(&self, response: Response) -> Result<T> {
        match response.status() {
            StatusCode::OK => Ok(response.json::<T>().await?),
            StatusCode::INTERNAL_SERVER_ERROR => {
                bail!("Internal Server Error");
            }
            StatusCode::SERVICE_UNAVAILABLE => {
                bail!("Service Unavailable");
            }
            StatusCode::UNAUTHORIZED => {
                bail!("Unauthorized");
            }
            StatusCode::BAD_REQUEST => {
                let error: ContentError = response.json().await?;
                bail!(error)
            }
            s => {
                bail!(format!("Received response: {:?}", s));
            }
        }
    }

Here we take in a Response object and pattern match it with match, mainly checking what StatusCode we received.

If we receive an Ok then we deserialize the response JSON and return it.

We check some specific error codes and return error messages using bail! which is a macro that helps with returning errors.

For bad request responses, in the StatusCode::BAD_REQUEST arm, we deserialize the response JSON into a ContentError data structure we have not written yet. So let’s do that now. Let’s add it to rest_client.rs, somewhere below the impl Client code block:

#[derive(Debug, Deserialize)]
pub struct ContentError {
    pub code: i16,
    pub msg: String,
}

impl fmt::Display for ContentError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "code: {} \nmsg: {}", self.code, self.msg) // user-facing output
    }
}

We implement a Display formatter for good measure. This simply displays the error code and a message string.
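
To see what this formatter produces, here is a small standard-library-only sketch. It repeats the struct without the Deserialize derive so that it compiles without serde, and the describe helper is a hypothetical name added for illustration.

```rust
use std::fmt;

/// Std-only copy of the struct above (the serde derive is left out).
#[derive(Debug)]
struct ContentError {
    code: i16,
    msg: String,
}

impl fmt::Display for ContentError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Same format string as in the tutorial code.
        write!(f, "code: {} \nmsg: {}", self.code, self.msg)
    }
}

/// Render an error through its Display implementation.
/// (Hypothetical helper, for illustration only.)
fn describe(error: &ContentError) -> String {
    error.to_string()
}
```

For a ContentError with code 400 and msg "Bad Request", describe returns the code on the first line and the message on the second. This is the text that ends up in the error created by bail!(error).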

If we go back to the match expression, at the end we have a generic catch-all arm:

            s => {
                bail!(format!("Received response: {:?}", s));
            }

This will catch any other status code and simply put the code in an error message.

GET function

Now let’s add the function for making GET requests. A GET request sends no data; it only requests it. However, it can send request or query parameters. For example, these parameters can be used to filter the requested data.

Add this code to the impl Client code block:

    pub async fn get<T: DeserializeOwned>(
        &self,
        endpoint: &str,
        request: Option<String>,
    ) -> Result<T> {
        let mut url: String = format!("{}{}", self.host, endpoint);
        if let Some(request) = request {
            if !request.is_empty() {
                url.push_str(format!("?{}", request).as_str());
            }
        }

        let client = &self.inner_client;
        let response = client.get(url.as_str()).send().await?;

        self.handler(response).await
    }

This is a generic function that takes an endpoint string slice (&str) and optionally a string containing request parameters. We return an object of type T that implements DeserializeOwned. In short, this means we will be returning deserialized data structs that are owned, not borrowed.

First we construct the target URL from the given endpoint and our internal host address. The request parameters are appended if there are any.

Then we get a reference to the client instance and call the get function followed by send to send the request. Since this is an asynchronous request we have to add an await here.

When we get a response we pass it off to our handler function and return the result from it.
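
The URL construction in get can be looked at in isolation. The following is a minimal sketch of the same logic using only the standard library. The function name build_url is hypothetical, and it takes Option<&str> instead of Option<String> purely to keep the example short.

```rust
/// Join host, endpoint, and an optional query string in the same way the
/// `get` function does. (Hypothetical helper, for illustration only.)
fn build_url(host: &str, endpoint: &str, request: Option<&str>) -> String {
    let mut url = format!("{}{}", host, endpoint);
    // Only append "?..." when a non-empty query string was supplied.
    if let Some(query) = request.filter(|q| !q.is_empty()) {
        url.push('?');
        url.push_str(query);
    }
    url
}
```

With the host https://api-futures.kucoin.com/api/v1, the endpoint /contracts/active, and no request parameters, this produces https://api-futures.kucoin.com/api/v1/contracts/active. An empty parameter string is treated the same as None, so no trailing ? is added.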

POST function

For the POST requests, we should construct a header containing our user agent name.

Let’s add a function called build_headers to the impl Client code block under the get function:

    fn build_headers(&self) -> Result<HeaderMap> {
        let mut custom_headers = HeaderMap::new();

        custom_headers.insert(USER_AGENT, HeaderValue::from_static("crypto-connector"));

        Ok(custom_headers)
    }

The user agent can be any name. This function is not much right now. However, in the future, we can expand it to add an api-key header for example. Unfortunately adding an api-key header for signed requests is outside the scope of this tutorial.

Let’s move on to write the function for POST requests. Add this code to the impl Client code block:

    pub async fn post<T: DeserializeOwned>(&self, endpoint: &str) -> Result<T> {
        let url: String = format!("{}{}", self.host, endpoint);

        let client = &self.inner_client;
        let response = client
            .post(url.as_str())
            .headers(self.build_headers()?)
            .send()
            .await?;

        self.handler(response).await
    }

This function is almost identical to our get function, except that there is no parameter for request parameters. We are calling post instead of get, of course, and we have added a call to headers().

Note: we are not actually sending data with this request. Typically we would send data with a POST request. However, there are cases where it is not necessary to send data. In fact, the KuCoin Futures REST API has such a case.

Trying out our REST Client

In this section, we will quickly try out our new REST Client to see if we can get data from the KuCoin REST API with our Rust code.

Let’s open the main.rs file and add the following code:

use log::{info};
use log4rs;

mod rest_client;

#[tokio::main]
async fn main() {
    log4rs::init_file("logconfig.yml", Default::default()).expect("Log config file not found.");
    info!("We now have nice logging!");

    let client = rest_client::Client::new("https://api-futures.kucoin.com/api/v1".to_string());
    let result: serde_json::Value = client.post("/bullet-public").await.unwrap();

    info!("KuCoin bullet-public result: {:?}", result);
}

Here we create an instance of our REST Client and pass the KuCoin futures REST API host address as a parameter. Then on the next line, we call our post function with the endpoint for retrieving the WebSocket endpoint and connection token. See the KuCoin API documentation for more information.

We deserialize the data to a generic serde_json::Value for now and print it with info!.

If we run the program the result should look like this:

    Finished dev [unoptimized + debuginfo] target(s) in 25.40s
     Running `target/debug/rust-kucoin-futures-websocket`
2022-03-09 21:23:35 - INFO: We now have nice logging!
2022-03-09 21:23:35 - DEBUG: starting new connection: https://api-futures.kucoin.com/
2022-03-09 21:23:35 - DEBUG: response '200 OK' for https://api-futures.kucoin.com/api/v1/bullet-public
2022-03-09 21:23:35 - INFO: KuCoin bullet-public result: Object({"code": String("200000"), "data": Object({"instanceServers": Array([Object({"encrypt": Bool(true), "endpoint": String("wss://ws-api.kucoin.com/endpoint"), "pingInterval": Number(18000), "pingTimeout": Number(10000), "protocol": String("websocket")})]), "token": String("2neAiuYvAU61ZDXANAGAsiL4-iAExhsBXZxftpOeh_55i3Ysy2q2LEsEWU64mdzUOPusi34M_wGoSf7iNyEWJxvqRwQb3of_3eE5sq9oXsYDL0EWbh-iz9iYB9J6i9GjsxUuhPw3BlrzazF6ghq4L4nXitq7pjaun3DzuujPaag=.aRaeWiVwqo8mLyml8lNM9w==")})})

We also have to start writing some data structures to process the data we will receive.

REST API Data models

Before we start writing the Futures REST API Client we are going to write the structs that help us deserialize the data. We can see what the data looks like in the KuCoin Futures API documentation and base our data models on that.

So let’s add a file called models.rs for our data models.

Public Channels data

First the data structs for getting the server data and connection-token. In the documentation, this is called “the public channels”. The JSON data looks like this:

{
    "code": "200000",
    "data": {
        "instanceServers": [
            {
                "pingInterval": 50000,
                "endpoint": "wss://push.kucoin.com/endpoint",
                "protocol": "websocket",
                "encrypt": true,
                "pingTimeout": 10000
            }
        ],
        "token": "vYNlCtbz4XNJ1QncwWilJnBtmmfe4geLQDUA62kKJsDChc6I4bRDQc73JfIrlFaVYIAE0Gv2--MROnLAgjVsWkcDq_MuG7qV7EktfCEIphiqnlfpQn4Ybg==.IoORVxR2LmKV7_maOR9xOg=="
    }
  }

This data can be split up into 3 separate structs:

use serde::Deserialize;

#[derive(Debug, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Channels {
    pub code: String,
    pub data: ChannelsData,
}

#[derive(Debug, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct ChannelsData {
    pub instance_servers: Vec<InstanceServer>,
    pub token: String,
}

#[derive(Debug, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct InstanceServer {
    pub ping_interval: i64,
    pub endpoint: String,
    pub protocol: String,
    pub encrypt: bool,
    pub ping_timeout: i64,
}

Most of it is pretty self-explanatory. Because the KuCoin API writes names in camel case and Rust prefers snake case, we annotate the structs with #[serde(rename_all = "camelCase")]. Doing this will rename the fields automatically.

Open contracts data

Next, we also want to retrieve all available symbols for cryptocurrency futures contracts. That is because we want to subscribe to WebSocket channels for all symbol ticker data (real-time price data). Therefore, let’s also add data structs for the open contracts list. The data is quite long but we can have Rust only deserialize the fields we are interested in.

Add the models for open contracts:

#[derive(Debug, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct OpenContractData {
    pub symbol: String,
    pub root_symbol: String,
    #[serde(rename = "type")]
    pub contract_type: String,
    pub base_currency: String,
    pub quote_currency: String,
    pub settle_currency: String,
    pub max_order_qty: f64,
    pub max_price: f64,
    pub lot_size: f64,
    pub tick_size: f64,
}

#[derive(Debug, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct OpenContracts {
    pub code: Option<String>,
    pub data: Vec<OpenContractData>,
}

For the scope of this project we only really need the symbol field. However, if we wanted to execute trades we would need to know things like valid price step increments (tick_size), order size step increments (lot_size), what currency we can buy this with (quote_currency), etc.

KuCoin Futures REST API Client

In this section, we will build a client that has knowledge of the KuCoin business logic/endpoints. This will wrap the generic client and make getting information from the KuCoin Futures REST API simpler in other parts of the code.

Let’s add a new file called futures_rest_client.rs to our project. And let’s start with a basic struct and new() function implementation:

use crate::rest_client::Client;
use anyhow::{bail, Result};
use log::{debug, error};

pub struct FuturesRESTClient {
    client: Client,
}

impl FuturesRESTClient {
    pub fn new(hostname: &str) -> FuturesRESTClient {
        FuturesRESTClient {
            client: Client::new(hostname.to_string()),
        }
    }
}

This struct simply has the generic REST API Client as a field. What it adds on top of this is business logic in the functions. Let’s write those now.

Get public channels function

First, we bring the models we want to use into scope:

use crate::{rest_client::Client, models::{Channels, OpenContracts}};
use anyhow::{bail, Result};
use log::{debug, error};

pub struct FuturesRESTClient {
    client: Client,
}

Then let’s add the function get_public_channels to the impl FuturesRESTClient code block. It will simply call the post() function on the client with the appropriate endpoint parameter:

    pub async fn get_public_channels(&self) -> Result<Channels> {
        let result = self.client.post("/bullet-public").await;

        match result {
            Ok(channels) => Ok(channels),
            Err(e) => bail!(format!("Error retrieving channels: {:?}", e)),
        }
    }

As mentioned the /bullet-public endpoint returns data containing a server URL and a connection-token. This is represented by our Channels struct. Our request should be sent as a POST request.

Get open contracts and available symbols

Next, we need functions for retrieving the Futures contracts symbols data as these are used to subscribe to WebSocket streams.

Let’s add the following code to the impl FuturesRESTClient code block:

    pub async fn get_open_contracts(&self) -> Result<OpenContracts> {
        let result = self.client.get("/contracts/active", None).await;

        match result {
            Ok(open_contracts) => Ok(open_contracts),
            Err(e) => bail!(format!("Error retrieving contracts: {:?}", e)),
        }
    }

    pub async fn get_available_symbols(&self) -> Result<Vec<String>> {
        let contracts = match self.get_open_contracts().await {
            Ok(contracts) => contracts,
            Err(e) => {
                bail!(e);
            }
        };

        let symbols_list: Vec<String> = contracts.data.iter().map(|f| f.symbol.clone()).collect();
        debug!("Found {} available symbols.", symbols_list.len());

        Ok(symbols_list)
    }

Our get_open_contracts function for getting the OpenContracts data follows the same pattern as the get_public_channels function. Only this time we are using get to make the request.

Finally, we add a helper function get_available_symbols for extracting a list of symbol names as a Vec<String> from the OpenContracts data. This will make the code that needs this information more readable.

Updating the example in main

Now we can update the example in main.rs to be much cleaner:

use log::{info};
use log4rs;

mod rest_client;
mod futures_rest_client;
mod models;

#[tokio::main]
async fn main() {
    log4rs::init_file("logconfig.yml", Default::default()).expect("Log config file not found.");
    info!("We now have nice logging!");

    let client = futures_rest_client::FuturesRESTClient::new("https://api-futures.kucoin.com/api/v1");
    let result = client.get_public_channels().await.unwrap();

    info!("KuCoin bullet-public result: {:?}", result);
}

Now we are getting somewhere with our “KuCoin Futures API connector with Rust project”. If we run the program the output looks like this:

2022-03-10 21:49:57 - INFO: We now have nice logging!
2022-03-10 21:49:57 - DEBUG: starting new connection: https://api-futures.kucoin.com/
2022-03-10 21:49:57 - DEBUG: response '200 OK' for https://api-futures.kucoin.com/api/v1/bullet-public
2022-03-10 21:49:57 - INFO: KuCoin bullet-public result: Channels { code: "200000", data: ChannelsData { instance_servers: [InstanceServer { ping_interval: 18000, endpoint: "wss://ws-api.kucoin.com/endpoint", protocol: "websocket", encrypt: true, ping_timeout: 10000 }], token: "2neAiuYvAU61ZDXANAGAsiL4-iAExhsBXZxftpOeh_55i3Ysy2q2LEsEWU64mdzUOPusi34M_wGoSf7iNyEWJwChJVhr1aQcVr0g_mKH9Hyj7Z-j4YamgtiYB9J6i9GjsxUuhPw3BlrzazF6ghq4L3tcDDWbtK5Qif44T6DY8K8=.gpZKf7rAfzZtE4y3scBd8Q==" } }

Next, we are going to build the WebSocket connection logic.

WebSocket connector implementation

In this section, we are going to write some logic for connecting to a WebSocket server given a list of URLs. Typically we will only have one URL that can be used for a connection, but there are cases where there can be multiple different URLs.

Let’s add a file called websocket.rs to put in our generic WebSocket connection logic:

use anyhow::{bail, Result};
use log::{error, info};
use std::net::TcpStream;
use tungstenite::handshake::client::Response;
use tungstenite::protocol::WebSocket;
use tungstenite::stream::MaybeTlsStream;
use url::Url;

pub fn connect_wss(
    exchange: &str,
    websocket_urls: Vec<String>,
) -> Result<(WebSocket<MaybeTlsStream<TcpStream>>, Response)> {
    let max_retry = 5;
    for i in 0..max_retry {
        for wss in &websocket_urls {
            info!("[{}] connecting to {} (try {})", exchange, wss, i);
            let url = Url::parse(wss)?;
        
            match tungstenite::connect(url) {
                Ok(answer) => {
                    return Ok(answer);
                }
                Err(e) => error!("Error during handshake {}", e),
            }
        }
    }

    bail!("Max connection retry reached");
}

We have just one function here that uses tungstenite to connect to a URL and return a WebSocket<MaybeTlsStream<TcpStream>>. This object represents a local-to-remote stream over TCP that might be protected with TLS. In our case, it will be a secure TLS connection.

Inside the function we have two loops: an outer retry loop that runs a maximum number of times in case connecting fails at first, and an inner loop that cycles through all the available WebSocket URLs.

We attempt to connect to the URL with tungstenite::connect and return from the function if successful.
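
The nested retry pattern can be tried out without a network connection. In the sketch below the try_connect closure is a hypothetical stand-in for tungstenite::connect, and connect_with_retry is an illustrative name. It returns an Option instead of an anyhow Result so that it only depends on the standard library.

```rust
use std::fmt::Display;

/// Try every URL in order and repeat the whole list up to `max_retry` times.
/// `try_connect` stands in for `tungstenite::connect` (an assumption made so
/// the loop can run without a real server).
fn connect_with_retry<T, E, F>(urls: &[&str], max_retry: usize, mut try_connect: F) -> Option<T>
where
    E: Display,
    F: FnMut(&str) -> Result<T, E>,
{
    for attempt in 0..max_retry {
        for &url in urls {
            match try_connect(url) {
                // The first successful connection ends both loops.
                Ok(connection) => return Some(connection),
                Err(e) => eprintln!("try {} on {} failed: {}", attempt, url, e),
            }
        }
    }
    // Every URL failed on every attempt.
    None
}
```

With two URLs and a closure that fails three times before succeeding, the fourth call (the second URL on the second pass) returns the connection. Note that, as in connect_wss, there is no delay between attempts. A pause between passes would be a reasonable addition for a long-running program.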

Building the KuCoin Futures WebSocket Client

In this section, we will write logic code that brings all the pieces we have built so far together.

As mentioned in an earlier section, this logic will:

  • Retrieve the WebSocket server URL(s) and connection token
  • Open a WebSocket connection
  • Retrieve available cryptocurrency futures contract symbols
  • Subscribe to all the related streams for the symbol ticker price data
  • Run an infinite loop to process incoming messages from the symbol ticker price data

WebSocket related data models

Because we will be receiving and sending data we also need new data structs to represent that data. The following data structs should be added:

  • GenericMessage: a simple message type for system messages and the like.
  • SubscribeMessage: this is a message we send to the server to indicate what stream topic we want to subscribe to.
  • SymbolTicker: represents the real-time price data for a cryptocurrency future.

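Let’s open the models.rs file and add the code. Because two of these structs also derive Serialize, the import at the top of models.rs has to change from use serde::Deserialize; to use serde::{Deserialize, Serialize};: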

#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct GenericMessage {
    pub id: String,
    #[serde(rename = "type")]
    pub msg_type: String,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct SubscribeMessage {
    pub id: u64,
    #[serde(rename = "type")]
    pub msg_type: String,
    pub topic: String,
    pub private_channel: bool,
    pub response: bool,
}

#[derive(Debug, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct SymbolTicker {
    pub sequence: u64,
    pub symbol: String,
    pub best_bid_price: String,
    pub best_bid_size: u64,
    pub best_ask_price: String,
    pub best_ask_size: u64,
    pub ts: u64,
}

We will see how to use these data structures when we use them in the KuCoin Futures WebSocket Client.
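
Note that SymbolTicker stores best_bid_price and best_ask_price as String values, so code that wants to do arithmetic on them has to parse them first. A minimal sketch of that step, assuming the strings contain plain decimal numbers, is shown below. The spread helper is hypothetical and not part of the project code.

```rust
/// Parse the string prices of a ticker update and return the bid/ask spread.
/// Returns `None` if either price is not a valid number.
/// (Hypothetical helper, for illustration only.)
fn spread(best_bid_price: &str, best_ask_price: &str) -> Option<f64> {
    let bid: f64 = best_bid_price.parse().ok()?;
    let ask: f64 = best_ask_price.parse().ok()?;
    Some(ask - bid)
}
```

For real trading logic a decimal type would likely be a safer choice than f64, but for printing or plotting prices this is usually sufficient.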

Foundation for KuCoin Futures WebSocket Client

This is the last step in writing our “KuCoin Futures API with Rust” project. First, we will look at the basic FuturesWebSockets struct and the new() function implementation.

We’ll create a new file called futures_websocket.rs:

use crate::futures_rest_client::FuturesRESTClient;
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(untagged)]
pub enum SubscribeSymbols {
    All,
    Custom(Vec<String>),
}

pub struct FuturesWebSockets {
    exchange: String,
    topics: Vec<String>,              
    subscribe_symbols: SubscribeSymbols,
    client: FuturesRESTClient,
}

impl FuturesWebSockets {
    pub fn new(topics: Vec<String>, subscribe_symbols: SubscribeSymbols) -> FuturesWebSockets {
        FuturesWebSockets {
            exchange: "kucoin-futures".to_string(),
            topics,
            subscribe_symbols,
            client: FuturesRESTClient::new("https://api-futures.kucoin.com/api/v1"),
        }
    }
}

The SubscribeSymbols enum is used for configuring the symbols that should be subscribed to when the program runs. We have the option to configure all or a specific set of symbols.

Then, let’s go through the fields on the struct and what they mean:

  • exchange: the name of the exchange we are connecting to. This can then be used in logging to distinguish activity from various exchanges if we have different ones to connect to. However, this tutorial only deals with one, KuCoin.
  • topics: which topics to subscribe to after connecting to the WebSocket. Making this a Vec<String> leaves the door open for multiple different streams. In the case of this tutorial, the scope is only for symbol ticker.
  • subscribe_symbols: configure specific symbol names to use when subscribing to topics, or use all available symbols.
  • client: this is an instance of the REST API client we wrote in an earlier section.
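
One possible way to turn the subscribe_symbols configuration into a concrete list of symbols is sketched below. It repeats the enum without the serde derives so it compiles with the standard library alone. The resolve_symbols function is hypothetical, and dropping unknown symbols in the Custom case is an assumption of this sketch, not something the tutorial prescribes.

```rust
/// Std-only copy of the enum above (serde derives left out).
#[derive(Debug, Clone)]
enum SubscribeSymbols {
    All,
    Custom(Vec<String>),
}

/// Decide which symbols to subscribe to, given the symbols the exchange lists.
/// (Hypothetical helper, for illustration only.)
fn resolve_symbols(config: &SubscribeSymbols, available: &[String]) -> Vec<String> {
    match config {
        // Subscribe to everything the REST API returned.
        SubscribeSymbols::All => available.to_vec(),
        // Keep only the requested symbols that are actually available.
        SubscribeSymbols::Custom(wanted) => wanted
            .iter()
            .filter(|symbol| available.contains(*symbol))
            .cloned()
            .collect(),
    }
}
```

Here available would come from get_available_symbols on the REST client. Filtering the custom list avoids sending subscribe messages for symbols that do not exist.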

Run and eventloop outline

Next, we will write a run() function that should be called to set things in motion. This function will call the event loop.

Add the following code to the impl FuturesWebSockets code block below the new() function:

    pub async fn run(&mut self) -> Result<()> {
        let keep_running = AtomicBool::new(true);

        if let Err(e) = self.event_loop(&keep_running).await {
            error!("Error: {}", e);
        }
        info!("[{}] Loop stopped running.", &self.exchange);

        Ok(())
    }

    async fn event_loop(&mut self, running: &AtomicBool) -> Result<()> {
        info!("Start event loop...");

        while running.load(Ordering::Relaxed) {
        }
       
        Ok(())
    }
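
In the outline above nothing clears the keep_running flag yet, so the loop would spin forever. The following standard-library-only sketch shows how such an AtomicBool-guarded loop stops once the flag is set to false. The function run_until_cleared and its limit parameter are hypothetical and only exist to make the behavior observable.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Run a loop guarded by an AtomicBool and clear the flag after `limit`
/// iterations. Returns the number of iterations that ran.
/// (Hypothetical helper, for illustration only.)
fn run_until_cleared(limit: u32) -> u32 {
    let keep_running = AtomicBool::new(true);
    let mut iterations = 0;

    while keep_running.load(Ordering::Relaxed) {
        iterations += 1;
        if iterations >= limit {
            // In a real program this store could happen elsewhere,
            // for example in a shutdown handler on another thread.
            keep_running.store(false, Ordering::Relaxed);
        }
    }

    iterations
}
```

Because the flag is only checked at the top of the loop, the current iteration always finishes before the loop exits.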

Establishing the connection

In this section, we will add a function that retrieves connection information from the REST API and then uses it to establish a WebSocket connection.

First, we should bring some new items into scope at the top of futures_websocket.rs:

use crate::{futures_rest_client::FuturesRESTClient, models, websocket};
use anyhow::{bail, Result};
use log::{debug, error, info};
use serde::{Deserialize, Serialize};
use std::net::TcpStream;
use std::sync::atomic::{AtomicBool, Ordering};
use tungstenite::handshake::client::Response;
use tungstenite::protocol::WebSocket;
use tungstenite::{stream::MaybeTlsStream, Message};

Next, let’s add a function called connect for this to the impl FuturesWebSockets code:

    async fn connect(&mut self) -> Result<(WebSocket<MaybeTlsStream<TcpStream>>, Response)> {
        let channels: models::Channels = {
            debug!("Retrieving channels information from kucoin");
            let result = self.client.get_public_channels().await;

            match result {
                Ok(channels) => channels,
                Err(e) => bail!("Error occurred: {}", e),
            }
        };

        debug!(
            "Loop through {} server(s)",
            channels.data.instance_servers.len()
        );

        let mut websocket_urls = Vec::new();
        for server in channels.data.instance_servers {
            let connect_id = "_crypto-connector";
            websocket_urls.push(format!(
                "{}?token={}&connectId={}",
                &server.endpoint, &channels.data.token, connect_id
            ));
        }

        if let Ok(con) = websocket::connect_wss(&self.exchange, websocket_urls) {
            return Ok(con);
        }

        bail!("Unable to connect.");
    }

First, we retrieve the channels information and connection token using the FuturesRESTClient object.

After having successfully retrieved the server information, we move on to creating a list of connection URL(s) by looping through the server data. We append the connection parameters token and connectId to the server URL. The connectId can be anything we want. These pieces of information are required to establish a connection.

Finally, we call our connect_wss function from the websocket module.
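
The URL building step can be sketched on its own as follows. This assumes that the square brackets around connectId in the documentation's URL template are notation and are not meant to be sent literally, so the sketch leaves them out. The helper name websocket_urls_for is hypothetical.

```rust
/// Build one connection URL per server endpoint by appending the token and
/// a client-chosen connect id as query parameters.
/// (Hypothetical helper; assumes the brackets in the documented URL template
/// are not part of the actual query string.)
fn websocket_urls_for(endpoints: &[&str], token: &str, connect_id: &str) -> Vec<String> {
    endpoints
        .iter()
        .map(|endpoint| format!("{}?token={}&connectId={}", endpoint, token, connect_id))
        .collect()
}
```

For the endpoint wss://ws-api.kucoin.com/endpoint, the token abc, and the connect id _crypto-connector, this yields wss://ws-api.kucoin.com/endpoint?token=abc&connectId=_crypto-connector.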

Let’s call this function from our event_loop function:

    async fn event_loop(&mut self, running: &AtomicBool) -> Result<()> {
        info!("Establishing connection...");
        let mut socket = match self.connect().await {
            Ok(socket_ok) => socket_ok,
            Err(error) => {
                bail!("error: {}", error)
            }
        };
        info!("Connected!");
        
        info!("Start event loop...");
        while running.load(Ordering::Relaxed) {}

        Ok(())
    }

Receiving incoming WebSocket messages

We are slowly building out the event_loop function and completing its functionality. Next up we will write code that deals with receiving messages from the WebSocket connection.

Once again we will update our event_loop function:

    async fn event_loop(&mut self, running: &AtomicBool) -> Result<()> {
        info!("Establishing connection...");
        let mut socket = match self.connect().await {
            Ok(socket_ok) => socket_ok,
            Err(error) => {
                bail!("error: {}", error)
            }
        };
        info!("Connected!");

        info!("Start event loop...");
        while running.load(Ordering::Relaxed) {
            let message = match socket.0.read_message() {
                Ok(msg) => msg,
                Err(err) => {
                    error!("Error: {}", err);
                    info!("[{}] Reconnecting WebSocket due to error.", &self.exchange);
                    socket = match self.connect().await {
                        Ok(socket) => socket,
                        Err(error) => {
                            bail!("error: {}", error)
                        }
                    };
                    continue;
                }
            };
        }
        socket.0.close(None)?;
        Ok(())
    }

Since socket is a tuple we have to access the 0th element to call read_message() on it. This will read a message from the stream. We use a match expression here because if the socket connection has disconnected for whatever reason we could get an error by trying to read from the stream.

If an error occurs, we assume the connection has dropped and attempt to reconnect to the WebSocket server. We then use continue; to move to the next loop iteration and try reading from the stream again.

After the loop, we make sure to close the WebSocket connection in case the message receiving loop is stopped.
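The reconnect step above gives up as soon as a single reconnect attempt fails. One possible refinement, shown here only as a sketch, is to retry a few times with a growing pause between attempts. retry_with_backoff is a hypothetical helper that uses nothing but the standard library; the closure passed in stands for whatever establishes the connection.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Calls `op` until it succeeds or `max_attempts` attempts have failed,
/// doubling the pause after every failed attempt. `op` receives the
/// 1-based attempt number and is always called at least once.
fn retry_with_backoff<T, E, F>(max_attempts: u32, base_delay: Duration, mut op: F) -> Result<T, E>
where
    F: FnMut(u32) -> Result<T, E>,
{
    let mut delay = base_delay;
    let mut attempt = 1;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            // Out of attempts: hand the last error back to the caller.
            Err(err) if attempt >= max_attempts => return Err(err),
            Err(_) => {
                sleep(delay);
                delay = delay.saturating_mul(2);
                attempt += 1;
            }
        }
    }
}
```

Note that std::thread::sleep blocks the current thread. Inside an async function an async timer such as tokio::time::sleep would be the more fitting choice.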

Determine KuCoin Futures WebSocket message types

There are a number of types of messages we can receive. What we are interested in are messages of type Message::Text, basically string data. When we receive a message of type Text we will call a handle_msg function. We have not written handle_msg yet, but will do so later.

Other types are, for example, Message::Pong(_) and Message::Ping(_). We do not have to take action on these. In the case of a ping message, KuCoin wants to check if we are still alive. The tungstenite crate automatically sends a pong message back, so we don’t have to.

Finally, we can get a Message::Close() type of message. In such a case we simply log a message and then have our reconnect mechanism attempt to establish a new connection.

Let’s add this logic now to the “while loop” code block:

            match message {
                Message::Text(msg) => {
                    if let Err(e) = self.handle_msg(&msg, &mut socket.0).await {
                        error!("Error on handling stream message: {}", e);
                        continue;
                    }
                }
                // Nothing to do here: tungstenite answers pings for us, and we do not process binary messages.
                Message::Ping(_) | Message::Pong(_) | Message::Binary(_) => (),
                Message::Close(e) => {
                    error!("Disconnected {:?}", e);
                    continue;
                }
            }
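The decision made by this match expression can be tried out in isolation. The sketch below uses a stand-in enum named Incoming instead of the tungstenite Message type, so that it compiles without any dependencies. Incoming, Action and classify are hypothetical names that do not appear in the project.

```rust
/// Stand-in for the WebSocket message variants discussed above.
/// This is NOT the tungstenite type, only a simplified model of it.
#[derive(Debug, PartialEq)]
enum Incoming {
    Text(String),
    Binary(Vec<u8>),
    Ping,
    Pong,
    Close,
}

/// What the event loop should do with a message.
#[derive(Debug, PartialEq)]
enum Action {
    Handle(String),
    Ignore,
    Reconnect,
}

fn classify(message: Incoming) -> Action {
    match message {
        // Text carries the JSON payload we want to process.
        Incoming::Text(payload) => Action::Handle(payload),
        // Control and binary frames need no action from us.
        Incoming::Ping | Incoming::Pong | Incoming::Binary(_) => Action::Ignore,
        // A close frame means the next read will fail, so we reconnect.
        Incoming::Close => Action::Reconnect,
    }
}
```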

Next, we will write our message handler logic.

Handling incoming messages

To handle the incoming messages we will recognize (at least) two types of message events.

  1. Generic message event: these are general messages that the server sends to us. For example as a response to one of our messages, or a “welcome message” when we first establish a connection.
  2. Book ticker event: real-time price data for cryptocurrency futures. These are the only ones we will subscribe to in this tutorial.

We will also have an Unknown type as a placeholder for “the rest”.

Let’s add an enum at the top of futures_websocket.rs now to distinguish these types of messages:

#[derive(Deserialize, Debug)]
#[serde(untagged)]
enum FuturesEvents {
    BookTickerEvent(models::SymbolTicker),
    GenericMessageEvent(models::GenericMessage),
    Unknown,
}

Now we can add the handle_msg function, below event_loop(). It is a long function so we will go through each part after showing the full code here:

    async fn handle_msg(
        &mut self,
        msg: &str,
        socket: &mut WebSocket<MaybeTlsStream<TcpStream>>,
    ) -> Result<()> {
        let mut value: serde_json::Value = serde_json::from_str(msg)?;

        loop {
            value = match value.get("data") {
                Some(data) => serde_json::from_str(&data.to_string())?,
                None => break,
            };
        }

        if let Ok(events) = serde_json::from_value::<FuturesEvents>(value) {
            match events {
                FuturesEvents::BookTickerEvent(v) => {
                    return self.handle_symbol_ticker_event(v).await;
                }
                FuturesEvents::GenericMessageEvent(v) => match v.msg_type.as_str() {
                    "welcome" => {
                        info!("Welcome message received, Subscribing to bookticker...");
                        self.subscribe_to_topics(socket).await;
                    }
                    _ => {
                        info!("Generic message received: {}", &v.msg_type);
                    }
                },
                _ => {
                    info!(
                        "Generic event conversion not yet implemented for: FuturesEvents::{:?}",
                        events
                    );
                    return Ok(());
                }
            };
        } else {
            error!("Unknown message {}", msg);
        }
        Ok(())
    }

handle_msg: signature and deserialization

First, the function signature and deserializing to a serde_json::Value:

    async fn handle_msg(
        &mut self,
        msg: &str,
        socket: &mut WebSocket<MaybeTlsStream<TcpStream>>,
    ) -> Result<()> {
        let mut value: serde_json::Value = serde_json::from_str(msg)?;

        loop {
            value = match value.get("data") {
                Some(data) => serde_json::from_str(&data.to_string())?,
                None => break,
            };
        }

As parameters, we take a string slice msg, which is the message data from the WebSocket. We also take a mutable reference to the WebSocket connection object, because we want to be able to send messages over the connection, for example to subscribe to channels.

Next, we attempt to deserialize the data to a generic serde_json::Value so that we can get the "data" attribute if it is present. Often with KuCoin the data we are interested in will be wrapped in a message that has some other attributes and this data attribute.

We loop until there is no data attribute and then continue with that object. We could do this with a recursive call to handle_msg, but recursive async functions are tricky at the moment.
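The idea of peeling off data wrappers in a loop instead of recursing can be shown without serde_json. The sketch below models a JSON value with a tiny stand-in enum called Node. Node and unwrap_data are hypothetical names, and unlike the project code this version moves the inner value out instead of re-parsing it.

```rust
use std::collections::HashMap;

/// Minimal stand-in for a JSON value: either a string leaf or an object.
#[derive(Debug, Clone, PartialEq)]
enum Node {
    Leaf(String),
    Object(HashMap<String, Node>),
}

/// Repeatedly replaces `node` with its "data" field until no such field
/// exists, then returns the innermost value.
fn unwrap_data(mut node: Node) -> Node {
    loop {
        // Take the "data" field out of the current object, if there is one.
        let inner = match &mut node {
            Node::Object(fields) => fields.remove("data"),
            Node::Leaf(_) => None,
        };
        match inner {
            Some(next) => node = next,
            None => return node,
        }
    }
}
```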

handle_msg: handling event types

Let’s look at how we handle the different event types:

        if let Ok(events) = serde_json::from_value::<FuturesEvents>(value) {
            match events {
                FuturesEvents::BookTickerEvent(v) => {
                    return self.handle_symbol_ticker_event(v).await;
                }
                FuturesEvents::GenericMessageEvent(v) => match v.msg_type.as_str() {
                    "welcome" => {
                        info!("Welcome message received, Subscribing to bookticker...");
                        self.subscribe_to_topics(socket).await;
                    }
                    _ => {
                        info!("Generic message received: {}", &v.msg_type);
                    }
                },
                _ => {
                    info!(
                        "Generic event conversion not yet implemented for: FuturesEvents::{:?}",
                        events
                    );
                    return Ok(());
                }
            };
        } else {
            error!("Unknown message {}", msg);
        }

First, we try to convert the serde_json::Value to a FuturesEvents enum value using serde_json::from_value. If successful we pass this to the match expression that determines how to deal with each event type.

If the event is of type BookTickerEvent we simply pass the data on to a function we will write later: handle_symbol_ticker_event.

If the event is of type GenericMessageEvent we check the msg_type attribute to see if it is a welcome message. If it is a welcome message this means we have just successfully connected to the WebSocket server. This is a good time to subscribe to topics (channels). So we call our subscribe_to_topics function here passing the socket object.

In any other case, we simply log a generic text showing some information about the message we received.

Subscribing to WebSocket channels

In this section, we will write the logic for subscribing to channels. To subscribe to a channel we have to send a message through the WebSocket connection specifying the relevant details. In this case, we want to subscribe to the Symbol Ticker channel. According to the documentation the subscribe message should look like this:

  {
    "id": 1545910660740,                          
    "type": "subscribe",
    "topic": "/contractMarket/tickerV2:XBTUSDM",
    "response": true                              
  }

We already wrote a struct to represent this data earlier in models.rs:

#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct SubscribeMessage {
    pub id: u64,
    #[serde(rename = "type")]
    pub msg_type: String,
    pub topic: String,
    pub private_channel: bool,
    pub response: bool,
}

So we are all set. We only have to construct the appropriate topic string in the form of /contractMarket/tickerV2:{symbol}.
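Constructing the topic strings amounts to joining every topic prefix with every symbol. As a rough sketch, with build_topics being a hypothetical helper that is not part of the project:

```rust
/// Combines every topic prefix with every symbol, for example
/// "/contractMarket/tickerV2:" + "XBTUSDM".
fn build_topics(prefixes: &[&str], symbols: &[&str]) -> Vec<String> {
    let mut topics = Vec::with_capacity(prefixes.len() * symbols.len());
    for prefix in prefixes {
        for symbol in symbols {
            topics.push(format!("{}{}", prefix, symbol));
        }
    }
    topics
}
```

With a single prefix this produces one topic string per symbol, which is the situation in this tutorial.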

Writing the subscribe_to_topics function

Let’s write the subscribe to topics function itself now:

    async fn subscribe_to_topics(&self, socket: &mut WebSocket<MaybeTlsStream<TcpStream>>) {
        let symbols: Vec<String> = match &self.subscribe_symbols {
            SubscribeSymbols::All => self.client.get_available_symbols().await.unwrap(),
            SubscribeSymbols::Custom(symbols) => symbols.clone(),
        };

        let mut topics_clone = self.topics.clone();
        let mut current_topic = topics_clone.pop();
        let mut unique_id: u64 = 0;
        while let Some(topic) = current_topic.clone() {
            for symbol in &symbols {
                if socket.can_write() {
                    let subscribe_topic = format!("{}{}", topic, symbol);
                    info!(
                        "[{}] Subscribing to topic with symbol: {}",
                        &self.exchange, &subscribe_topic
                    );
                    unique_id += 1;
                    let msg = models::SubscribeMessage {
                        id: unique_id,
                        msg_type: "subscribe".to_string(),
                        topic: subscribe_topic,
                        private_channel: false,
                        response: false,
                    };
                    let json = serde_json::to_string(&msg).unwrap();
                    let message = Message::Text(json);
                    match socket.write_message(message) {
                        Ok(_) => {
                            std::thread::sleep(std::time::Duration::from_millis(100));
                            continue;
                        }
                        Err(e) => {
                            error!("Error occurred for symbol: {}", symbol);
                            error!("Error: {}", e);
                            continue;
                        }
                    }
                } else {
                    error!("Cannot write to socket.");
                }
            }
            current_topic = topics_clone.pop();
        }
    }

First, we get the list of symbols to subscribe to. If subscribe_symbols was configured as SubscribeSymbols::All, we call client.get_available_symbols() to retrieve all available symbols from the REST API client. Otherwise, we use the specified custom list.

We clone the topics list so that we have a list we can consume in our loop using pop(). Before each write we check socket.can_write(). If the socket cannot be written to, we log an error and move on to the next symbol.

For each topic in the list of topics, we run through the list of symbols using the nested while let and for loops.

Inside the loop we construct the appropriate message (SubscribeMessage). The value of the id field can be anything, so we use a counter that is incremented for every message. The channel is public, so we set private_channel to false.

We serialize the message to a String with serde_json::to_string and then wrap it in a Message::Text enum value.

Then we send the message by calling socket.write_message. If this action succeeds and we get an Ok response, we wait 100 milliseconds before moving on to the next symbol. This is a simple mechanism to not exceed the maximum number of messages we can send in a short period of time. Otherwise, we might get disconnected.

Finally, at the end of the while loop we pop the next topic off the list so we can move on to it if there is one. In this tutorial, we just have one topic: /contractMarket/tickerV2:{symbol}.
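The fixed 100 millisecond pause after each subscribe message could also be expressed as a small throttle that only sleeps for the time still remaining since the previous message. This is a sketch under that assumption, using only the standard library. Throttle is a hypothetical type and not something the project or the KuCoin API provides.

```rust
use std::thread::sleep;
use std::time::{Duration, Instant};

/// Enforces a minimum interval between consecutive calls to `wait`.
struct Throttle {
    min_interval: Duration,
    last_sent: Option<Instant>,
}

impl Throttle {
    fn new(min_interval: Duration) -> Self {
        Self { min_interval, last_sent: None }
    }

    /// Blocks until `min_interval` has passed since the previous call and
    /// returns how long it slept. The first call never sleeps.
    fn wait(&mut self) -> Duration {
        let mut slept = Duration::ZERO;
        if let Some(last) = self.last_sent {
            let elapsed = last.elapsed();
            if elapsed < self.min_interval {
                slept = self.min_interval - elapsed;
                sleep(slept);
            }
        }
        self.last_sent = Some(Instant::now());
        slept
    }
}
```

Calling wait() right before each write would keep the message rate below one per interval. As with the original code, the sleep blocks the thread, so in async code an async timer such as tokio::time::sleep may be preferable.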

Handling symbol ticker messages

In this section, we will write the function for handling the symbol ticker messages. We are just going to output the message data to the terminal.

Let’s add the following code below our handle_msg function:

    async fn handle_symbol_ticker_event(&self, data: models::SymbolTicker) -> Result<()> {
        info!("[{}] {:?}", self.exchange, data);
        // add your own logic here
        Ok(())
    }

As mentioned, there is not much to this function. We simply take in a models::SymbolTicker object and write it to the log. Then we return an Ok(()) result.
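As one example of logic that could go where the "add your own logic here" comment is, the sketch below derives a mid price and a spread from the best bid and ask prices. It assumes, based on the program output shown later, that the prices arrive as strings. mid_and_spread is a hypothetical helper and not part of the project.

```rust
/// Parses the best bid and ask price strings and returns
/// (mid price, spread), or None if either string is not a number.
fn mid_and_spread(best_bid_price: &str, best_ask_price: &str) -> Option<(f64, f64)> {
    let bid: f64 = best_bid_price.parse().ok()?;
    let ask: f64 = best_ask_price.parse().ok()?;
    Some(((bid + ask) / 2.0, ask - bid))
}
```

Floating point is used here for brevity. For anything involving real money a decimal type would be the safer choice.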

Running the completed “KuCoin Futures API with Rust” project

With this final piece completed, we can call the run() function on a FuturesWebSockets object and our program will connect to the KuCoin Futures WebSocket server and start receiving data.

Let’s update main.rs to configure and start the WebSocket connection and processing:

use futures_websocket::{FuturesWebSockets, SubscribeSymbols};
use log::info;
use log4rs;

mod futures_rest_client;
mod futures_websocket;
mod models;
mod rest_client;
mod websocket;

#[tokio::main]
async fn main() {
    log4rs::init_file("logconfig.yml", Default::default()).expect("Log config file not found.");
    info!("We now have nice logging!");

    let mut kucoin_futures = FuturesWebSockets::new(
        vec!["/contractMarket/tickerV2:".to_string()],
        SubscribeSymbols::All,
    );
    kucoin_futures.run().await.unwrap();
}

When we run the program now we should see output like this:

2022-03-12 21:32:21 - INFO: We now have nice logging!
2022-03-12 21:32:21 - INFO: Establishing connection...
2022-03-12 21:32:21 - DEBUG: Retrieving channels information from kucoin
2022-03-12 21:32:21 - DEBUG: starting new connection: https://api-futures.kucoin.com/
2022-03-12 21:32:22 - DEBUG: response '200 OK' for https://api-futures.kucoin.com/api/v1/bullet-public
2022-03-12 21:32:22 - DEBUG: Loop through 1 server(s)
2022-03-12 21:32:22 - INFO: [kucoin-futures] connecting to wss://ws-api.kucoin.com/endpoint?token=2neAiuYvAU61ZDXANAGAsiL4-iAExhsBXZxftpOeh_55i3Ysy2q2LEsEWU64mdzUOPusi34M_wGoSf7iNyEWJ13U2qwvLH7icgwf791yIZ84B1q6Ubz4LNiYB9J6i9GjsxUuhPw3BlrzazF6ghq4L3qtdiMSJwTc6nKmLIVneCM=.LslWAO7jiWhHUgLNlp8Itw==&[connectId=_crypto-connector] (try 0)
2022-03-12 21:32:22 - DEBUG: Trying to contact wss://ws-api.kucoin.com/endpoint?token=2neAiuYvAU61ZDXANAGAsiL4-iAExhsBXZxftpOeh_55i3Ysy2q2LEsEWU64mdzUOPusi34M_wGoSf7iNyEWJ13U2qwvLH7icgwf791yIZ84B1q6Ubz4LNiYB9J6i9GjsxUuhPw3BlrzazF6ghq4L3qtdiMSJwTc6nKmLIVneCM=.LslWAO7jiWhHUgLNlp8Itw==&[connectId=_crypto-connector] at [2600:9000:2104:6200:2:bc38:d480:93a1]:443...
2022-03-12 21:32:23 - DEBUG: Client handshake done.
2022-03-12 21:32:23 - INFO: Connected!
2022-03-12 21:32:23 - INFO: Start event loop...
2022-03-12 21:32:23 - INFO: Welcome message received, Subscribing to bookticker...
2022-03-12 21:32:23 - DEBUG: response '200 OK' for https://api-futures.kucoin.com/api/v1/contracts/active
2022-03-12 21:32:24 - DEBUG: Found 101 available symbols.
2022-03-12 21:32:24 - INFO: [kucoin-futures] Subscribing to topic with symbol: /contractMarket/tickerV2:XBTUSDTM
2022-03-12 21:32:24 - INFO: [kucoin-futures] Subscribing to topic with symbol: /contractMarket/tickerV2:XBTUSDM
2022-03-12 21:32:24 - INFO: [kucoin-futures] Subscribing to topic with symbol: /contractMarket/tickerV2:ETHUSDTM
2022-03-12 21:32:24 - INFO: [kucoin-futures] Subscribing to topic with symbol: /contractMarket/tickerV2:BCHUSDTM
2022-03-12 21:32:24 - INFO: [kucoin-futures] Subscribing to topic with symbol: /contractMarket/tickerV2:BSVUSDTM
2022-03-12 21:32:24 - INFO: [kucoin-futures] Subscribing to topic with symbol: /contractMarket/tickerV2:LINKUSDTM
2022-03-12 21:32:24 - INFO: [kucoin-futures] Subscribing to topic with symbol: /contractMarket/tickerV2:UNIUSDTM

....

2022-03-12 21:33:19 - INFO: [kucoin-futures] SymbolTicker { sequence: 1638492847965, symbol: "DENTUSDTM", best_bid_price: "0.002339", best_bid_size: 15661, best_ask_price: "0.002341", best_ask_size: 900, ts: 1647120794690064061 }
2022-03-12 21:33:19 - INFO: [kucoin-futures] SymbolTicker { sequence: 1638497113893, symbol: "LINKUSDTM", best_bid_price: "13.321", best_bid_size: 3792, best_ask_price: "13.323", best_ask_size: 5554, ts: 1647120794695992027 }
2022-03-12 21:33:19 - INFO: [kucoin-futures] SymbolTicker { sequence: 1643122241068, symbol: "ETHUSDTM", best_bid_price: "2590.2", best_bid_size: 8169, best_ask_price: "2590.25", best_ask_size: 4440, ts: 1647120794694203043 }
2022-03-12 21:33:19 - INFO: [kucoin-futures] SymbolTicker { sequence: 1638493063154, symbol: "BCHUSDTM", best_bid_price: "290.7", best_bid_size: 2583, best_ask_price: "290.85", best_ask_size: 4267, ts: 1647120794697201559 }
2022-03-12 21:33:19 - INFO: [kucoin-futures] SymbolTicker { sequence: 1638494728461, symbol: "XRPUSDTM", best_bid_price: "0.7919", best_bid_size: 6413, best_ask_price: "0.792", best_ask_size: 3802, ts: 1647120794701234872 }
2022-03-12 21:33:19 - INFO: [kucoin-futures] SymbolTicker { sequence: 1638492861551, symbol: "TRXUSDTM", best_bid_price: "0.05971", best_bid_size: 201, best_ask_price: "0.05972", best_ask_size: 414, ts: 1647120794700317169 }
2022-03-12 21:33:19 - INFO: [kucoin-futures] SymbolTicker { sequence: 1638493570747, symbol: "ZECUSDTM", best_bid_price: "153.12", best_bid_size: 3052, best_ask_price: "153.16", best_ask_size: 4889, ts: 1647120794706209324 }

Conclusion

We have created a nice foundation for an app that can make use of real-time cryptocurrency futures price data from KuCoin. Over the course of the tutorial, we have learned how to get information from the REST API and how to connect to the KuCoin Futures WebSocket server. Finally, we learned how to subscribe to topics or channels to receive data messages.

The full project can be found on my GitHub: here.
