How to make the backend configurable: crypto arbitrage part 4

We are going to learn how to make our Rust backend configurable using a YAML file. This is part 4 in a series where we build a dashboard displaying crypto triangle arbitrage information. Specifically, we are going to make configurable what streams we subscribe to for coin market data and what triangles we calculate profit for.

This article’s completed code base can be found on my GitHub under the 0.3.0 tag.

Prerequisites

Since this article will explain how to add a YAML configuration to the backend developed in the previous articles, it is helpful to read those articles first.

The code that we will build on can be found on my GitHub.

I also have an article about reading data from a YAML file: How to read and write YAML in Rust with Serde.

Updating dependencies

To be able to deserialize the configuration YAML file, we need to add serde_yaml to our dependencies. The following shows the updated Cargo.toml file:

[package]
name = "rust-triangle-arbitrage-dashboard-tutorial"
version = "0.3.0"
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version= "1", features = ["full"] }
tokio-stream = "0.1.6"
warp = "0.3"
serde = { version = "1.0", features = ["derive"]}
serde_json = "1.0"
futures = { version = "0.3", default-features=false}
uuid = { version = "0.8.2", features = ["serde", "v4"]}
tungstenite = { version="0.14.0", features = ["rustls-tls"]}
url = "2.1.0"
log = "0.4"
log4rs = { version="1", features = ["background_rotation"]}
serde_yaml = "0.8"

Crypto triangle arbitrage configuration structs

There are several things we want to be able to configure:

  • Which data streams the program will subscribe to.
  • How many results to retrieve for a stream.
  • At what interval the data stream updates should be received.
  • What triangles should be processed for calculating profits. For example: BTC-ETH-BNB.
  • What coin pairs the triangles should consist of. We could write logic that figures this out, but to keep it simple we will supply that information in the configuration file as well.
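As an aside, here is a hypothetical sketch of what that pair-derivation logic could look like, using only the standard library. The `derive_pairs` function and the `known_pairs` set are illustrative assumptions, not part of the tutorial's code base; in practice the set of known symbols would come from the exchange:

```rust
use std::collections::HashSet;

// Hypothetical sketch of deriving a triangle's pair symbols from its
// parts; this is the logic the article skips in favor of configuration.
// `known_pairs` is assumed to hold the exchange's available symbols.
fn derive_pairs(parts: [&str; 3], known_pairs: &HashSet<String>) -> Option<[String; 3]> {
    // The three edges of the triangle, in the same order as the config:
    // (eth, btc) -> ethbtc, (bnb, eth) -> bnbeth, (bnb, btc) -> bnbbtc.
    let edges = [
        (parts[1], parts[0]),
        (parts[2], parts[1]),
        (parts[2], parts[0]),
    ];
    let mut pairs = Vec::with_capacity(3);
    for (a, b) in edges {
        // A market can be listed in either direction; try both.
        let ab = format!("{}{}", a, b);
        let ba = format!("{}{}", b, a);
        if known_pairs.contains(&ab) {
            pairs.push(ab);
        } else if known_pairs.contains(&ba) {
            pairs.push(ba);
        } else {
            return None; // no market exists for this edge
        }
    }
    pairs.try_into().ok()
}

fn main() {
    let known: HashSet<String> = ["ethbtc", "bnbeth", "bnbbtc"]
        .iter()
        .map(|s| s.to_string())
        .collect();
    println!("{:?}", derive_pairs(["btc", "eth", "bnb"], &known));
    // Some(["ethbtc", "bnbeth", "bnbbtc"])
}
```

Supplying the pairs in the config file avoids this lookup entirely, at the cost of a little duplication.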

Let’s create a new file in the src dir called config.rs. In this file we will write our config structs:

use serde::Deserialize;

#[derive(Debug, Deserialize)]
pub struct TriangleConfig {
    pub parts: [String; 3],
    pub pairs: [String; 3],
}

#[derive(Debug, Deserialize)]
pub struct AppConfig {
    pub update_interval: u32,
    pub results_limit: u32,
    pub depth_streams: Vec<String>,
    pub triangles: Vec<TriangleConfig>,
}

The TriangleConfig struct contains an array representing the parts of the triangle, for example: [btc, eth, bnb], and an array representing the pairs that make up the triangle. In the case of the previous example: [ethbtc, bnbeth, bnbbtc].

The second struct, AppConfig, represents the main config structure. Its fields are explained as follows:

  • update_interval: interval in milliseconds, for receiving data updates.
  • results_limit: how many results to receive.
  • depth_streams: which depth streams to subscribe to. For example: ethbtc, bnbbtc, bnbeth.
  • triangles: A list of TriangleConfig objects as described earlier.

Add a config file

Let’s add a configuration file called config.yaml to the root directory of our project. Here we will configure all the parameters we discussed in the previous section. Of course, you can configure things to your liking, but here is an example:

update_interval: 100
results_limit: 5
depth_streams:
  - ethbtc
  - bnbeth
  - bnbbtc
  - sushibtc
  - sushibnb
  - solbtc
  - solbnb
  - xrpbtc
  - xrpeth
  - xrpbnb
  - adabnb
  - adabtc
  - adaeth
triangles:
  - parts: [btc, eth, bnb]
    pairs: [ethbtc, bnbeth, bnbbtc]
  - parts: [btc, sol, bnb]
    pairs: [solbtc, solbnb, bnbbtc]
  - parts: [btc, xrp, eth]
    pairs: [xrpbtc, xrpeth, ethbtc]
  - parts: [btc, sushi, bnb]
    pairs: [sushibtc, sushibnb, bnbbtc]
  - parts: [btc, ada, bnb]
    pairs: [adabtc, adabnb, bnbbtc]
  - parts: [btc, eth, ada]
    pairs: [ethbtc, adaeth, adabtc]

Update backend code to use config YAML

Now that we have our config struct defined and a config file created, we need to make use of the config. To do so, we will dynamically build the stream subscription URL and use the config when processing data.

The main areas we are going to update:

  • The function get_binance_streams_url in main.rs, to dynamically build the URL.
  • The loop in main_worker() in the workers.rs file, where process_triangle_data is called.

Main.rs updates

First, let’s open main.rs and update the code there:

mod config;
mod handlers;
mod models;
mod workers;
mod ws;

Make the config module available.

fn get_binance_streams_url(
    depth_streams: &Vec<String>,
    update_interval: u32,
    results_limit: u32,
) -> Url {
    let mut depth_streams_parts: Vec<String> = vec![];
    for stream in depth_streams {
        depth_streams_parts.push(format!(
            "{}@depth{}@{}ms",
            stream, results_limit, update_interval
        ));
    }

    let depth_streams_joined = depth_streams_parts.join("/");
    let binance_url = format!("{}/stream?streams={}", BINANCE_WS_API, depth_streams_joined);

    Url::parse(&binance_url).unwrap()
}

Almost the entire get_binance_streams_url function is different:

  1. New parameters to configure streams, update interval, and results limit.
  2. The streams query parameters for the URL are built and pushed into a Vec<String> using a for loop.
  3. We then join the Vec<String> together into a single String.
  4. Finally, all the parts are concatenated as before using format!().
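The steps above can be reproduced as a stand-alone sketch using only the standard library, returning a String instead of a parsed Url. The value of BINANCE_WS_API below is an assumption for illustration; the real constant is defined elsewhere in main.rs:

```rust
// Stand-alone sketch of the URL construction in get_binance_streams_url.
// The BINANCE_WS_API value below is an assumption for illustration.
const BINANCE_WS_API: &str = "wss://stream.binance.com:9443";

fn build_streams_url(depth_streams: &[String], update_interval: u32, results_limit: u32) -> String {
    // Build one "<pair>@depth<limit>@<interval>ms" segment per stream.
    let parts: Vec<String> = depth_streams
        .iter()
        .map(|stream| format!("{}@depth{}@{}ms", stream, results_limit, update_interval))
        .collect();
    // Join the segments with "/" and append them as the streams query parameter.
    format!("{}/stream?streams={}", BINANCE_WS_API, parts.join("/"))
}

fn main() {
    let streams = vec!["ethbtc".to_string(), "bnbbtc".to_string()];
    println!("{}", build_streams_url(&streams, 100, 5));
    // wss://stream.binance.com:9443/stream?streams=ethbtc@depth5@100ms/bnbbtc@depth5@100ms
}
```

With the example config from earlier (update_interval: 100, results_limit: 5), each stream entry becomes a segment like ethbtc@depth5@100ms.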

In main(), we read the config file at the top:

async fn main() {
    log4rs::init_file("log_config.yaml", Default::default()).unwrap();
    let f = std::fs::File::open("config.yaml").expect("Could not open file.");
    let app_config: config::AppConfig = serde_yaml::from_reader(f).expect("Could not read values.");

We use the AppConfig information here:

    info!("Connecting to binance stream...");
    let binance_url = get_binance_streams_url(
        &app_config.depth_streams,
        app_config.update_interval,
        app_config.results_limit,
    );

    info!("Subscribing to binance: {}", binance_url);
    let (socket, response) = tungstenite::connect(binance_url).expect("Can't connect.");
    info!("Connected to binance stream.");

and here:

    info!("Starting update loop");
    tokio::task::spawn(async move {
        workers::main_worker(clients.clone(), app_config, socket).await;
    });
    info!("Starting server");

Workers.rs updates

We have to update the function signature for main_worker and how it is determined which triangles are processed:

use crate::{
    config::AppConfig,
    models::{self, DepthStreamWrapper},
    Clients,
};
pub async fn main_worker(clients: Clients, config: AppConfig, mut socket: WebSocket<AutoStream>) {
    let mut pairs_data: HashMap<String, DepthStreamWrapper> = HashMap::new();
    loop {
         // tokio::time::sleep(Duration::from_millis(100)).await;

We added a new parameter to the function: config: AppConfig.

NOTE: the sleep timer is now disabled. It caused an issue where the backend could get disconnected because it didn’t receive messages fast enough, which in turn caused it to miss “connection check” messages.

Now for processing the incoming data, we replace the single call to process_triangle_data with a for loop. This for loop runs through all the triangles in the AppConfig object:

        let pair_key = parsed.stream.split_once("@").unwrap().0;
        pairs_data.insert(pair_key.to_string(), parsed);

        for triangle_config in config.triangles.iter() {
            process_triangle_data(
                &pairs_data,
                &triangle_config.pairs[0],
                &triangle_config.pairs[1],
                &triangle_config.pairs[2],
                [
                    &triangle_config.parts[0],
                    &triangle_config.parts[1],
                    &triangle_config.parts[2],
                ],
                clients.clone(),
            )
            .await;
        }

We have completed all the changes that are required to make use of the configuration we set up at the start of the article.

Conclusion

We have learned how to make our crypto triangle arbitrage dashboard backend configurable. By implementing a YAML configuration file and deserialization of that file, we can now easily scale up the number of streams we subscribe to and the number of pair combinations we process.

The updated code base can be found under the 0.3.0 tag on my GitHub.

Follow me on Twitter if you want to see updates on this series as they come in.
