Over the past month or so, I've been working on postage-rs, an async channel library. Postage makes it easier to write message-based applications in async Rust, providing a rich set of channels, and channel-optimized Sink/Stream traits with combinators.
A bit of background
The async ecosystem provides a few options for channels:
- The tokio crate, with mpsc, broadcast, watch, and oneshot channels.
- The futures crate, with mpsc and oneshot channels.
- The async-std crate, with a multi-producer multi-consumer queue channel.
One of the reasons I've become so familiar with async channels has been my work on tab, a terminal multiplexer. Tab is based on tokio and has a message-based architecture. Any action in tab requires sending a torrent of messages over async channels.
Recently, I had reached the limits of some early hacks. I needed a solution to several problems:
- Because Sender and Receiver interfaces vary so much (mut, .await, Result, and Option), changes to channel types require many edits. I needed Sink and Stream traits, optimized for async channels, so I could write code that is generic over channel implementations.
- I needed combinators, so I could merge receivers and map/filter messages.
- I needed a broadcast channel with backpressure. Most broadcast channels skip messages when receivers fall behind, but this can cause rare bugs that occur in production under heavy load.
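To make the first problem concrete, here is a minimal sketch of what "generic over channel implementations" means. It uses only the standard library's blocking channels so it runs without any dependencies. The trait SimpleSink and the function send_all are hypothetical names invented for this illustration; they are not postage's Sink trait, which is async.

```rust
use std::sync::mpsc::{channel, sync_channel, Sender, SyncSender};

/// Hypothetical trait for illustration only (not postage's `Sink`):
/// one way to send, regardless of the underlying channel type.
trait SimpleSink {
    type Item;
    /// Hands the value back if every receiver has been dropped.
    fn send_item(&mut self, value: Self::Item) -> Result<(), Self::Item>;
}

impl<T> SimpleSink for Sender<T> {
    type Item = T;
    fn send_item(&mut self, value: T) -> Result<(), T> {
        self.send(value).map_err(|e| e.0)
    }
}

impl<T> SimpleSink for SyncSender<T> {
    type Item = T;
    fn send_item(&mut self, value: T) -> Result<(), T> {
        self.send(value).map_err(|e| e.0)
    }
}

/// Written once, works with either channel type.
/// Returns how many messages were accepted before the channel closed.
fn send_all<S: SimpleSink<Item = usize>>(tx: &mut S, count: usize) -> usize {
    let mut sent = 0;
    for i in 0..count {
        if tx.send_item(i).is_err() {
            break;
        }
        sent += 1;
    }
    sent
}

fn main() {
    let (mut unbounded_tx, unbounded_rx) = channel::<usize>();
    let (mut bounded_tx, bounded_rx) = sync_channel::<usize>(8);

    // The same function drives both senders.
    send_all(&mut unbounded_tx, 3);
    send_all(&mut bounded_tx, 3);
    drop(unbounded_tx);
    drop(bounded_tx);

    println!("{:?}", unbounded_rx.iter().collect::<Vec<_>>());
    println!("{:?}", bounded_rx.iter().collect::<Vec<_>>());
}
```

Swapping one channel type for another then only touches the line that creates the channel, not every function that sends on it.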
Postage
Postage provides a simple, consistent interface to send and receive, unified by Sink and Stream traits. It also provides a rich set of channel implementations that are compatible with any executor (and roughly comparable in performance).
These traits are optimized for convenient use in message-based apps, with minimal control flow: tx.send(msg).await? and while let Some(msg) = rx.recv().await { ... }:
use postage::mpsc;
use postage::sink::Sink;
use postage::stream::Stream;

async fn example(cap: usize) -> anyhow::Result<()> {
    let (mut tx, mut rx) = mpsc::channel(cap);
    // fail if all receivers have disconnected
    tx.send(1).await?;
    send_extra(&mut tx).await;
    // no more messages!
    drop(tx);
    // process messages until the channel is empty and closed
    while let Some(msg) = rx.recv().await {
        // ...
    }
    Ok(())
}

// `mut` is needed because sending requires mutable access to the sink
async fn send_extra(mut tx: impl Sink<Item = usize> + Unpin) {
    // don't fail if all receivers have disconnected
    tx.send(2).await.ok();
}
Sink and Stream also provide try_send and try_recv at the trait level! It's zero-cost, as no Wakers are stored during try_send/try_recv.
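For readers who haven't used non-blocking channel operations, the semantics can be sketched with the standard library's bounded channel, which has analogous try_send and try_recv methods. This is not postage's API (std's channels are blocking rather than async, and its error names differ), and drain_ready is a hypothetical helper written for this example.

```rust
use std::sync::mpsc::{sync_channel, Receiver, TryRecvError, TrySendError};

/// Hypothetical helper: collects every message that is ready right now,
/// without blocking. The bool is true once all senders have been dropped.
fn drain_ready<T>(rx: &Receiver<T>) -> (Vec<T>, bool) {
    let mut ready = Vec::new();
    loop {
        match rx.try_recv() {
            Ok(msg) => ready.push(msg),
            // Nothing queued, but a sender is still alive.
            Err(TryRecvError::Empty) => return (ready, false),
            // Nothing queued, and no sender can ever send again.
            Err(TryRecvError::Disconnected) => return (ready, true),
        }
    }
}

fn main() {
    let (tx, rx) = sync_channel::<u32>(2);

    tx.try_send(1).unwrap();
    tx.try_send(2).unwrap();
    // The buffer is full: try_send returns immediately and hands the message back.
    assert_eq!(tx.try_send(3), Err(TrySendError::Full(3)));

    // Both queued messages are available; the channel is still open.
    assert_eq!(drain_ready(&rx), (vec![1, 2], false));

    drop(tx);
    // Empty and closed.
    assert_eq!(drain_ready(&rx), (Vec::<u32>::new(), true));
}
```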
Combinators
Sink and Stream include combinators such as map, filter, find, merge, chain, etc. With the logging feature, they also support .log(log::Level), which enables transparent message logging (including the type name and the debug representation).
use postage::mpsc;
use postage::sink::Sink;
use postage::stream::{Stream, TryRecvError};

async fn run() -> anyhow::Result<()> {
    let (mut tx, rx) = mpsc::channel(4);
    tx.send(1usize).await?;
    tx.send(2usize).await?;
    tx.send(3usize).await?;
    drop(tx);

    // .log requires the logging feature
    let mut rx = rx
        .map(|i| i * 2)
        .filter(|i| *i >= 4)
        .find(|i| *i == 6)
        .log(log::Level::Info);

    assert_eq!(Ok(6), rx.try_recv());
    assert_eq!(Err(TryRecvError::Closed), rx.try_recv());
    Ok(())
}
Channels
Postage provides a rich set of channels, which are compatible with any async executor:
- mpsc, a bounded multi-producer single-consumer channel.
- broadcast, a bounded and lossless mpmc channel. Receivers can be created using tx.subscribe(), and will observe all messages sent after their creation. They can also be cloned, and the two receivers will observe the same series of messages.
- watch, a channel designed to convey state. Each receiver is guaranteed to observe the latest value, but not necessarily every intermediate value. The value can be temporarily borrowed.
- oneshot, a transfer channel between a single sender and receiver.
- barrier, a channel that transmits () when the sender is dropped.
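The "lossless" behaviour of the broadcast channel can be sketched with the standard library alone. The Broadcaster type below is hypothetical and is not how postage implements broadcast. It assumes the simplest possible design, one bounded queue per subscriber, and it blocks a thread where an async channel would suspend a task. It still shows the two properties described above: a slow receiver slows the sender down instead of losing messages, and a subscriber only sees messages sent after it subscribed.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;

/// Hypothetical lossless broadcaster: one bounded queue per subscriber.
struct Broadcaster<T> {
    capacity: usize,
    subscribers: Vec<SyncSender<T>>,
}

impl<T: Clone> Broadcaster<T> {
    fn new(capacity: usize) -> Self {
        Broadcaster { capacity, subscribers: Vec::new() }
    }

    /// New receivers only observe messages sent after this call.
    fn subscribe(&mut self) -> Receiver<T> {
        let (tx, rx) = sync_channel(self.capacity);
        self.subscribers.push(tx);
        rx
    }

    /// Blocks while any subscriber's queue is full (backpressure),
    /// so no subscriber ever skips a message.
    /// Subscribers whose receiver was dropped are removed.
    fn send(&mut self, msg: T) {
        self.subscribers.retain(|tx| tx.send(msg.clone()).is_ok());
    }
}

fn main() {
    // Capacity 1 forces the sender to wait for the receivers.
    let mut bus = Broadcaster::<u32>::new(1);
    let early = bus.subscribe();
    bus.send(0);
    let late = bus.subscribe();

    let early_worker = thread::spawn(move || early.iter().collect::<Vec<_>>());
    let late_worker = thread::spawn(move || late.iter().collect::<Vec<_>>());

    for i in 1..=5 {
        bus.send(i);
    }
    // Dropping the broadcaster closes every subscriber queue.
    drop(bus);

    assert_eq!(early_worker.join().unwrap(), vec![0, 1, 2, 3, 4, 5]);
    assert_eq!(late_worker.join().unwrap(), vec![1, 2, 3, 4, 5]);
}
```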
Currently, integration test coverage is written for tokio and async-std.
What's next?
My current focus is on removing the 'beta' tag from postage. While the current functionality has very strong test coverage, I need to run those tests on more hardware architectures and operating systems.
I'm also interested in adding some new channel types:
- A lagging broadcast channel (one that skips messages when receivers fall behind), for easier migration from code that expects that behavior.
- A 'dispatch' channel (mpmc queue), similar to the async-std channel.
Further reading
If you are interested in trying it out:
postage = "0.3"
And some links: