I was a little annoyed at needing to use an older version of tokio-util in our previous post, so in this short post we’re going to write our own Codec and move our code over to it.

Purpose

When I began this project, many months ago, there were no good resources on Tokio. I procrastinated on writing this blog series, but my intention was for it to be a guide on using Tokio and implementing a real project. A reference of sorts. When I began populating this blog in March, there still weren’t any resources. That is no longer the case. I recently discovered that Tokio added a much better tutorial in June.

This project/tutorial series was my way of learning Rust and Tokio, and I’ve gained a lot from it. However, the mini-redis tutorial that I linked above seems to cover everything that I would. By the end of part 3, we’ll have a client-server architecture that can easily be extended into a compliant MQTT broker, but I won’t continue the tutorial to create a full-blown MQTT server. If people are interested, I can continue the series, but I don’t see the need for it anymore.

What does a Codec need to do?

Codecs are pretty simple, especially if we use a library to handle the bit-banging. We need to implement a function that takes a BytesMut and converts it into a representation of our MQTT Packet. We also need to go the other way around and create a function that takes an MQTT Packet and writes it into a BytesMut. These are implemented as two traits from tokio_util, respectively called Decoder and Encoder.
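One detail of the decoding side is worth spelling out: decode is handed whatever bytes have arrived so far, so it has to return Ok(None) when the buffer does not yet hold a complete frame, and only consume bytes once a whole frame is there. Here is a minimal sketch of that contract using only the standard library. ToyDecoder and LengthPrefixed are hypothetical names I made up for illustration, Vec<u8> stands in for BytesMut, and the one-byte length prefix is a toy format, not MQTT.

```rust
// Stand-in for the *shape* of a decoder trait, with Vec<u8> instead of BytesMut.
// `ToyDecoder` and `LengthPrefixed` are hypothetical names, not tokio_util's API.
trait ToyDecoder {
    type Item;
    type Error;
    fn decode(&mut self, src: &mut Vec<u8>) -> Result<Option<Self::Item>, Self::Error>;
}

/// Toy framing: one length byte followed by that many payload bytes.
struct LengthPrefixed;

impl ToyDecoder for LengthPrefixed {
    type Item = Vec<u8>;
    type Error = String;

    fn decode(&mut self, src: &mut Vec<u8>) -> Result<Option<Vec<u8>>, String> {
        let len = match src.first() {
            Some(&len) => len as usize,
            None => return Ok(None), // nothing buffered yet
        };
        if src.len() < 1 + len {
            // Partial frame: leave the bytes where they are and wait for more.
            return Ok(None);
        }
        // Full frame available: remove it from the buffer and return the payload.
        let frame: Vec<u8> = src.drain(..1 + len).collect();
        Ok(Some(frame[1..].to_vec()))
    }
}
```

Calling decode on a buffer holding [3, b'a', b'b'] returns Ok(None) and leaves the buffer untouched. Once the third payload byte arrives, the same call returns the payload and removes the frame from the buffer.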

To handle the lower level manipulation that we don’t care about, we’re going to use the library mqttrs. It is, in my opinion, a much better API than the previous library we were using, and it comes with built-in encode/decode functions to make our life very easy. If you’re up to the challenge, you can implement your own library and encode/decode functions.
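To give a taste of the bit-banging a library takes off our hands, here is a rough sketch of one small piece of it: the "Remaining Length" field in the MQTT fixed header, which the spec defines as a variable-byte integer (7 bits of value per byte, with the high bit set when more bytes follow, up to four bytes). The function names are my own, and this is not how mqttrs implements it.

```rust
/// Encode an MQTT "Remaining Length" as a variable-byte integer.
/// Each byte carries 7 bits of the value; the high bit means "more bytes follow".
fn encode_remaining_length(mut len: usize, out: &mut Vec<u8>) -> Result<(), &'static str> {
    // Four bytes of 7 bits each cap the value at 268_435_455.
    if len > 268_435_455 {
        return Err("remaining length exceeds the 4-byte limit");
    }
    loop {
        let mut byte = (len % 128) as u8;
        len /= 128;
        if len > 0 {
            byte |= 0x80; // continuation bit
        }
        out.push(byte);
        if len == 0 {
            return Ok(());
        }
    }
}

/// Decode a Remaining Length from the start of `buf`.
/// Returns Ok(None) if the buffer ends before the field does,
/// or Ok(Some((value, bytes_used))) on success.
fn decode_remaining_length(buf: &[u8]) -> Result<Option<(usize, usize)>, &'static str> {
    let mut value = 0usize;
    for (i, &byte) in buf.iter().take(4).enumerate() {
        value |= ((byte & 0x7F) as usize) << (7 * i);
        if byte & 0x80 == 0 {
            return Ok(Some((value, i + 1)));
        }
    }
    if buf.len() >= 4 {
        // Four bytes seen and the continuation bit is still set: malformed.
        Err("remaining length is longer than four bytes")
    } else {
        // Ran out of input mid-field: the caller should wait for more bytes.
        Ok(None)
    }
}
```

For example, a length of 321 encodes to the two bytes 0xC1 0x02, and decoding those bytes gives back 321 with two bytes consumed.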

We can get rid of the previous library we were using and bump up our tokio-util version. Since we’ve made a lot of changes, this is what our dependencies look like:

[dependencies]
tokio-util = {version = "0.3.1", features = ["codec"]}
futures = "0.3.4"
mqttrs = "0.3"
bytes = "0.5.2"

[dependencies.tokio]
version = "0.2.21"
features = ['macros', 'tcp', 'rt-core']

Writing a Codec

Let’s create a codec.rs file and make a Codec struct:

pub struct MQTTCodec;

impl MQTTCodec {
  pub fn new() -> MQTTCodec {
    MQTTCodec {}
  }
}

Now, let’s import all the things we need and implement the Encoder and Decoder traits for mqttrs::Packet.

use tokio_util::codec::{Encoder, Decoder};
use mqttrs::{Packet, Error, encode, decode};
use bytes::BytesMut;

...

impl Encoder<Packet> for MQTTCodec {
  type Error = Error;

  fn encode(&mut self, packet: Packet, buf: &mut BytesMut) -> Result<(), Self::Error> {
    encode(&packet, buf)
  }
}

impl Decoder for MQTTCodec {
  type Item = Packet;
  type Error = Error;
  fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
    decode(src)
  }
}

Really simple. Now all that’s left is converting our old code to use this new codec.

Refactoring

First, let’s grab our new codec and clean up our imports:

// main.rs
mod codec;
use crate::codec::MQTTCodec;

use std::error::Error;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

use tokio::net::{TcpListener, TcpStream};
use tokio_util::codec::{Framed};

use futures::{SinkExt, StreamExt};

use mqttrs::*;

Be careful when importing everything from mqttrs: the base namespace contains types whose names mirror the packet variants, and this can lead to annoying bugs. For example, there is a Packet::Connect variant but also a Connect type, and we will need both.
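To make the naming overlap concrete, here is a trimmed-down sketch using only the standard library. The Connect struct and Packet enum below are hypothetical stand-ins that I wrote to mimic the layout described above, with a single illustrative field. They are not the real mqttrs definitions.

```rust
// Hypothetical stand-ins mimicking the layout: a struct and an enum variant
// that share the name `Connect`.
#[derive(Debug, PartialEq)]
struct Connect {
    client_id: String,
}

#[derive(Debug, PartialEq)]
enum Packet {
    Connect(Connect), // the variant wraps the struct of the same name
    Pingreq,
}

/// Pull the client id out of a packet, if it is a Connect packet.
fn client_id(packet: &Packet) -> Option<&str> {
    match packet {
        // `Packet::Connect` is the enum variant; the inner `Connect { .. }` is the struct.
        Packet::Connect(Connect { client_id }) => Some(client_id.as_str()),
        Packet::Pingreq => None,
    }
}
```

Writing the variant with its Packet:: prefix every time keeps it obvious which of the two names a given line refers to.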

Let’s replace our previous codec and work on resolving bugs. People like to joke that Rust programmers practice “Error Driven Development”, but Rust’s strong type system really does make it easy to do a refactor like this one and be confident that everything still works.

// handle_client
let mut framed = Framed::new(stream, MQTTCodec::new());

// handling of the connect packet
let connect = match framed.next().await {
  Some(Ok(Packet::Connect(packet))) => {
    framed.send(
      Packet::Connack(Connack {
        session_present: false,
        code: ConnectReturnCode::Accepted,
      })
    ).await;
    packet
  },
  // ...
};
// ...
// handling of ping packets
match framed.next().await {
  Some(Ok(Packet::Pingreq)) => {
    framed.send(Packet::Pingresp).await;
    println!("Ping - Pong!")
  },
  // ...
}

And that’s it. This was not a necessary step at all, but I prefer being up to date on versions, and I honestly like the mqttrs API much more than what we had before. You can find the complete refactored main.rs here.

In the next blog post, we’ll dive into Tokio and message passing, creating a really elegant architecture to properly manage clients and shared state.