Implementing an Asynchronous Queue Manager in Rust

published on 2021-10-04

Consider an assembly line involving two rats, Sam and Eric. Sam is at the beginning of the assembly line, grabbing food items from a bin. Once Sam has an item of food, he is able to queue it on a table for Eric to grab. Eric then takes the food item, moves it to somewhere safe, and eats it. Meanwhile, Sam may return to the line to grab another food item. Sam is only allowed to place an item of food on the table (i.e. queue an item of food) when said table is empty. Sam is only allowed to grab an item of food from the bin if his hands are empty. Eric is only allowed to take an item of food from the table (i.e. dequeue an item of food) if he has finished eating his last food item. Sam and Eric vary in the times they take to do their tasks: the durations of food-grabbing and food-eating vary unpredictably. Consequently, Sam may sometimes wait for Eric to finish his food before queueing a new item of food, and Eric may sometimes wait for Sam to grab more food before eating.

An animation of how the assembly line may proceed. Note that each rat must wait at the table in this example.

Sam and Eric are concurrently performing tasks that require synchronization. Sam must pause his loop of grabbing if he realizes the queue is full, and Eric can only eat an item that Sam has put in the queue. Thus, their tasks are not merely done in parallel, as some synchronization must occur. In this post, I will program a queue manager, which is the name I give to an object that spawns the process we just described.

Declaring traits

Let us start by declaring a simple trait which encodes the blocking calculation of Sam at the food container.

/// Simple trait which encodes some blocking calculation for grabbing values.
pub trait BlockingGrabber {
    type Payload;
    type Marker;
    fn init(&mut self) -> Option<Self::Marker>;
    fn next(&mut self, p: &Self::Payload) -> Option<Self::Marker>;
    fn grab(&mut self, m: &Self::Marker) -> Option<Self::Payload>;
}

The associated type Payload corresponds to the type of objects Sam is moving, while Marker serves as a piece of information that Sam uses to grab the next item. The execution cycle of Sam (the grabber) is effectively that he:

  1. initializes some Marker via BlockingGrabber::init,
  2. uses the Marker to grab a Payload via BlockingGrabber::grab,
  3. uses the Payload to get a new Marker via BlockingGrabber::next,
  4. returns to step 2.

This cycle halts as soon as Sam receives the None variant from any of the above methods, whether as an Option<Marker> or as an Option<Payload>. It is useful to have BlockingGrabber as a trait, because we have a variety of use cases for our queue manager. For instance, the BlockingGrabber can be implemented to grab items from a database, a sequence of files, a network stream, etc.
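To make the cycle concrete, here is a minimal sketch that drives a grabber synchronously, with no queue involved yet. PageGrabber and run_cycle are hypothetical names invented for this illustration; only the trait itself comes from the post.

```rust
/// Same trait as declared above, repeated so the sketch compiles on its own.
pub trait BlockingGrabber {
    type Payload;
    type Marker;
    fn init(&mut self) -> Option<Self::Marker>;
    fn next(&mut self, p: &Self::Payload) -> Option<Self::Marker>;
    fn grab(&mut self, m: &Self::Marker) -> Option<Self::Payload>;
}

/// Hypothetical grabber that pages through a list of records.
/// Each payload carries its own index, so `next` can derive the following marker.
pub struct PageGrabber {
    pages: Vec<&'static str>,
}

impl BlockingGrabber for PageGrabber {
    type Payload = (usize, &'static str);
    type Marker = usize;

    fn init(&mut self) -> Option<usize> {
        if self.pages.is_empty() {
            None
        } else {
            Some(0)
        }
    }

    fn next(&mut self, p: &(usize, &'static str)) -> Option<usize> {
        let following = p.0 + 1;
        if following < self.pages.len() {
            Some(following)
        } else {
            None
        }
    }

    fn grab(&mut self, m: &usize) -> Option<(usize, &'static str)> {
        self.pages.get(*m).map(|text| (*m, *text))
    }
}

/// Runs the 4-step cycle synchronously and collects every payload.
pub fn run_cycle<G: BlockingGrabber>(grabber: &mut G) -> Vec<G::Payload> {
    let mut collected = Vec::new();
    let mut marker = grabber.init(); // step 1
    while let Some(m) = marker {
        match grabber.grab(&m) {
            // step 2 succeeded: derive the next marker (step 3)
            Some(payload) => {
                marker = grabber.next(&payload);
                collected.push(payload);
            }
            // step 2 produced nothing: the cycle halts
            None => break,
        }
        // step 4: loop back to step 2 with the new marker
    }
    collected
}
```

With three pages, run_cycle returns the three payloads in order and stops when next yields None; with no pages, init yields None and the loop body never runs.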

We will also implement a trait Syncer that will allow the queue manager to synchronously send queue information out of the program. In the context of this post, the Syncer will write a log to a file:

GRAB 1:1250
QUEUE 1:1250
DEQUEUE 1:1250
EAT 1:2000
GRAB 2:2320
QUEUE 2:2320
DEQUEUE 2:2320
GRAB 3:2920
QUEUE 3:2920
GRAB 4:3430
EAT 2:4390
DEQUEUE 3:4390
QUEUE 4:4390
EAT 3:4900
DEQUEUE 4:4900
EAT 4:6320
GRAB 5:7390
QUEUE 5:7390
DEQUEUE 5:7390
EAT 5:7810

which determines the very animation above. This trait simply has a single method Syncer::sync.

/// Simple trait which encodes the syncing
pub trait Syncer {
    fn sync<S: Into<String>>(&mut self, message: S);
}
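Because sync is generic over Into<String>, callers can pass a &str, a String, or a &String without converting first. The sketch below shows this with MemorySyncer, a hypothetical in-memory implementation that is not part of the post's program and merely records what it receives.

```rust
/// Same trait as declared above, repeated so the sketch compiles on its own.
pub trait Syncer {
    fn sync<S: Into<String>>(&mut self, message: S);
}

/// Hypothetical in-memory syncer: it only records the messages it is given.
#[derive(Default)]
pub struct MemorySyncer {
    pub messages: Vec<String>,
}

impl Syncer for MemorySyncer {
    fn sync<S: Into<String>>(&mut self, message: S) {
        self.messages.push(message.into());
    }
}

/// Feeds the syncer a `&str`, a `String`, and a `&String`.
pub fn record_three() -> Vec<String> {
    let mut syncer = MemorySyncer::default();
    syncer.sync("GRAB 1"); // &str
    syncer.sync(format!("QUEUE {}", 1)); // String
    let owned = String::from("DEQUEUE 1");
    syncer.sync(&owned); // &String, converted through From<&String>
    syncer.messages
}
```

A syncer like this may be convenient in unit tests, where writing a log file would be unnecessary.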

Implementing QueueManager

We declare a struct QueueManager which will asynchronously spawn the 4-step grabbing process described above. With the help of Rust's popular asynchronous runtime, Tokio[1][2], implementing our QueueManager is simple. Namely, the tokio::sync::mpsc::channel function creates a bounded channel through which Futures may send data. Sending on this channel makes the sender Future wait while the channel is at capacity, and receiving makes the receiver Future wait while the channel is empty.
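The effect of a capacity of 1 can be seen without an async runtime. The sketch below is an analogue only: it uses the standard library's std::sync::mpsc::sync_channel rather than Tokio's channel, and non-blocking try_send calls so that a full channel shows up as an error instead of a wait. The function names are made up for this illustration.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Turns the result of a non-blocking send into a short label.
fn describe(result: Result<(), TrySendError<u32>>) -> &'static str {
    match result {
        Ok(()) => "sent",
        Err(TrySendError::Full(_)) => "full",
        Err(TrySendError::Disconnected(_)) => "closed",
    }
}

/// Shows that a capacity-1 channel holds exactly one item at a time.
pub fn capacity_one_demo() -> Vec<&'static str> {
    // One slot, like the single table between Sam and Eric.
    let (tx, rx) = sync_channel::<u32>(1);
    let mut outcomes = Vec::new();

    outcomes.push(describe(tx.try_send(1))); // the table is empty
    outcomes.push(describe(tx.try_send(2))); // the table is occupied

    // Eric takes the item, which frees the slot again.
    let taken = rx.recv().expect("sender is still alive");
    assert_eq!(taken, 1);

    outcomes.push(describe(tx.try_send(3))); // the table is empty again
    outcomes
}
```

In the async version, the second attempt would not fail: awaiting the send would simply suspend Sam's task until Eric has dequeued the first item.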

Let us look at the fields of the QueueManager struct for some intuition.

/// This struct implements our queue manager
struct QueueManager<P, G: BlockingGrabber<Payload = P>, S: Syncer> {
    receiver: mpsc::Receiver<P>,
    grabber: G,
    syncer: S,
}

Notice the fields grabber and syncer, which the manager uses to obtain data to queue in the channel and to synchronously inform the outside world about this data. Meanwhile, the receiver is the receiving half of the channel, which we wrap in an interface that lets a user dequeue items. We design our API to behave like so.

let mut manager = QueueManager::new(...);
let opt_payload = manager.recv().await;
if let Some(payload) = opt_payload {
  // do stuff with `payload`, which has type `Payload` associated with the 
  // grabber
}

Setting up the recv method calls for a simple composition of manager.receiver.recv, along with some additional syncer logic. The implementation of the method recv is shown below.

    async fn recv(&mut self) -> Option<P> {
        self.receiver.recv().await.map(|payload| {
            let message = format!("DEQUEUE {:?}", payload);
            self.syncer.sync(&message);
            payload
        })
    }

Now, the nontrivial part: How do we implement the spawning of the grabber cycle? If we understand the structure of Rust's async/await execution (Jon Gjengset's stream Crust of Rust: async/await[0] is an excellent resource), it turns out to be quite easy. All we must do is spawn a Future which loops the 4-step cycle, and the .await call on the channel sender will perform all of our desired blocking. Let us show the entirety of our implementation.

impl<G, P, M, S> QueueManager<P, G, S>
where
    P: Debug + Clone + Send + 'static,
    M: Clone + Send + 'static,
    G: BlockingGrabber<Payload = P, Marker = M> + Clone + Send + 'static,
    S: Syncer + Clone + Send + 'static,
{
    pub fn new(grabber: G, syncer: S) -> Self {
        // create a channel for the payload
        let (tx, rx) = mpsc::channel(1);

        // clone the grabber and syncer
        let mut gbr = grabber.clone();
        let mut sncr = syncer.clone();

        // mount the lifecycle of the Sender
        tokio::task::spawn(async move {
            // This is Step 1 of our loop cycle.
            let mut opt_marker = gbr.init();
            loop {
                if let Some(marker) = opt_marker {
                    // This is Step 2 of our loop cycle (requires a marker).
                    let opt_payload = gbr.grab(&marker);
                    match opt_payload {
                        Some(payload) => {
                            // This is Step 3 of our loop cycle (requires a payload).
                            opt_marker = gbr.next(&payload);
                            let pld = payload.clone();
                            let res = tx.send(pld).await; // where future may block
                            if res.is_err() {
                                sncr.sync("SEND ERROR");
                                return;
                            }
                            // send a message through the Syncer
                            let message = format!("QUEUE {:?}", payload);
                            sncr.sync(&message);
                        }
                        None => {
                            // Step 2 failed: `grab` returned no payload.
                            return;
                        }
                    }
                } else {
                    // Step 1 or Step 3 yielded no marker, so the cycle ends.
                    return;
                }
            }
        });

        // return the manager
        Self {
            receiver: rx,
            grabber,
            syncer,
        }
    }

    async fn recv(&mut self) -> Option<P> {
        self.receiver.recv().await.map(|payload| {
            let message = format!("DEQUEUE {:?}", payload);
            self.syncer.sync(&message);
            payload
        })
    }
}

Implementing the traits with logging

The machinery is now in place, but we must still implement the traits. We implement a simple example which simulates a simple queue and exports a log which helps us render the animation above. The service times of Sam and Eric will be hardcoded by the following constants.

const VALS: &'static [i32] = &[1, 2, 3, 4, 5];
const LEN: usize = VALS.len();
const BLOCKS: &'static [u64] = &[125, 107, 60, 50, 300];
const SERVES: &'static [u64] = &[75, 205, 50, 140, 40];

Above, VALS is simply the list of values Sam will grab, BLOCKS is the list of times (in milliseconds) each item will take Sam to grab, and SERVES is the list of times (in milliseconds) each item will take Eric to eat. These constants will be used in the implementation of SimpleGrabber, which implements the BlockingGrabber trait.

/// Our simple implementation of the [`BlockingGrabber`] trait.
#[derive(Clone)]
pub struct SimpleGrabber<S: Syncer> {
    syncer: S,
}

impl<S: Syncer> SimpleGrabber<S> {
    fn new(syncer: S) -> Self {
        Self { syncer }
    }
}

impl<S: Syncer> BlockingGrabber for SimpleGrabber<S> {
    type Payload = i32;
    type Marker = usize;

    fn init(&mut self) -> Option<usize> {
        Some(0)
    }

    fn next(&mut self, p: &i32) -> Option<usize> {
        let u = *p as usize;
        if u >= LEN {
            None
        } else {
            Some(u)
        }
    }

    fn grab(&mut self, m: &usize) -> Option<i32> {
        if *m >= LEN {
            None
        } else {
            let val = VALS[*m];
            let message = format!("GRAB {:?}", val);
            std::thread::sleep(Duration::from_millis(BLOCKS[*m]));
            self.syncer.sync(&message);
            Some(val)
        }
    }
}

Notice that, as the name BlockingGrabber suggests, we implemented SimpleGrabber to block the thread during the grabbing, via std::thread::sleep. This is intended to simulate how the grabbing operation would likely come from a library that is implemented outside of the Future executor. Note that we also threaded our Syncer into the SimpleGrabber, as we would like to log the grabbing operations.

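The Syncer will create a file handle and a timer, so that the implementation of sync will log the messages to the file at the time they are sent. Each logged timestamp is the elapsed time in milliseconds multiplied by 10, which is why a 125 ms grab appears as 1250 in the log. We wrap the file handle in an atomic reference counter and mutex to ensure safe syncing between threads.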

/// Implementation of Syncer which writes messages to files.
#[derive(Clone)]
struct SyncWriter {
    start_time: SystemTime,
    writer: Arc<Mutex<File>>,
}

impl SyncWriter {
    fn new<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
        let writer = File::create(path)?;
        Ok(Self {
            start_time: SystemTime::now(),
            writer: Arc::new(Mutex::new(writer)),
        })
    }
}

impl Syncer for SyncWriter {
    fn sync<S: Into<String>>(&mut self, message: S) {
        let elapsed = self.start_time.elapsed().unwrap().as_millis();
        let message = format!("{}:{:?}\n", message.into(), elapsed * 10);
        self.writer
            .lock()
            .unwrap()
            .write_all(message.as_bytes())
            .unwrap();
    }
}
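To see what the Arc<Mutex<...>> wrapper provides, the following sketch swaps the File for an in-memory Vec<u8> and logs from two threads through cloned handles. SharedLog is a hypothetical stand-in for SyncWriter, used here only so the example runs without touching the file system.

```rust
use std::io::Write;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for `SyncWriter`: same shape, but backed by memory.
#[derive(Clone)]
pub struct SharedLog {
    buffer: Arc<Mutex<Vec<u8>>>,
}

impl SharedLog {
    pub fn new() -> Self {
        Self {
            buffer: Arc::new(Mutex::new(Vec::new())),
        }
    }

    /// Appends one line while holding the lock, so lines never interleave.
    pub fn sync(&mut self, message: &str) {
        let mut guard = self.buffer.lock().unwrap();
        guard.write_all(message.as_bytes()).unwrap();
        guard.write_all(b"\n").unwrap();
    }

    /// Returns a copy of every line written so far.
    pub fn lines(&self) -> Vec<String> {
        let guard = self.buffer.lock().unwrap();
        let text = String::from_utf8_lossy(&guard).into_owned();
        let lines: Vec<String> = text.lines().map(str::to_owned).collect();
        lines
    }
}

/// Two threads log through their own clones; all clones share one buffer.
pub fn log_from_two_threads() -> Vec<String> {
    let log = SharedLog::new();
    let handles: Vec<_> = vec!["GRAB", "EAT"]
        .into_iter()
        .map(|tag| {
            let mut handle = log.clone();
            thread::spawn(move || {
                for i in 1..=3 {
                    handle.sync(&format!("{} {}", tag, i));
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    // The interleaving between threads is not deterministic, so sort first.
    let mut lines = log.lines();
    lines.sort();
    lines
}
```

Cloning the handle only clones the Arc, so every clone appends to the same buffer, and the mutex keeps each line intact even when both threads write at once.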

Running the simulation

Now that everything is implemented, we run the program.

#[tokio::main]
async fn main() {
    // just to ensure that Sam and Eric handle the same amount of items
    assert_eq!(BLOCKS.len(), LEN);
    assert_eq!(SERVES.len(), LEN);

    // create our syncer (this starts the timer)
    let mut syncer = SyncWriter::new("log.txt").unwrap();

    // create our grabber (needs syncer for logging grabs)
    let grabber = SimpleGrabber::new(syncer.clone());

    // create our manager
    let mut manager = QueueManager::new(grabber, syncer.clone());

    // simulate Eric's dequeueing and eating pauses
    for delay in SERVES {
        if let Some(payload) = manager.recv().await {
            tokio::time::sleep(Duration::from_millis(*delay)).await; // where future blocks
            syncer.sync(format!("EAT {:?}", payload));
        } else {
            syncer.sync("TAKE ERROR");
            return;
        }
    }
}
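The timestamps this program should produce can be worked out by hand from BLOCKS and SERVES. The sketch below does that arithmetic under an idealized model with no scheduling overhead: Sam starts grabbing as soon as his previous item is queued, the table holds one item, and Eric dequeues as soon as he has finished eating. ItemTimes and ideal_timeline are hypothetical names for this illustration, and the results are in plain milliseconds, whereas the log multiplies elapsed milliseconds by 10.

```rust
/// Ideal event times (in ms) for one item.
#[derive(Debug, PartialEq, Clone, Copy)]
pub struct ItemTimes {
    pub grab: u64,
    pub queue: u64,
    pub dequeue: u64,
    pub eat: u64,
}

/// Computes jitter-free timestamps for a queue of capacity 1.
pub fn ideal_timeline(blocks: &[u64], serves: &[u64]) -> Vec<ItemTimes> {
    let mut timeline: Vec<ItemTimes> = Vec::new();
    for (block, serve) in blocks.iter().zip(serves) {
        // Times of the previous item, or all zeros before the first one.
        let prev = timeline.last().copied().unwrap_or(ItemTimes {
            grab: 0,
            queue: 0,
            dequeue: 0,
            eat: 0,
        });
        // Sam starts grabbing once his previous item is on the table.
        let grab = prev.queue + block;
        // The single slot frees up when Eric dequeues the previous item.
        let queue = grab.max(prev.dequeue);
        // Eric dequeues once the item is queued and his last meal is finished.
        let dequeue = queue.max(prev.eat);
        let eat = dequeue + serve;
        timeline.push(ItemTimes {
            grab,
            queue,
            dequeue,
            eat,
        });
    }
    timeline
}
```

For the constants above, the ideal EAT times come out as 200, 437, 487, 627 and 777 ms. The logged values (2000, 4390, 4900, 6320 and 7810 after the factor of 10) sit within a few milliseconds of these, and the fourth item is the one where Sam waits at the table: it is grabbed at 342 ms but cannot be queued until 437 ms.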

Below is the entirety of the code, followed once more by the log file that the program creates.

use std::fmt::Debug;
use std::fs::File;
use std::io::{Error, Write};
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
use tokio::sync::mpsc;

/// Simple trait which encodes some blocking calculation for grabbing values.
pub trait BlockingGrabber {
    type Payload;
    type Marker;
    fn init(&mut self) -> Option<Self::Marker>;
    fn next(&mut self, p: &Self::Payload) -> Option<Self::Marker>;
    fn grab(&mut self, m: &Self::Marker) -> Option<Self::Payload>;
}

/// Simple trait which encodes the syncing
pub trait Syncer {
    fn sync<S: Into<String>>(&mut self, message: S);
}

/// This struct implements our queue manager
struct QueueManager<P, G: BlockingGrabber<Payload = P>, S: Syncer> {
    receiver: mpsc::Receiver<P>,
    grabber: G,
    syncer: S,
}

impl<G, P, M, S> QueueManager<P, G, S>
where
    P: Debug + Clone + Send + 'static,
    M: Clone + Send + 'static,
    G: BlockingGrabber<Payload = P, Marker = M> + Clone + Send + 'static,
    S: Syncer + Clone + Send + 'static,
{
    pub fn new(grabber: G, syncer: S) -> Self {
        // create a channel for the payload
        let (tx, rx) = mpsc::channel(1);

        // clone the grabber and syncer
        let mut gbr = grabber.clone();
        let mut sncr = syncer.clone();

        // mount the lifecycle of the Sender
        tokio::task::spawn(async move {
            // This is Step 1 of our loop cycle.
            let mut opt_marker = gbr.init();
            loop {
                if let Some(marker) = opt_marker {
                    // This is Step 2 of our loop cycle (requires a marker).
                    let opt_payload = gbr.grab(&marker);
                    match opt_payload {
                        Some(payload) => {
                            // This is Step 3 of our loop cycle (requires a payload).
                            opt_marker = gbr.next(&payload);
                            let pld = payload.clone();
                            let res = tx.send(pld).await; // where future may block
                            if res.is_err() {
                                sncr.sync("SEND ERROR");
                                return;
                            }
                            // send a message through the Syncer
                            let message = format!("QUEUE {:?}", payload);
                            sncr.sync(&message);
                        }
                        None => {
                            // Step 2 failed: `grab` returned no payload.
                            return;
                        }
                    }
                } else {
                    // Step 1 or Step 3 yielded no marker, so the cycle ends.
                    return;
                }
            }
        });

        // return the manager
        Self {
            receiver: rx,
            grabber,
            syncer,
        }
    }

    async fn recv(&mut self) -> Option<P> {
        self.receiver.recv().await.map(|payload| {
            let message = format!("DEQUEUE {:?}", payload);
            self.syncer.sync(&message);
            payload
        })
    }
}

const VALS: &'static [i32] = &[1, 2, 3, 4, 5];
const LEN: usize = VALS.len();
const BLOCKS: &'static [u64] = &[125, 107, 60, 50, 300];
const SERVES: &'static [u64] = &[75, 205, 50, 140, 40];

/// Our simple implementation of the [`BlockingGrabber`] trait.
#[derive(Clone)]
pub struct SimpleGrabber<S: Syncer> {
    syncer: S,
}

impl<S: Syncer> SimpleGrabber<S> {
    fn new(syncer: S) -> Self {
        Self { syncer }
    }
}

impl<S: Syncer> BlockingGrabber for SimpleGrabber<S> {
    type Payload = i32;
    type Marker = usize;

    fn init(&mut self) -> Option<usize> {
        Some(0)
    }

    fn next(&mut self, p: &i32) -> Option<usize> {
        let u = *p as usize;
        if u >= LEN {
            None
        } else {
            Some(u)
        }
    }

    fn grab(&mut self, m: &usize) -> Option<i32> {
        if *m >= LEN {
            None
        } else {
            let val = VALS[*m];
            let message = format!("GRAB {:?}", val);
            std::thread::sleep(Duration::from_millis(BLOCKS[*m]));
            self.syncer.sync(&message);
            Some(val)
        }
    }
}

/// Implementation of Syncer which writes messages to files.
#[derive(Clone)]
struct SyncWriter {
    start_time: SystemTime,
    writer: Arc<Mutex<File>>,
}

impl SyncWriter {
    fn new<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
        let writer = File::create(path)?;
        Ok(Self {
            start_time: SystemTime::now(),
            writer: Arc::new(Mutex::new(writer)),
        })
    }
}

impl Syncer for SyncWriter {
    fn sync<S: Into<String>>(&mut self, message: S) {
        let elapsed = self.start_time.elapsed().unwrap().as_millis();
        let message = format!("{}:{:?}\n", message.into(), elapsed * 10);
        self.writer
            .lock()
            .unwrap()
            .write_all(message.as_bytes())
            .unwrap();
    }
}

#[tokio::main]
async fn main() {
    // just to ensure that Sam and Eric handle the same amount of items
    assert_eq!(BLOCKS.len(), LEN);
    assert_eq!(SERVES.len(), LEN);

    // create our syncer (this starts the timer)
    let mut syncer = SyncWriter::new("log.txt").unwrap();

    // create our grabber (needs syncer for logging grabs)
    let grabber = SimpleGrabber::new(syncer.clone());

    // create our manager
    let mut manager = QueueManager::new(grabber, syncer.clone());

    // simulate Eric's dequeueing and eating pauses
    for delay in SERVES {
        if let Some(payload) = manager.recv().await {
            tokio::time::sleep(Duration::from_millis(*delay)).await; // where future blocks
            syncer.sync(format!("EAT {:?}", payload));
        } else {
            syncer.sync("TAKE ERROR");
            return;
        }
    }
}

GRAB 1:1250
QUEUE 1:1250
DEQUEUE 1:1250
EAT 1:2000
GRAB 2:2320
QUEUE 2:2320
DEQUEUE 2:2320
GRAB 3:2920
QUEUE 3:2920
GRAB 4:3430
EAT 2:4390
DEQUEUE 3:4390
QUEUE 4:4390
EAT 3:4900
DEQUEUE 4:4900
EAT 4:6320
GRAB 5:7390
QUEUE 5:7390
DEQUEUE 5:7390
EAT 5:7810

References

[0] Crust of Rust: async/await. Website. https://www.youtube.com/watch?v=ThjvMReOXYM
[1] Tokio - An asynchronous Rust runtime. Website. https://tokio.rs/
[2] Tokio documentation. Website. https://docs.rs/tokio/1.12.0/tokio/index.html