When I began this project, many months ago, there were no good resources on Tokio. I procrastinated on writing this blog series, but my intention was for it to be a guide on using Tokio and implementing a real project. A reference of sorts. When I began populating this blog in March, there still weren’t any resources. That is no longer the case. I recently discovered that Tokio added a much better tutorial in June.

This project/tutorial series was my way of learning Rust and Tokio, and I’ve gained a lot out of it. However, the mini-redis tutorial that I linked above seems to cover everything that I would. By the end of this section, we’ll have a client-server architecture that is easily extensible to be a compliant MQTT broker, but I won’t continue the tutorial to create a full-blown MQTT server. If people are interested, I can continue the series, but I don’t see the need for it anymore. So let’s continue from where we left off:

Splitting up our code

Currently, we have two files of Rust code and just a few large functions. Let’s split our code making use of an architecture that will make it easy to manage state, which will be stored in our server/broker.

Seperation of Client and State

Let’s create a broker.rs with a Broker struct and give it a method to start the server.

pub struct Broker;

impl Broker {
  pub async fn start_server(addr: IpAddr, port: u16) -> Result<(), Box<dyn Error>> {
    // ...
  }
}

Inside of this method, we’ll want to do what we’re doing right now, spawn a thread for every new connection, but we’ll do it in another Client struct. Eventually, we’ll establish communication between our Broker and Client.

// src/broker.rs; start_server
let address = SocketAddr::new(addr, port);
let mut listener = TcpListener::bind(address).await?;
loop {
  let (stream, addr) = listener.accept().await.unwrap();
  println!("New connection: {}", stream.peer_addr().unwrap());
  tokio::spawn(async move {
      Client::handle_client(stream, addr).await;
  });
}
Ok(())

and in client.rs

pub struct Client;

impl Client {
  async fn handle_messages(
    framed: &mut Framed<TcpStream, MQTTCodec>,
    packet: Packet
  ) {
    match packet {
      Packet::Pingreq => {
        println!("Ping");
        framed.send(Packet::Pingresp).await;
      }
      _ => {
        // now what?
      }
    }
  }

  pub async fn handle_client(
    stream: TcpStream,
    addr: SocketAddr
  ) {
    let mut framed = Framed::new(stream, MQTTCodec::new());

    // do connection handshake
    let connect = match framed.next().await {
      Some(Ok(Packet::Connect(packet))) => {
        framed.send(Packet::Connack( Connack{
          session_present: false,
          code: ConnectReturnCode::Accepted,
        })).await;
        packet
      }
      _ => {
        println!("Did not receive connect packet");
        return;
      }
    };

    loop {
      match framed.next().await {
        Some(Ok(packet)) => {
          match packet {
            Packet::Disconnect => {
              break;
            },
            _ => {
              Client::handle_messages(&mut framed, packet).await;
            }

          }
        },
        _ => {
          continue;
        }
      }
    }
  }
}

A little long, but it’s all boilerplate that we’ve done already. We can remove all of our code in main.rs and instead call our Broker’s start_server function, and you’ll see that nothing has changed. Let’s add some spice now.

Channels

Ideally, we want to store information between every single client. We need someplace to store the subscriptions of every client, and some way for clients to send messages to other clients. Now, the straightforward way would be to have some global mutable struct, probably called BrokerState, that stores the information for every client and sockets for communication. You’ll quickly realize that doing that sucks. It is awful. Every step of the way, you will be hit with safety errors and deadlocks and it’s just going to be painful.

A smarter way is to implement the architecture that we defined earlier. We’ll pass messages between the client and broker, and the broker and client can decide what to do with each message. This way, there is no interaction between clients and the only interaction between client and broker is through the channels. The nice thing about this architecture is that it mimics the MQTT architecture, so we can just reuse the MQTT Packets as our messages.

We want the broker to have one channel that it gets all of its messages from, and a list of channels to talk to the clients with. Tokio, with the sync feature, allows us to use tokio::sync::mpsc::channel to create a “multi-producer, single-consumer” channel. This will let all the clients speak to the Broker, with Broker only needing to listen to one “thing”. It’s a good idea to create our wrapper over the Packet struct to use as a message:

// src/broker.rs
pub enum BrokerMessage {
  NewConnection {
    client: Client,
    tx_client: Sender<Packet>,
  },
  Message {
    packet: Packet,
    id: String,
  },
}

When the client finds a Connect packet, it’ll create a channel for itself and send that to the broker, and then whenever the client receives a message from the socket, it’ll forward that to the broker. What do we do if the broker wants to send something to the socket? How can the client thread listen to both the socket and the broker at the same time? tokio::select!

Implementing the client

So, our new client loop is going to look something like this:

// src/client.rs;
pub async fn handle_client(
  stream: TcpStream,
  addr: SocketAddr,
) {
  // do connection handshake as before
  // ...
  let (tx_client, mut rx_client): (Sender<Packet>, Receiver<Packet>) = mpsc::channel(100); // 100 is just a magic number
  // TODO: Send this to the broker

  loop {
    tokio::select! {
      // we got a message from the socket
      Some(Ok(packet)) = framed.next() => {
        // handle_message will forward the packet to the broker
        match packet {
          Packet::Disconnect => {
            Client::handle_messages(&mut framed, client_key.clone(), packet).await;
            break;
          },
          _ => {
            Client::handle_messages(&mut framed, client_key.clone(), packet).await;
          }
        }
      },
      // we got a message from the Broker
      Some(packet) = rx_client.next() => {
          // send it to the socket
          framed.send(packet).await.unwrap();
      },
      else => break
    }
  }
}

