Announcing Postage, an async channel library for Rust

Over the past month or so, I've been working on postage-rs, an async channel library. Postage makes it easier to write message-based applications in async Rust by providing a rich set of channels, plus channel-optimized Sink and Stream traits with combinators.

A bit of background

The async ecosystem provides a few options for channels:

  • The tokio crate, with mpsc, broadcast, watch, and oneshot channels.
  • The futures crate, with mpsc and oneshot channels.
  • The async-std crate, with a multi-producer multi-consumer queue channel.

One of the reasons I've become so familiar with async channels is my work on tab, a terminal multiplexer. Tab is based on tokio and has a message-based architecture. Any action in tab requires sending a torrent of messages over async channels.

Recently, I had reached the limits of some early hacks. I needed a solution to several problems:

  • Because Sender and Receiver interfaces vary so much (mut, .await, Result and Option), changes to channel types require many edits. I needed Sink and Stream traits, optimized for async channels, so I could write code that is generic over channel implementations.
  • I needed combinators, so I could merge receivers and map/filter messages.
  • I needed a broadcast channel with backpressure. Most broadcast channels skip messages when receivers fall behind, but this can cause rare bugs that occur in production under heavy load.
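The difference between skipping messages and applying backpressure can be sketched with the standard library alone. The snippet below is not Postage code and is not async: it uses std's bounded sync_channel with threads, and the function names offer_without_waiting and send_with_backpressure are made up for this illustration. The first function never waits, so some messages are refused once the buffer is full. The second lets the sender wait for free space, so every message arrives.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Enqueues without ever waiting and returns (delivered, refused).
/// A lossy channel would silently discard the refused messages.
pub fn offer_without_waiting(capacity: usize, messages: Vec<u32>) -> (Vec<u32>, Vec<u32>) {
    let (tx, rx) = sync_channel(capacity);
    let mut refused = Vec::new();
    for msg in messages {
        // try_send fails immediately when the buffer is full.
        if tx.try_send(msg).is_err() {
            refused.push(msg);
        }
    }
    drop(tx); // close the channel so the iterator below terminates
    (rx.iter().collect(), refused)
}

/// Sends from a producer thread that waits whenever the buffer is full.
pub fn send_with_backpressure(capacity: usize, messages: Vec<u32>) -> Vec<u32> {
    let (tx, rx) = sync_channel(capacity);
    let producer = thread::spawn(move || {
        for msg in messages {
            // send blocks while the buffer is full, so nothing is skipped.
            if tx.send(msg).is_err() {
                break; // the receiver went away
            }
        }
        // tx is dropped here, which closes the channel
    });
    let received: Vec<u32> = rx.iter().collect();
    producer.join().expect("producer thread panicked");
    received
}
```

With a capacity of 2 and four messages, the first function delivers only the first two, while the second delivers all of them in order.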

Postage

Postage provides a simple, consistent interface to send and receive, unified by Sink and Stream traits. It also provides a rich set of channel implementations that are compatible with any executor (and roughly comparable in performance).

These traits are optimized for convenient use in message-based apps, with minimal control flow. Sending is tx.send(msg).await?, and receiving is while let Some(msg) = rx.recv().await { ... }:

use postage::sink::Sink;
use postage::stream::Stream;
use postage::mpsc;

async fn example(cap: usize) -> anyhow::Result<()> {
    let (mut tx, mut rx) = mpsc::channel(cap);

    // fail if all receivers have disconnected
    tx.send(1).await?;
    send_extra(&mut tx).await;
    // no more messages!
    drop(tx);

    // process messages until the channel is empty and closed
    while let Some(msg) = rx.recv().await {
        // ...
    }
    Ok(())
}

async fn send_extra(mut tx: impl Sink<Item = usize> + Unpin) {
    // don't fail if all receivers have disconnected
    tx.send(2).await.ok();
}

Sink and Stream also provide try_send and try_recv at the trait level! These calls are zero-cost, as no Wakers are stored during try_send/try_recv.
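To illustrate why a trait-level, non-blocking send is useful, here is a minimal std-only sketch. It does not use Postage: TrySink, try_put, offer_until_refused, and demo are hypothetical names invented for this example, and the trait is far simpler than Postage's Sink. The point is only that code written against one trait keeps working when the underlying channel type changes.

```rust
use std::sync::mpsc::{channel, sync_channel, Sender, SyncSender, TrySendError};

/// Hypothetical trait for this sketch; it is not Postage's `Sink`.
pub trait TrySink {
    type Item;
    /// Attempts to send without blocking; hands the message back on failure.
    fn try_put(&self, item: Self::Item) -> Result<(), Self::Item>;
}

impl<T> TrySink for Sender<T> {
    type Item = T;
    fn try_put(&self, item: T) -> Result<(), T> {
        // An unbounded sender never blocks; it only fails if the receiver is gone.
        self.send(item).map_err(|err| err.0)
    }
}

impl<T> TrySink for SyncSender<T> {
    type Item = T;
    fn try_put(&self, item: T) -> Result<(), T> {
        // A bounded sender refuses the message when its buffer is full.
        self.try_send(item).map_err(|err| match err {
            TrySendError::Full(item) | TrySendError::Disconnected(item) => item,
        })
    }
}

/// Generic over the channel type: counts items accepted before the first refusal.
pub fn offer_until_refused<S: TrySink>(sink: &S, items: Vec<S::Item>) -> usize {
    let mut accepted = 0;
    for item in items {
        if sink.try_put(item).is_err() {
            break;
        }
        accepted += 1;
    }
    accepted
}

/// Runs the same generic function against two different channel types.
pub fn demo() -> (usize, usize) {
    let (unbounded, _rx1) = channel::<u32>();
    let (bounded, _rx2) = sync_channel::<u32>(2);
    (
        offer_until_refused(&unbounded, vec![1, 2, 3]),
        offer_until_refused(&bounded, vec![1, 2, 3]),
    )
}
```

Here the unbounded channel accepts all three items, and the bounded channel with capacity 2 refuses the third.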

Combinators

Sink and Stream include combinators such as map, filter, find, merge, and chain. With the logging feature enabled, they also support .log(log::Level), which transparently logs each message (including its type name and debug representation).

use postage::mpsc;
use postage::sink::Sink;
use postage::stream::{Stream, TryRecvError};

async fn run() -> anyhow::Result<()> {
    let (mut tx, rx) = mpsc::channel(4);

    tx.send(1usize).await?;
    tx.send(2usize).await?;
    tx.send(3usize).await?;
    drop(tx);

    let mut rx = rx
        .map(|i| i * 2)
        .filter(|i| *i >= 4)
        .find(|i| *i == 6)
        .log(log::Level::Info);

    assert_eq!(Ok(6), rx.try_recv());
    assert_eq!(Err(TryRecvError::Closed), rx.try_recv());
    Ok(())
}

Channels

Postage provides a rich set of channels, which are compatible with any async executor:

  • mpsc, a bounded multi-producer single-consumer channel.
  • broadcast, a bounded and lossless mpmc channel. Receivers can be created using tx.subscribe(), and will observe all messages sent after their creation. They can also be cloned, and the two receivers will observe the same series of messages.
  • watch, a channel designed to convey state. Each receiver is guaranteed to observe the latest value, but not necessarily every intermediate value. The value can be temporarily borrowed.
  • oneshot, a transfer channel between a single sender and receiver.
  • barrier, a channel that transmits () when the sender is dropped.
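The watch semantics (latest value, but not every intermediate value) can be sketched with a version counter behind a mutex. This is a std-only illustration and makes no claim about how Postage implements its watch channel. WatchCell, WatchReader, try_latest, and demo are hypothetical names, and letting a new reader observe the initial value on its first read is a choice made for this sketch.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical writer handle; the shared state is (version, latest value).
pub struct WatchCell<T> {
    state: Arc<Mutex<(u64, T)>>,
}

/// Hypothetical reader handle that remembers the last version it observed.
pub struct WatchReader<T> {
    state: Arc<Mutex<(u64, T)>>,
    seen: u64,
}

impl<T: Clone> WatchCell<T> {
    pub fn new(initial: T) -> Self {
        WatchCell { state: Arc::new(Mutex::new((1, initial))) }
    }

    /// Overwrites the stored value; older values are not queued.
    pub fn set(&self, value: T) {
        let mut state = self.state.lock().unwrap();
        state.0 += 1;
        state.1 = value;
    }

    pub fn reader(&self) -> WatchReader<T> {
        WatchReader { state: Arc::clone(&self.state), seen: 0 }
    }
}

impl<T: Clone> WatchReader<T> {
    /// Returns the latest value if it changed since the previous call.
    pub fn try_latest(&mut self) -> Option<T> {
        let state = self.state.lock().unwrap();
        if state.0 > self.seen {
            self.seen = state.0;
            Some(state.1.clone())
        } else {
            None
        }
    }
}

/// Three writes followed by two reads: the reader sees only the last write.
pub fn demo() -> (Option<u32>, Option<u32>) {
    let cell = WatchCell::new(0u32);
    let mut reader = cell.reader();
    cell.set(1);
    cell.set(2);
    cell.set(3);
    (reader.try_latest(), reader.try_latest())
}
```

The reader skips the intermediate values 1 and 2, which is acceptable for state but not for events. Events are the case the lossless broadcast channel is meant for.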

Currently, integration test coverage is written for tokio and async-std.

What's next?

My current focus is on removing the 'beta' tag from postage. While the current functionality has very strong test coverage, I need to run those tests on more hardware architectures and operating systems.

I'm also interested in adding some new channel types:

  • A lagging broadcast channel, for easier migration of code that relies on lagging behavior.
  • A 'dispatch' channel (mpmc queue), similar to the async-std channel.

Further reading

If you are interested in trying it out, add it to the dependencies in your Cargo.toml:

postage = "0.3"

And some links: