Consider an assembly line involving two rats, Sam and Eric. Sam is at the beginning of the assembly line, grabbing food items from a bin. Once Sam has an item of food, he is able to queue it on a table for Eric to grab. Eric then takes the food item, moves it to somewhere safe, and eats it. Meanwhile, Sam may return to the line to grab another food item. Sam is only allowed to place an item of food on the table (i.e., queue an item of food) when said table is empty. Sam is only allowed to grab an item of food from the bin if his hands are empty. Eric is only allowed to take an item of food from the table (i.e., dequeue an item of food) if he has finished eating his last food item. Sam and Eric vary in the times they take to do their tasks: the food-grabbing and food-eating durations follow an unknown distribution. Consequently, Sam may sometimes wait for Eric to finish his food before queueing a new item of food, and Eric may sometimes wait for Sam to grab more food before eating.
Sam and Eric are concurrently performing tasks that require synchronization. Sam must pause his loop of grabbing if he realizes the queue is full, and Eric can only eat an item that Sam has put in the queue. Thus, their tasks are not merely done in parallel, as some synchronization must occur. In this post, I will program a queue manager, which is the name I give to an object that spawns the process we just described.
Let us start by declaring a simple trait which encodes the blocking calculation of Sam at the food container.
/// Simple trait which encodes some blocking calculation for grabbing values.
pub trait BlockingGrabber {
    type Payload;
    type Marker;
    fn init(&mut self) -> Option<Self::Marker>;
    fn next(&mut self, p: &Self::Payload) -> Option<Self::Marker>;
    fn grab(&mut self, m: &Self::Marker) -> Option<Self::Payload>;
}
The associated type Payload corresponds to the type of objects Sam is moving, while Marker serves as a piece of information that Sam uses to grab the next item.
The execution cycle of Sam (the grabber) is effectively that he:
1. obtains an initial Marker via BlockingGrabber::init,
2. uses the Marker to grab a Payload via BlockingGrabber::grab,
3. uses the Payload to get a new Marker via BlockingGrabber::next,
4. queues the Payload and returns to step 2 with the new Marker.
This cycle will be halted if Sam ever receives the None variant of the Option<Marker> or Option<Payload> return types of the above methods.
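The cycle can be tried out without any queue at all. Below is a minimal sketch, assuming a hypothetical grabber Countdown and a hypothetical helper run_cycle (neither is part of the post's program): it drives init, grab and next in a plain loop and stops at the first None. The trait is repeated so the sketch compiles on its own.

```rust
/// Same trait shape as in the post, repeated so this sketch is self-contained.
pub trait BlockingGrabber {
    type Payload;
    type Marker;
    fn init(&mut self) -> Option<Self::Marker>;
    fn next(&mut self, p: &Self::Payload) -> Option<Self::Marker>;
    fn grab(&mut self, m: &Self::Marker) -> Option<Self::Payload>;
}

/// Hypothetical grabber: counts down from `start`, with nothing to grab at 0.
pub struct Countdown {
    pub start: u32,
}

impl BlockingGrabber for Countdown {
    type Payload = u32;
    type Marker = u32;

    // The first marker is simply the starting number.
    fn init(&mut self) -> Option<u32> {
        Some(self.start)
    }

    // The next marker is one less than the payload just grabbed.
    fn next(&mut self, p: &u32) -> Option<u32> {
        p.checked_sub(1)
    }

    // Marker 0 means the bin is empty.
    fn grab(&mut self, m: &u32) -> Option<u32> {
        if *m == 0 {
            None
        } else {
            Some(*m)
        }
    }
}

/// Hypothetical driver: runs the cycle until a `None` shows up and
/// returns every payload that was grabbed, in order.
pub fn run_cycle<G: BlockingGrabber>(grabber: &mut G) -> Vec<G::Payload> {
    let mut grabbed = Vec::new();
    let mut marker = grabber.init();
    while let Some(m) = marker.take() {
        match grabber.grab(&m) {
            Some(payload) => {
                marker = grabber.next(&payload);
                grabbed.push(payload);
            }
            None => break,
        }
    }
    grabbed
}
```

Calling run_cycle on Countdown { start: 3 } collects the payloads 3, 2, 1 and then halts because grab returns None for the marker 0.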
It is useful to have BlockingGrabber as a trait, because we have a variety of use cases for our queue manager.
For instance, the BlockingGrabber can be implemented to grab items from a database, a sequence of files, a network stream, etc.
We will also implement a trait Syncer that will allow the queue manager to synchronously send queue information out of the program.
In the context of this post, the Syncer will write a log to a file:
GRAB 1:1250
QUEUE 1:1250
DEQUEUE 1:1250
EAT 1:2000
GRAB 2:2320
QUEUE 2:2320
DEQUEUE 2:2320
GRAB 3:2920
QUEUE 3:2920
GRAB 4:3430
EAT 2:4390
DEQUEUE 3:4390
QUEUE 4:4390
EAT 3:4900
DEQUEUE 4:4900
EAT 4:6320
GRAB 5:7390
QUEUE 5:7390
DEQUEUE 5:7390
EAT 5:7810
which determines the very animation above.
This trait simply has a single method, Syncer::sync.
/// Simple trait which encodes the syncing
pub trait Syncer {
    fn sync<S: Into<String>>(&mut self, message: S);
}
We declare a struct QueueManager which will asynchronously spawn the 4-step grabbing process described above.
With the help of Rust's popular asynchronous runtime, Tokio, implementing our QueueManager is simple.
Namely, the tokio::sync::mpsc::channel function will create a bounded channel through which Futures may send data.
This channel will block a sender Future if it is at capacity (the .await on the send does not complete until a slot frees up) and block a receiver Future if there is no data in the channel.
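The one-slot behavior is easy to observe in isolation. The sketch below deliberately swaps in the standard library's std::sync::mpsc::sync_channel, which blocks the calling thread where Tokio's channel would suspend the Future; it is an analogue under that assumption, not the channel used in this post. The function name one_slot_demo and the event strings are made up for illustration.

```rust
use std::sync::mpsc::{sync_channel, TryRecvError, TrySendError};

/// Fills and drains a one-slot queue, recording what happens at each step.
pub fn one_slot_demo() -> Vec<String> {
    let mut events = Vec::new();
    // Capacity 1 plays the role of the table that holds a single food item.
    let (tx, rx) = sync_channel::<u32>(1);

    // The table is empty, so the first item is queued immediately.
    tx.send(1).expect("receiver is alive");
    events.push("queued 1".to_string());

    // The table is now full: a non-blocking send hands the item back.
    if let Err(TrySendError::Full(item)) = tx.try_send(2) {
        events.push(format!("table full, still holding {}", item));
    }

    // Dequeueing frees the slot.
    let item = rx.recv().expect("sender is alive");
    events.push(format!("dequeued {}", item));

    // With the slot free, the second item goes through.
    if tx.try_send(2).is_ok() {
        events.push("queued 2".to_string());
    }
    let item = rx.recv().expect("sender is alive");
    events.push(format!("dequeued {}", item));

    // Nothing is left on the table, so a non-blocking receive reports empty.
    if let Err(TryRecvError::Empty) = rx.try_recv() {
        events.push("table empty".to_string());
    }
    events
}
```

A blocking send in place of the first try_send(2) would park the thread at that point, which is the same waiting that Sam does when the table is occupied.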
Let us look at the fields of this struct for some intuition.
/// This struct implements our queue manager
struct QueueManager<P, G: BlockingGrabber<Payload = P>, S: Syncer> {
    receiver: mpsc::Receiver<P>,
    grabber: G,
    syncer: S,
}
See how it has fields grabber and syncer, which it uses to obtain data to queue in the channel and to synchronously inform the outside world about this data.
Meanwhile, the receiver field provides the interface through which a user dequeues items from the channel.
We design our API to behave like so.
let mut manager = QueueManager::new(...);
let opt_payload = manager.recv().await;
if let Some(payload) = opt_payload {
    // do stuff with `payload`, which has type `Payload` associated with the
    // grabber
}
Setting up the recv method calls for a simple composition of manager.receiver.recv, along with some additional syncer logic.
The implementation of the method recv is shown below.
async fn recv(&mut self) -> Option<P> {
    self.receiver.recv().await.map(|payload| {
        let message = format!("DEQUEUE {:?}", payload);
        self.syncer.sync(&message);
        payload
    })
}
Now, the nontrivial part: How do we implement the spawning of the grabber cycle?
If we understand the structure of Rust's async/await execution (Jon Gjengset's stream Crust of Rust: async/await is an excellent resource), it turns out to be quite easy.
All we must do is spawn a Future which loops the 4-step cycle, and the .await call on the channel sender will perform all of our desired blocking.
Let us show the entirety of our implementation.
impl<G, P, M, S> QueueManager<P, G, S>
where
    P: Debug + Clone + Send + 'static,
    M: Clone + Send + 'static,
    G: BlockingGrabber<Payload = P, Marker = M> + Clone + Send + 'static,
    S: Syncer + Clone + Send + 'static,
{
    pub fn new(grabber: G, syncer: S) -> Self {
        // create a channel for the payload
        let (tx, rx) = mpsc::channel(1);
        // clone the grabber and syncer
        let mut gbr = grabber.clone();
        let mut sncr = syncer.clone();
        // mount the lifecycle of the Sender
        tokio::task::spawn(async move {
            // This is Step 1 of our loop cycle.
            let mut opt_marker = gbr.init();
            loop {
                if let Some(marker) = opt_marker {
                    // This is Step 2 of our loop cycle (requires a marker).
                    let opt_payload = gbr.grab(&marker);
                    match opt_payload {
                        Some(payload) => {
                            // This is Step 3 of our loop cycle (requires a payload).
                            opt_marker = gbr.next(&payload);
                            let pld = payload.clone();
                            let res = tx.send(pld).await; // where future may block
                            if res.is_err() {
                                // the receiver was dropped, so nobody can dequeue
                                sncr.sync("SEND ERROR");
                                return;
                            }
                            // send a message through the Syncer
                            let message = format!("QUEUE {:?}", payload);
                            sncr.sync(&message);
                        }
                        None => {
                            // Step 2 yielded no payload, so the cycle ends.
                            return;
                        }
                    }
                } else {
                    // Step 1 or Step 3 yielded no marker, so the cycle ends.
                    return;
                }
            }
        });
        // return the manager
        Self {
            receiver: rx,
            grabber,
            syncer,
        }
    }

    async fn recv(&mut self) -> Option<P> {
        self.receiver.recv().await.map(|payload| {
            let message = format!("DEQUEUE {:?}", payload);
            self.syncer.sync(&message);
            payload
        })
    }
}
The machinery is now in place, but we must still implement the traits. We implement a simple example which simulates a simple queue and exports a log which helps us render the animation above. The service times of Sam and Eric will be hardcoded by the following constants.
const VALS: &'static [i32] = &[1, 2, 3, 4, 5];
const LEN: usize = VALS.len();
const BLOCKS: &'static [u64] = &[125, 107, 60, 50, 300];
const SERVES: &'static [u64] = &[75, 205, 50, 140, 40];
Above, VALS is simply the list of values Sam will grab, BLOCKS is the list of times (in milliseconds) each item will take Sam to grab, and SERVES is the list of times (in milliseconds) each item will take Eric to eat.
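These two lists already fix the idealized timeline, which can be worked out by hand before running anything. The sketch below is an assumption-laden model rather than part of the program: it ignores scheduling overhead, treats the table as holding one item, and uses the hypothetical names ItemTimes and timeline. For each item, Sam finishes grabbing BLOCKS milliseconds after his previous queueing, queues once the previous item has left the table, and Eric dequeues once he has finished eating the previous item.

```rust
/// Idealized event times for one item, in milliseconds from the start.
#[derive(Debug, PartialEq)]
pub struct ItemTimes {
    pub grab: u64,
    pub queue: u64,
    pub dequeue: u64,
    pub eat: u64,
}

/// Computes the idealized timeline from grab durations and eat durations.
pub fn timeline(blocks: &[u64], serves: &[u64]) -> Vec<ItemTimes> {
    let mut out: Vec<ItemTimes> = Vec::new();
    for (block, serve) in blocks.iter().zip(serves.iter()) {
        // Times of the previous item, or zero for the very first item.
        let (prev_queue, prev_dequeue, prev_eat) = match out.last() {
            Some(p) => (p.queue, p.dequeue, p.eat),
            None => (0, 0, 0),
        };
        // Sam starts grabbing as soon as his hands are empty again.
        let grab = prev_queue + *block;
        // He can only queue once the previous item has left the table.
        let queue = grab.max(prev_dequeue);
        // Eric can only dequeue once he has finished the previous item.
        let dequeue = queue.max(prev_eat);
        // Eating takes `serve` milliseconds from the moment of dequeueing.
        let eat = dequeue + *serve;
        out.push(ItemTimes { grab, queue, dequeue, eat });
    }
    out
}
```

With the constants above, item 4 comes out as grabbed at 342 ms, queued at 437 ms, dequeued at 487 ms and eaten at 627 ms. The sample log records ten times the elapsed milliseconds and shows 3430, 4390, 4900 and 6320 for the same events, so the model agrees with it to within a few milliseconds.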
These constants will be used in the implementation of SimpleGrabber, which implements the BlockingGrabber trait.
/// Our simple implementation of the [`BlockingGrabber`] trait.
#[derive(Clone)]
pub struct SimpleGrabber<S: Syncer> {
    syncer: S,
}

impl<S: Syncer> SimpleGrabber<S> {
    fn new(syncer: S) -> Self {
        Self { syncer }
    }
}

impl<S: Syncer> BlockingGrabber for SimpleGrabber<S> {
    type Payload = i32;
    type Marker = usize;

    fn init(&mut self) -> Option<usize> {
        Some(0)
    }

    fn next(&mut self, p: &i32) -> Option<usize> {
        // the value just grabbed doubles as the index of the next value
        let u = *p as usize;
        if u >= LEN {
            None
        } else {
            Some(u)
        }
    }

    fn grab(&mut self, m: &usize) -> Option<i32> {
        if *m >= LEN {
            None
        } else {
            let val = VALS[*m];
            let message = format!("GRAB {:?}", val);
            // block the thread for the duration of the grab
            std::thread::sleep(Duration::from_millis(BLOCKS[*m]));
            self.syncer.sync(&message);
            Some(val)
        }
    }
}
Notice that, as the name BlockingGrabber suggests, we implemented SimpleGrabber to block the thread during the grabbing, via std::thread::sleep.
This is intended to simulate how the grabbing operation would likely come from a library that is implemented outside of the Future executor.
Note that we also threaded our Syncer into the SimpleGrabber, as we would like to log the grabbing operations.
The Syncer will create a file handle and a timer, so that the implementation of sync will log the messages to the file at the time they are sent.
We wrap the file handle in an atomic reference counter and mutex to ensure safe syncing between threads.
/// Implementation of Syncer which writes messages to files.
#[derive(Clone)]
struct SyncWriter {
    start_time: SystemTime,
    writer: Arc<Mutex<File>>,
}

impl SyncWriter {
    fn new<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
        let writer = File::create(path)?;
        Ok(Self {
            start_time: SystemTime::now(),
            writer: Arc::new(Mutex::new(writer)),
        })
    }
}

impl Syncer for SyncWriter {
    fn sync<S: Into<String>>(&mut self, message: S) {
        let elapsed = self.start_time.elapsed().unwrap().as_millis();
        // the logged timestamp is the elapsed milliseconds scaled by 10
        let message = format!("{}:{:?}\n", message.into(), elapsed * 10);
        // write_all makes sure the whole line reaches the file
        self.writer
            .lock()
            .unwrap()
            .write_all(message.as_bytes())
            .unwrap();
    }
}
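Because Syncer is a trait, the file-backed writer can be exchanged for something lighter when experimenting. The following is a sketch of a hypothetical MemorySyncer (not part of the post's program) that keeps the messages in a shared Vec, using the same Arc and Mutex wrapping so that clones held by different tasks append to one log. The trait is repeated so the sketch compiles on its own.

```rust
use std::sync::{Arc, Mutex};

/// Same trait shape as in the post, repeated so this sketch is self-contained.
pub trait Syncer {
    fn sync<S: Into<String>>(&mut self, message: S);
}

/// Hypothetical syncer that collects messages in memory instead of a file.
#[derive(Clone, Default)]
pub struct MemorySyncer {
    // Every clone shares the same underlying vector.
    lines: Arc<Mutex<Vec<String>>>,
}

impl MemorySyncer {
    /// Returns a copy of everything logged so far, in order.
    pub fn snapshot(&self) -> Vec<String> {
        self.lines.lock().unwrap().clone()
    }
}

impl Syncer for MemorySyncer {
    fn sync<S: Into<String>>(&mut self, message: S) {
        // Lock, append, and release the lock at the end of the statement.
        self.lines.lock().unwrap().push(message.into());
    }
}
```

Two clones of one MemorySyncer see each other's messages, which mirrors how the grabber, the manager, and main all write to the same log through their own clone of the SyncWriter.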
Now that everything is implemented, we run the program.
#[tokio::main]
async fn main() {
    // just to ensure that Sam and Eric handle the same amount of items
    assert_eq!(BLOCKS.len(), LEN);
    assert_eq!(SERVES.len(), LEN);
    // create our syncer (this starts the timer)
    let mut syncer = SyncWriter::new("log.txt").unwrap();
    // create our grabber (needs syncer for logging grabs)
    let grabber = SimpleGrabber::new(syncer.clone());
    // create our manager
    let mut manager = QueueManager::new(grabber, syncer.clone());
    // simulate Eric's dequeueing and eating pauses
    for delay in SERVES {
        if let Some(payload) = manager.recv().await {
            tokio::time::sleep(Duration::from_millis(*delay)).await; // where future blocks
            syncer.sync(format!("EAT {:?}", payload));
        } else {
            syncer.sync("TAKE ERROR");
            return;
        }
    }
}
We again display the file that is created from this program, along with the entirety of the code.
use std::fmt::Debug;
use std::fs::File;
use std::io::{Error, Write};
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
use tokio::sync::mpsc;
/// Simple trait which encodes some blocking calculation for grabbing values.
pub trait BlockingGrabber {
    type Payload;
    type Marker;
    fn init(&mut self) -> Option<Self::Marker>;
    fn next(&mut self, p: &Self::Payload) -> Option<Self::Marker>;
    fn grab(&mut self, m: &Self::Marker) -> Option<Self::Payload>;
}

/// Simple trait which encodes the syncing
pub trait Syncer {
    fn sync<S: Into<String>>(&mut self, message: S);
}
/// This struct implements our queue manager
struct QueueManager<P, G: BlockingGrabber<Payload = P>, S: Syncer> {
    receiver: mpsc::Receiver<P>,
    grabber: G,
    syncer: S,
}

impl<G, P, M, S> QueueManager<P, G, S>
where
    P: Debug + Clone + Send + 'static,
    M: Clone + Send + 'static,
    G: BlockingGrabber<Payload = P, Marker = M> + Clone + Send + 'static,
    S: Syncer + Clone + Send + 'static,
{
    pub fn new(grabber: G, syncer: S) -> Self {
        // create a channel for the payload
        let (tx, rx) = mpsc::channel(1);
        // clone the grabber and syncer
        let mut gbr = grabber.clone();
        let mut sncr = syncer.clone();
        // mount the lifecycle of the Sender
        tokio::task::spawn(async move {
            // This is Step 1 of our loop cycle.
            let mut opt_marker = gbr.init();
            loop {
                if let Some(marker) = opt_marker {
                    // This is Step 2 of our loop cycle (requires a marker).
                    let opt_payload = gbr.grab(&marker);
                    match opt_payload {
                        Some(payload) => {
                            // This is Step 3 of our loop cycle (requires a payload).
                            opt_marker = gbr.next(&payload);
                            let pld = payload.clone();
                            let res = tx.send(pld).await; // where future may block
                            if res.is_err() {
                                // the receiver was dropped, so nobody can dequeue
                                sncr.sync("SEND ERROR");
                                return;
                            }
                            // send a message through the Syncer
                            let message = format!("QUEUE {:?}", payload);
                            sncr.sync(&message);
                        }
                        None => {
                            // Step 2 yielded no payload, so the cycle ends.
                            return;
                        }
                    }
                } else {
                    // Step 1 or Step 3 yielded no marker, so the cycle ends.
                    return;
                }
            }
        });
        // return the manager
        Self {
            receiver: rx,
            grabber,
            syncer,
        }
    }

    async fn recv(&mut self) -> Option<P> {
        self.receiver.recv().await.map(|payload| {
            let message = format!("DEQUEUE {:?}", payload);
            self.syncer.sync(&message);
            payload
        })
    }
}
const VALS: &'static [i32] = &[1, 2, 3, 4, 5];
const LEN: usize = VALS.len();
const BLOCKS: &'static [u64] = &[125, 107, 60, 50, 300];
const SERVES: &'static [u64] = &[75, 205, 50, 140, 40];
/// Our simple implementation of the [`BlockingGrabber`] trait.
#[derive(Clone)]
pub struct SimpleGrabber<S: Syncer> {
    syncer: S,
}

impl<S: Syncer> SimpleGrabber<S> {
    fn new(syncer: S) -> Self {
        Self { syncer }
    }
}

impl<S: Syncer> BlockingGrabber for SimpleGrabber<S> {
    type Payload = i32;
    type Marker = usize;

    fn init(&mut self) -> Option<usize> {
        Some(0)
    }

    fn next(&mut self, p: &i32) -> Option<usize> {
        // the value just grabbed doubles as the index of the next value
        let u = *p as usize;
        if u >= LEN {
            None
        } else {
            Some(u)
        }
    }

    fn grab(&mut self, m: &usize) -> Option<i32> {
        if *m >= LEN {
            None
        } else {
            let val = VALS[*m];
            let message = format!("GRAB {:?}", val);
            // block the thread for the duration of the grab
            std::thread::sleep(Duration::from_millis(BLOCKS[*m]));
            self.syncer.sync(&message);
            Some(val)
        }
    }
}
/// Implementation of Syncer which writes messages to files.
#[derive(Clone)]
struct SyncWriter {
    start_time: SystemTime,
    writer: Arc<Mutex<File>>,
}

impl SyncWriter {
    fn new<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
        let writer = File::create(path)?;
        Ok(Self {
            start_time: SystemTime::now(),
            writer: Arc::new(Mutex::new(writer)),
        })
    }
}

impl Syncer for SyncWriter {
    fn sync<S: Into<String>>(&mut self, message: S) {
        let elapsed = self.start_time.elapsed().unwrap().as_millis();
        // the logged timestamp is the elapsed milliseconds scaled by 10
        let message = format!("{}:{:?}\n", message.into(), elapsed * 10);
        // write_all makes sure the whole line reaches the file
        self.writer
            .lock()
            .unwrap()
            .write_all(message.as_bytes())
            .unwrap();
    }
}
#[tokio::main]
async fn main() {
    // just to ensure that Sam and Eric handle the same amount of items
    assert_eq!(BLOCKS.len(), LEN);
    assert_eq!(SERVES.len(), LEN);
    // create our syncer (this starts the timer)
    let mut syncer = SyncWriter::new("log.txt").unwrap();
    // create our grabber (needs syncer for logging grabs)
    let grabber = SimpleGrabber::new(syncer.clone());
    // create our manager
    let mut manager = QueueManager::new(grabber, syncer.clone());
    // simulate Eric's dequeueing and eating pauses
    for delay in SERVES {
        if let Some(payload) = manager.recv().await {
            tokio::time::sleep(Duration::from_millis(*delay)).await; // where future blocks
            syncer.sync(format!("EAT {:?}", payload));
        } else {
            syncer.sync("TAKE ERROR");
            return;
        }
    }
}
GRAB 1:1250
QUEUE 1:1250
DEQUEUE 1:1250
EAT 1:2000
GRAB 2:2320
QUEUE 2:2320
DEQUEUE 2:2320
GRAB 3:2920
QUEUE 3:2920
GRAB 4:3430
EAT 2:4390
DEQUEUE 3:4390
QUEUE 4:4390
EAT 3:4900
DEQUEUE 4:4900
EAT 4:6320
GRAB 5:7390
QUEUE 5:7390
DEQUEUE 5:7390
EAT 5:7810
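Each line of the log above has the shape EVENT ITEM:TIME, so it can be read back into structured data for the animation or for checking the ordering of events. The following is a minimal sketch, assuming that shape holds for every regular line; LogEntry and parse_line are hypothetical names, and error lines such as SEND ERROR are simply rejected.

```rust
/// One parsed line of the log: the event name, the item, and the timestamp.
#[derive(Debug, PartialEq)]
pub struct LogEntry {
    pub event: String,
    pub item: u32,
    pub time: u64,
}

/// Parses a line such as "GRAB 1:1250".
/// Returns `None` when the line does not have the EVENT ITEM:TIME shape.
pub fn parse_line(line: &str) -> Option<LogEntry> {
    // Split "GRAB 1:1250" into "GRAB" and "1:1250".
    let (event, rest) = line.trim().split_once(' ')?;
    // Split "1:1250" into "1" and "1250".
    let (item, time) = rest.split_once(':')?;
    Some(LogEntry {
        event: event.to_string(),
        item: item.parse().ok()?,
        time: time.parse().ok()?,
    })
}
```

For example, parse_line applied to the line QUEUE 4:4390 yields the event QUEUE for item 4 at time 4390, while a line such as SEND ERROR:40 yields None because ERROR is not an item number.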