The tokio::select! macro is really elegant, allowing you to match over various futures and run code on them at the same time. Notice that we are not doing framed.next().await, since the macro expects Futures for it to await. If both Futures run and don’t match our pattern, the else block is executed.

While we’re at it, let’s extend our Client struct to hold some more useful information that we could use later if we desired:

pub struct Client {
  pub id: String,
  addr: SocketAddr,
  pub topics: HashSet<String>,
}
/// ...
pub fn new(id: String, addr: SocketAddr) -> Client {
  Client {
    id,
    addr,
    topics: HashSet::new(),
  }
}

// let's create a client as well
// in handle_client, after connection handshake
let client_key = connect.client_id.to_string();
let client = Client::new(client_key.clone(), addr);

Now, we need some way for the broker to communicate with the client. Let’s create a channel that we clone and send to every single client that is created.

Implementing the Client

// src/broker.rs; start_server
//...
let (tx_broker, mut rx_broker): (Sender<BrokerMessage>,
  Receiver<BrokerMessage>) = mpsc::channel(100);
tokio::spawn(async move {
  loop {
    let (stream, addr) = listener.accept().await.unwrap();
      println!("New connection: {}", stream.peer_addr().unwrap());
      let mut tx_broker = tx_broker.clone();
      tokio::spawn(async move {
        Client::handle_client(stream, addr, &mut tx_broker).await;
      });
    }
  }
});

while let Some(message) = rx_broker.next().await {
  // do something with the message
}

Communicating with the broker

Now we just need to add broker communication to our client:

First, let’s add it to the client struct:

pub struct Client {
  pub id: String,
  addr: SocketAddr,
  pub topics: HashSet<String>,
  tx_broker: Sender<BrokerMessage>,
}

// ...
pub fn new(id: String, addr: SocketAddr, tx_broker: Sender<BrokerMessage>) -> Client {
  Client {
    id,
    addr,
    topics: HashSet::new(),
    tx_broker,
  }
}

Let’s modify handle_client to accept it and use it:

// src/client.rs
pub async fn handle_client(
  stream: TcpStream,
  addr: SocketAddr,
  tx_broker: &mut Sender<BrokerMessage>
) {
  // ...
  let client = Client::new(client_key.clone(), addr, tx_broker.clone());
  // send it to the broker
  tx_broker.send(NewConnection { client, tx_client }).await;
  // ... rest of the handle_client loop
}

Finally, let’s implement handle_messages so that it will send all the messages to the broker to handle.

async fn handle_messages(
  framed: &mut Framed<TcpStream, MQTTCodec>,
  tx_broker: &mut Sender<BrokerMessage>,
  client_key: String,
  packet: Packet,
) {
  match packet {
    Packet::Pingreq => {
      println!("Ping");
      framed.send(Packet::Pingresp).await;
    }
    _ => {
      tx_broker.send(Message {
        id: client_key,
        packet,
      })
      .await;
    }
  }
}

Last but not least, let’s add the handling of packets to the broker. We can create “state” that will store information about the clients to make the rest of our tasks easier.

pub async fn start_server(addr: IpAddr, port: u16) -> Result<(), Box<dyn Error>> {
  let address = SocketAddr::new(addr, port);
  let mut listener = TcpListener::bind(address).await?;
  let mut state: HashMap<String, (Client, Sender<Packet>)> = HashMap::new();
  let (tx_broker, mut rx_broker): (Sender<BrokerMessage>,
    Receiver<BrokerMessage>) = mpsc::channel(100);
  tokio::spawn(async move {
    loop {
      let (stream, addr) = listener.accept().await.unwrap();
      println!("New connection: {}", stream.peer_addr().unwrap());
      let mut tx_broker = tx_broker.clone();
      tokio::spawn(async move {
          Client::handle_client(stream, addr, &mut tx_broker).await;
      });
    }
  });
  while let Some(message) = rx_broker.next().await {
    match message {
      BrokerMessage::NewConnection { client, tx_client } => {
        let id = client.id.clone();
        state.insert(id.clone(), (client, tx_client));
        println!("Registered Client: {}", id);
      },
      BrokerMessage::Message { packet, id } => {
        match packet {
          Packet::Disconnect => {
            state.remove(&id);
          },
          _ => {
            // handle other messages
          }
        }
      }
    }
  }
  Ok(())
}

Now, it’s very easy to implement each packet and handle multiple clients at the same time without needing to share state across threads or do any funky business. As I said at the start of this blog post, it’s really easy for us to extend this base and create a compliant MQTT Broker, but I don’t think that you would learn any more about Tokio or how to use it by doing so.

I hope you enjoyed this little stint of a series, and if you’re interested in more, be sure to periodically check back in.