In the last section, we went over the MQTT protocol and our proposed architecture. By the end of this tutorial, we’ll have a tiny little broker that accepts clients and plays ping-pong, and we’ll have learned a bit about Tokio along the way. So let’s get started. Now, my Rust isn’t the greatest, and some of you probably know way more than I do. If you’ve got any criticisms, I encourage you to send them to me.

Up and running with Tokio

Get your favorite editor out and create a new cargo binary. Add Tokio to your Cargo.toml:

# Cargo.toml
[dependencies.tokio]
version = "0.2.21"
features = ['macros']

Tokio has a ton of features, so we’ll come back to this section of our config whenever we need some more juice out of it. Now let’s make sure that Tokio is working and write ourselves a “Hello, World”.

Hello, World: Async Edition

To run an asynchronous function, we need to execute it with a runtime. A runtime’s job is to schedule tasks and drive them to completion. Using Tokio’s #[tokio::main] macro, we can make sure that all of our code is executed on Tokio’s default runtime.

// src/main.rs
#[tokio::main]
async fn main() {
  let world = async {
    println!("World!");
  };
  print!("Hello, ");
  world.await;
}

This tiny little “Hello, World” makes sure that Tokio is running and that we can create and evaluate futures. If at this point you’re not sure what async and await mean, I recommend reading a little bit about asynchronous programming. This example, although in C#, was super helpful for me.
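If the word “runtime” still feels like magic, here is a rough sketch of about the smallest executor possible, using only the standard library. It is not how Tokio works internally: block_on and ThreadWaker are names I made up for illustration, it drives a single future on the current thread, and it assumes Rust 1.51 or newer for std::task::Wake.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Wakes the thread that is blocked inside `block_on` (hypothetical helper).
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Toy executor: polls one future and parks the thread while it is pending.
fn block_on<F: Future>(future: F) -> F::Output {
    // Pin the future on the heap so it can be polled repeatedly.
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            // Sleep until something calls `wake`, then poll again.
            Poll::Pending => thread::park(),
        }
    }
}
```

Calling block_on(async { 1 + 2 }) hands back 3. Tokio does the same basic poll-and-wait dance, just for many tasks at once and with I/O and timers wired in.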

Let’s get back to it. Our main function can’t throw an error yet, but we’re going to be dealing with I/O soon, so let’s change the signature to cover that:

// src/main.rs
use std::error::Error;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
  ...
  Ok(())
}
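In case the Box<dyn Error> return type looks odd, here is a small sketch of why it is handy: the ? operator will convert pretty much any error into it. The parse_port helper is made up for illustration and is not part of the broker.

```rust
use std::error::Error;

/// Parses a port number, boxing whatever error comes out (hypothetical helper).
fn parse_port(text: &str) -> Result<u16, Box<dyn Error>> {
    // `?` turns the ParseIntError into a Box<dyn Error> for us.
    let port: u16 = text.trim().parse()?;
    if port == 0 {
        // A plain string can become a Box<dyn Error> too.
        return Err("port 0 is not a usable MQTT port".into());
    }
    Ok(port)
}
```

Two unrelated error types leave through the same signature, which is exactly what we want from main once I/O errors show up.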

Connecting Clients

The MQTT protocol communicates over TCP, by default on port 1883. Let’s set up a system that listens for connections on this port. We’ll need Tokio’s tcp feature to create a TcpListener, so add "tcp" to the features list in Cargo.toml. Replacing our test:

// src/main.rs
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use tokio::net::TcpListener;
... // main
let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 1883); // localhost:1883
let mut listener = TcpListener::bind(address).await?;
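That SocketAddr line is a bit of a mouthful. As a sketch, here are two shorter spellings from the standard library that should produce the same address; the function names are just labels I picked for the comparison.

```rust
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

/// The spelling used in the tutorial.
fn long_form() -> SocketAddr {
    SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 1883)
}

/// Built from an (octets, port) tuple.
fn from_tuple() -> SocketAddr {
    SocketAddr::from(([127, 0, 0, 1], 1883))
}

/// Parsed from text; panics only if the hard-coded string is malformed.
fn parsed() -> SocketAddr {
    "127.0.0.1:1883".parse().expect("hard-coded address is valid")
}
```

Pick whichever reads best to you. The parsed version is handy later if the address ever comes from a config file or a command-line argument.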

The listener has a function called accept that we can await on. When a client connects, it resolves to a result holding a TcpStream for that client along with the client’s address. As we discussed last time, whenever we get a new client, we want to make a new task specifically for it. Tokio lets us run code concurrently by using tokio::spawn to create a new task. The spawn function requires Tokio’s rt-core feature, so add "rt-core" to the features list in Cargo.toml.

// src/main.rs
loop {
  let (stream, addr) = listener.accept().await.unwrap();
  println!("New connection: {}", addr); // addr is the client's address, not our own
  tokio::spawn(async {
    handle_client().await;
  });
}

Warning

I’m cheating a little bit here, and we’re going to continue cheating. It is very possible in a real-world scenario for the listener to fail when accepting a new client for whatever reason. We’re not making production software, so we don’t really care. An extension that I would suggest to this project is to implement logging and create a graceful failure system. We’re just going to unwrap values and panic out if something bad happens, but that’s a bad practice.
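If you do take on that extension, one possible starting point is deciding which accept errors are worth surviving. The sketch below is only a policy guess on my part: AcceptAction and on_accept_error are made-up names, and the list of “retry” error kinds is an assumption you should tune for your platform.

```rust
use std::io::{Error, ErrorKind};

/// What the accept loop should do after a failure (hypothetical type).
#[derive(Debug, PartialEq)]
enum AcceptAction {
    Retry,
    Shutdown,
}

/// Assumed policy: errors tied to one bad connection are retried,
/// anything else is treated as fatal for the listener.
fn on_accept_error(err: &Error) -> AcceptAction {
    match err.kind() {
        ErrorKind::ConnectionAborted
        | ErrorKind::ConnectionReset
        | ErrorKind::Interrupted
        | ErrorKind::WouldBlock => AcceptAction::Retry,
        _ => AcceptAction::Shutdown,
    }
}
```

In the loop, you would match on the result of accept, log the error, and either continue or break depending on what this returns, instead of calling unwrap.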

Okay, now we’re rolling. The handle_client function can do anything we want, although the name implies that we should probably handle the client. For now, let’s just print a little sanity check and make sure that this code is running:

// src/main.rs
async fn handle_client() {
  println!("New client thread spawned")
}

Testing

How do we know that our server is actually accepting connections? We could use a bunch of tools to send packets to our server, but why not use an actual MQTT client? For this tutorial, I’ll be using HiveMQ’s CLI, but you can use anything you want. Most of these tools are well documented and easy to use, so I promise you won’t be lost if you decide to use something else. Open up the MQTT CLI by typing mqtt sh. Since we’re building a broker for MQTT version 3.1.1, we will connect to our server using the command con -V3:

# Broker
$ cargo run

# MQTT
mqtt> con -V3
Server closed connection without DISCONNECT.

# Broker
New connection: 127.0.0.1:38706
New client thread spawned

Nice. We’ve made contact with the real world. Let’s respond.

Framing our stream

Have you ever been so inspired by a TcpStream that you want it framed and hung? No? Maybe that’s just me. Regardless, we want to think of packets that we get and send in concrete units. In frames. Tokio has a very powerful and useful struct: Framed. This struct will allow us to send and receive packets without really worrying about how we process them. To use it, we’ll need to add tokio-util to our dependencies and enable the codec feature. To work with one of the libraries we’ll use in a moment, we’re going to have to use an older version of tokio-util, but cargo doesn’t mind.

# Cargo.toml
[dependencies]
tokio-util = {version = "0.2", features = ["codec"]}
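To get a feel for what “thinking in frames” means, here is a std-only sketch of the job a codec’s decode step has to do for MQTT: look at the bytes buffered so far and hand back one whole packet only when all of it has arrived. split_frame is a name I made up, and a real Decoder works on a BytesMut rather than a Vec, so treat this as an illustration and not the tokio-util API.

```rust
/// Splits one complete MQTT packet off the front of `buf`, if there is one.
/// Ok(None) means "need more bytes"; Err means the length field is malformed.
fn split_frame(buf: &mut Vec<u8>) -> Result<Option<Vec<u8>>, &'static str> {
    let mut remaining = 0usize; // decoded "remaining length" field
    let mut header_len = 1; // byte 0 is the packet type and flags
    let mut shift = 0;
    loop {
        let byte = match buf.get(header_len) {
            Some(b) => *b,
            None => return Ok(None), // length field not fully received yet
        };
        header_len += 1;
        // Each length byte carries 7 bits of value, least significant first.
        remaining |= ((byte & 0x7F) as usize) << shift;
        if (byte & 0x80) == 0 {
            break; // top bit clear: this was the last length byte
        }
        shift += 7;
        if shift > 21 {
            return Err("remaining length is longer than 4 bytes");
        }
    }
    let total = header_len + remaining;
    if buf.len() < total {
        return Ok(None); // body not fully received yet
    }
    // Remove the packet from the buffer and return it.
    Ok(Some(buf.drain(..total).collect()))
}
```

Framed does this buffering and splitting for us every time more data arrives on the socket, which is why we only ever see whole packets on the other side.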

To create a Framed, we need two things:

1. An I/O object to read from and write to, such as our TcpStream
2. A Codec, responsible for encoding and decoding our packets

For now, let’s use the simplest one: BytesCodec.

use tokio::net::{TcpListener, TcpStream};
use tokio_util::codec::{Framed, BytesCodec};

// in main
tokio::spawn(async move { // note that we are now moving stream into the new scope
  handle_client(stream).await;
});

// handle_client
async fn handle_client(stream: TcpStream) {
  let mut framed = Framed::new(stream, BytesCodec::new());
  loop {
    // hold on to the client
  }
}

If you run this code and try to connect, our MQTT client will hang for about a minute before telling us: Timeout while waiting for CONNACK. We need to accept the client’s connect packet and respond to it. Our framed has a function called next that, as you might expect, returns the next packet decoded by the Codec. To use next, we’ll need to add the futures library, whose StreamExt trait provides it.

# Cargo.toml
[dependencies]
futures = "0.3.5"

// src/main.rs
use futures::StreamExt;
... // handle_client, before the loop
let packet = framed.next().await;
println!("{:#?}", packet);

We’ll see something like: b"\x10\x18\0\x04MQTT\x04\x02\0<\0\x0cmqttZVluCOgW" as our incoming packet. Yikes. Let’s send that CONNACK packet to the client.
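For the curious, that blob is less scary than it looks. Here is a sketch that pulls the interesting fields out of a CONNECT packet, following the MQTT 3.1.1 layout. ConnectSummary and the helper functions are names I invented, and the code assumes a short packet (one-byte remaining length) with no will, username, or password, which is what the CLI sent me.

```rust
/// The CONNECT fields this sketch extracts (hypothetical type).
#[derive(Debug, PartialEq)]
struct ConnectSummary {
    protocol_name: String,
    protocol_level: u8,
    clean_session: bool,
    keep_alive_secs: u16,
    client_id: String,
}

/// Reads a big-endian u16 at `pos`, if the slice is long enough.
fn read_u16(bytes: &[u8], pos: usize) -> Option<u16> {
    let pair = bytes.get(pos..pos + 2)?;
    Some(u16::from_be_bytes([pair[0], pair[1]]))
}

/// Reads a length-prefixed UTF-8 string; returns it with the next offset.
fn read_string(bytes: &[u8], pos: usize) -> Option<(String, usize)> {
    let len = read_u16(bytes, pos)? as usize;
    let start = pos + 2;
    let raw = bytes.get(start..start + len)?;
    Some((String::from_utf8(raw.to_vec()).ok()?, start + len))
}

fn summarize_connect(bytes: &[u8]) -> Option<ConnectSummary> {
    // Fixed header: packet type 1 (CONNECT) sits in the high nibble.
    let first = *bytes.first()?;
    if (first >> 4) != 1 {
        return None;
    }
    // Assumption: the remaining length fits in a single byte (< 128).
    let remaining = *bytes.get(1)? as usize;
    if remaining >= 128 || bytes.len() != 2 + remaining {
        return None;
    }
    let (protocol_name, pos) = read_string(bytes, 2)?;
    let protocol_level = *bytes.get(pos)?; // 4 means MQTT 3.1.1
    let flags = *bytes.get(pos + 1)?;
    let keep_alive_secs = read_u16(bytes, pos + 2)?;
    let (client_id, _) = read_string(bytes, pos + 4)?;
    Some(ConnectSummary {
        protocol_name,
        protocol_level,
        clean_session: (flags & 0x02) != 0, // bit 1 of the connect flags
        keep_alive_secs,
        client_id,
    })
}
```

Fed the bytes above, this reports protocol MQTT at level 4, a clean session, a keep-alive of 60 seconds (that lonely < is byte 0x3C), and the client id mqttZVluCOgW. That keep-alive also explains the roughly one-minute timeouts we keep running into.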

To create a response, we’ll need to add bytes as a dependency. We’ll also need to import SinkExt, the trait from the futures library that gives our Framed its send function.

# Cargo.toml
[dependencies]
bytes = "0.5.2"

// src/main.rs
use futures::{StreamExt, SinkExt};
use bytes::Bytes;
... // handle_client, before the loop
let connack = Bytes::from(vec![32u8, 2, 0, 0]); // the CONNACK packet as a series of bytes
println!("{:#?}", connack);
framed.send(connack).await;

If we give this a run, it works! For about a minute. Then we get another error… Timeout while waiting for PINGRESP. Note that sending a packet might error out, so if you’re being a responsible programmer, you should handle that.
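In case you’re wondering where 32, 2, 0, 0 came from, here is a small sketch that spells out each byte of an MQTT 3.1.1 CONNACK. The connack_bytes function is a made-up helper, not something from the bytes or futures crates.

```rust
/// Builds the four bytes of an MQTT 3.1.1 CONNACK packet (hypothetical helper).
fn connack_bytes(session_present: bool, return_code: u8) -> [u8; 4] {
    [
        0x20,                  // packet type 2 (CONNACK) in the high nibble, flags 0
        0x02,                  // remaining length: two more bytes follow
        session_present as u8, // acknowledge flags: bit 0 is "session present"
        return_code,           // 0 = connection accepted, 1 to 5 = refusal reasons
    ]
}
```

connack_bytes(false, 0) gives exactly the bytes we hard-coded: 0x20 is 32 in decimal.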

If you’ve been paying attention, you’re probably a little upset. After all, I promised that we wouldn’t be doing any bit-banging. In fact, I told you that we would not care about the individual bytes of the packet. Let’s make good on that.

MQTT Codec

We’re going to use a tiny little crate called mqtt-codec. As the name implies, it provides a Tokio codec for all the MQTT packet types and saves us a ton of time. Sadly, it is not compatible with the most recent version of tokio-util, which is why we’re using an older version.

Extension

Because of this rather frustrating regression in versions, and the fact that making your own Codec is not super difficult, I recommend you extend this project and implement your own Codec. If you care more about the Tokio side of things, I recommend using the library mqttrs for the bit-banging, but if you want to write the encoding and decoding code yourself, knock yourself out. Part 2.5 walks through doing this, and the rest of the tutorial will assume you’ve done part 2.5.

# Cargo.toml
[dependencies]
mqtt-codec = "0.3.0"

Then, to use the Codec:

// src/main.rs
use mqtt_codec::{Codec, Packet::*, ConnectCode::ConnectionAccepted};
... // handle_client
let mut framed = Framed::new(stream, Codec::new());
let connect = match framed.next().await {
  Some(Ok(Connect(packet))) => {
    framed.send(
      ConnectAck {session_present: false, return_code: ConnectionAccepted }
    ).await; // without .await the future never runs and no CONNACK is sent
    packet
  },
  _ => {
    println!("Did not receive connect packet");
    return;
  }
};
println!("{:#?}", connect);

Hooray! We’re no longer dealing in bytes. We can get rid of the bytes library in our Cargo.toml and remove any mention of the BytesCodec. Although our library situation isn’t perfect, we can work with it. Let’s finish up: whenever the client sends us a ping, let’s respond.

Ping Pong Champions

The idea is pretty simple: after connecting, we’ll await a new packet, and when we get one, we’ll check if it’s a ping. If it is, we respond.

// src/main.rs
loop {
  match framed.next().await {
    Some(Ok(PingRequest)) => {
      framed.send(PingResponse).await;
      println!("Ping - Pong!")
    },
    _ => {
      println!("Received an unknown packet");
      break;
    }
  }
}
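Right now everything that isn’t a ping lands in the catch-all arm. If you ever want to know what the client actually sent, the packet type lives in the top four bits of the first byte on the wire. Here is a std-only sketch of that lookup for MQTT 3.1.1; PacketType and packet_type are names I made up and are not the mqtt-codec API.

```rust
/// The fourteen MQTT 3.1.1 control packet types (hypothetical enum).
#[derive(Debug, PartialEq, Clone, Copy)]
enum PacketType {
    Connect, ConnAck, Publish, PubAck, PubRec, PubRel, PubComp,
    Subscribe, SubAck, Unsubscribe, UnsubAck, PingReq, PingResp, Disconnect,
}

/// Maps the first byte of a packet to its type using the high nibble.
fn packet_type(first_byte: u8) -> Option<PacketType> {
    use PacketType::*;
    Some(match first_byte >> 4 {
        1 => Connect,
        2 => ConnAck,
        3 => Publish,
        4 => PubAck,
        5 => PubRec,
        6 => PubRel,
        7 => PubComp,
        8 => Subscribe,
        9 => SubAck,
        10 => Unsubscribe,
        11 => UnsubAck,
        12 => PingReq,
        13 => PingResp,
        14 => Disconnect,
        _ => return None, // 0 and 15 are reserved in MQTT 3.1.1
    })
}
```

On the wire, our whole ping-pong exchange is four bytes: the client sends 0xC0 0x00 (PINGREQ) and we answer with 0xD0 0x00 (PINGRESP).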

Now our program will keep running forever, at least until you try to use it. But it’s progress; we’ve got something that’s working. The next section will be short: all we’ll do is reorganize our code so that it’s easier to work with. If anyone has suggestions for Codec libraries that support the MQTT protocol, I’d love to test them out.