Work-queue Stream in JetStream
A work-queue retention policy satisfies a very common use case of queuing up messages that are intended to be processed once and only once.
This retention policy supports queuing up messages from publishers independent of consumption. Since each message is intended to be processed only once, this retention type allows for a set of consumers that have non-overlapping interest on subjects. In other words, if multiple consumers are bound to a work-queue stream, they must have disjoint filter subjects. This is in contrast to a standard limits-based or interest-based stream, which supports multiple consumers with overlapping interest.
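To make "disjoint filter subjects" concrete, here is a minimal sketch of an overlap check between two subject filters. It is an illustrative model and not the server's implementation: `filters_overlap` is a hypothetical helper, it assumes the usual NATS wildcard rules (`*` matches exactly one token, `>` matches one or more trailing tokens), and it models an unfiltered consumer as the filter `>`.

```rust
/// Returns true if some concrete subject could match both filters.
/// Illustrative model only: `*` matches exactly one token, and `>` matches
/// one or more trailing tokens.
fn filters_overlap(a: &str, b: &str) -> bool {
    let mut left = a.split('.');
    let mut right = b.split('.');
    loop {
        match (left.next(), right.next()) {
            // Both filters ended together: every token pair was compatible.
            (None, None) => return true,
            // `>` absorbs whatever remains on the other side (at least one token).
            (Some(">"), Some(_)) | (Some(_), Some(">")) => return true,
            // Literal tokens must be equal; `*` is compatible with any single token.
            (Some(x), Some(y)) => {
                if x != "*" && y != "*" && x != y {
                    return false;
                }
            }
            // One filter is longer than the other and no `>` covers the tail.
            _ => return false,
        }
    }
}

fn main() {
    // An unfiltered consumer is modeled here as the filter `>` (an assumption).
    let pairs = [
        ("events.us.>", "events.eu.>"),
        ("events.>", "events.us.>"),
        (">", ">"),
    ];
    for &(a, b) in pairs.iter() {
        println!("{} vs {}: overlap = {}", a, b, filters_overlap(a, b));
    }
}
```

Under this model, `events.us.>` and `events.eu.>` can coexist on a work-queue stream, while two unfiltered consumers cannot.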
Like the interest policy, this retention policy is additive to any limits set on the stream. As a contrived example, if max-msgs is set to one with old messages being discarded, every new message received by the stream will result in the prior message being deleted, regardless of whether any subscriptions were available to process the message.
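The contrived max-msgs example can be sketched with a small in-memory model. This is a toy, not JetStream itself: `ToyStream` and its methods are hypothetical names, and the model only captures the "discard old once the limit is exceeded" behavior described above.

```rust
use std::collections::VecDeque;

/// Toy model of a stream with a message-count limit and discard-old behavior.
struct ToyStream {
    max_msgs: usize,
    messages: VecDeque<String>,
}

impl ToyStream {
    fn new(max_msgs: usize) -> Self {
        ToyStream {
            max_msgs,
            messages: VecDeque::new(),
        }
    }

    /// Stores a message and returns whatever had to be evicted to stay
    /// within the limit. No consumer is involved in this decision.
    fn publish(&mut self, payload: &str) -> Vec<String> {
        self.messages.push_back(payload.to_string());
        let mut evicted = Vec::new();
        while self.messages.len() > self.max_msgs {
            if let Some(old) = self.messages.pop_front() {
                evicted.push(old);
            }
        }
        evicted
    }
}

fn main() {
    // A limit of one message, as in the contrived example.
    let mut stream = ToyStream::new(1);
    for payload in ["first", "second", "third"].iter() {
        let evicted = stream.publish(payload);
        println!("published {:?}, evicted {:?}", payload, evicted);
    }
    println!("remaining: {:?}", stream.messages);
}
```

The point of the sketch is that eviction is driven by the limit alone, so a message can disappear before any consumer has processed it.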
In this example, we will walk through the work-queue retention setup and behavior. If you are new to streams, it is recommended to read the limits-based stream example prior to reading this one.
$ nbe run jetstream/workqueue-stream/rust
Code
use async_nats::jetstream::{self, stream};
use futures::{StreamExt, TryStreamExt};
#[tokio::main]
async fn main() -> Result<(), async_nats::Error> {
Use the env variable if running in the container, otherwise use the default.
let nats_url =
std::env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".to_string());
Create an unauthenticated connection to NATS.
let client = async_nats::connect(nats_url).await?;
Access jetstream::context to use the JetStream APIs.
let jetstream = jetstream::new(client);
Creating the stream
Define the stream configuration, specifying RetentionPolicy::WorkQueue for retention, and create the stream.
let mut stream = jetstream
.create_stream(jetstream::stream::Config {
name: "EVENTS".to_string(),
retention: stream::RetentionPolicy::WorkQueue,
subjects: vec!["events.>".to_string()],
..Default::default()
})
.await?;
println!("created the stream");
Queue messages
Publish a few messages.
jetstream
.publish("events.us.page_loaded".into(), "".into())
.await?
.await?;
jetstream
.publish("events.eu.mouse_clicked".into(), "".into())
.await?
.await?;
jetstream
.publish("events.us.input_focused".into(), "".into())
.await?
.await?;
println!("published 3 messages");
Checking the stream info, we see three messages have been queued.
println!(
"Stream info without any consumers: {:#?}",
stream.info().await?
);
Adding a consumer
Now let’s add a consumer to process the queued messages.
let consumer = stream
.create_consumer(jetstream::consumer::pull::Config {
durable_name: Some("processor-1".to_string()),
..Default::default()
})
.await?;
Fetch and ack the queued messages.
let mut messages = consumer.fetch().max_messages(3).messages().await?;
while let Some(message) = messages.next().await {
message?.ack().await?;
}
Checking the stream info again, we will notice no messages are available.
println!("Stream info with one consumer: {:#?}", stream.info().await?);
Exclusive non-filtered consumer
As noted in the description above, a work-queue stream can have at most one consumer with interest on a given subject at any time. Since the pull consumer above is not filtered, trying to create another unfiltered consumer will fail.
let err = stream
.create_consumer(jetstream::consumer::pull::Config {
durable_name: Some("processor-2".to_string()),
..Default::default()
})
.await
.expect_err("fail to create overlapping consumer");
println!("Create an overlapping consumer: {}", err);
However, if we delete the first one, we can then add the new one.
stream.delete_consumer("processor-1").await?;
let result = stream
.create_consumer(jetstream::consumer::pull::Config {
durable_name: Some("processor-2".to_string()),
..Default::default()
})
.await;
println!("created the new consumer? {}", result.is_ok());
stream.delete_consumer("processor-2").await?;
Multiple filtered consumers
To create multiple consumers, a subject filter needs to be applied. For this example, we could scope each consumer to the geo that the event was published from, in this case us or eu.
let us_consumer = stream
.create_consumer(jetstream::consumer::pull::Config {
durable_name: Some("processor-us".to_string()),
filter_subject: "events.us.>".to_string(),
..Default::default()
})
.await?;
let eu_consumer = stream
.create_consumer(jetstream::consumer::pull::Config {
durable_name: Some("processor-eu".to_string()),
filter_subject: "events.eu.>".to_string(),
..Default::default()
})
.await?;
jetstream
.publish("events.eu.mouse_clicked".into(), "".into())
.await?
.await?;
jetstream
.publish("events.us.page_loaded".into(), "".into())
.await?
.await?;
jetstream
.publish("events.us.input_focused".into(), "".into())
.await?
.await?;
jetstream
.publish("events.eu.page_loaded".into(), "".into())
.await?
.await?;
println!("published 4 messages");
let mut us_messages = us_consumer.fetch().max_messages(2).messages().await?;
let mut eu_messages = eu_consumer.fetch().max_messages(2).messages().await?;
while let Some(message) = us_messages.try_next().await? {
println!("us consumer got: {}", message.subject);
message.ack().await?;
}
while let Some(message) = eu_messages.try_next().await? {
println!("eu consumer got: {}", message.subject);
message.ack().await?;
}
Ok(())
}
Output
created the stream
published 3 messages
Stream info without any consumers: Info { config: Config { name: "EVENTS", max_bytes: -1, max_messages: -1, max_messages_per_subject: -1, discard: Old, discard_new_per_subject: false, subjects: [ "events.>", ], retention: WorkQueue, max_consumers: -1, max_age: 0ns, max_message_size: -1, storage: File, num_replicas: 1, no_ack: false, duplicate_window: 120s, template_owner: "", sealed: false, description: None, allow_rollup: false, deny_delete: false, deny_purge: false, republish: None, allow_direct: false, mirror_direct: false, mirror: None, sources: None, metadata: {}, subject_transform: None, compression: Some( None, ), consumer_limits: None, first_sequence: None, }, created: 2023-10-23 16:19:30.006619882 +00:00:00, state: State { messages: 3, bytes: 157, first_sequence: 1, first_timestamp: 2023-10-23 16:19:30.007926215 +00:00:00, last_sequence: 3, last_timestamp: 2023-10-23 16:19:30.008572257 +00:00:00, consumer_count: 0, }, cluster: Some( ClusterInfo { name: None, leader: Some( "NA5CAZUKLZ3ODGKMIPKY2CVNPTNWZWMQYJS7EHYPAYWQWRK3FEVUL2NY", ), replicas: [], }, ), mirror: None, sources: [], }
Stream info with one consumer: Info { config: Config { name: "EVENTS", max_bytes: -1, max_messages: -1, max_messages_per_subject: -1, discard: Old, discard_new_per_subject: false, subjects: [ "events.>", ], retention: WorkQueue, max_consumers: -1, max_age: 0ns, max_message_size: -1, storage: File, num_replicas: 1, no_ack: false, duplicate_window: 120s, template_owner: "", sealed: false, description: None, allow_rollup: false, deny_delete: false, deny_purge: false, republish: None, allow_direct: false, mirror_direct: false, mirror: None, sources: None, metadata: {}, subject_transform: None, compression: Some( None, ), consumer_limits: None, first_sequence: None, }, created: 2023-10-23 16:19:30.006619882 +00:00:00, state: State { messages: 3, bytes: 157, first_sequence: 1, first_timestamp: 2023-10-23 16:19:30.007926215 +00:00:00, last_sequence: 3, last_timestamp: 2023-10-23 16:19:30.008572257 +00:00:00, consumer_count: 1, }, cluster: Some( ClusterInfo { name: None, leader: Some( "NA5CAZUKLZ3ODGKMIPKY2CVNPTNWZWMQYJS7EHYPAYWQWRK3FEVUL2NY", ), replicas: [], }, ), mirror: None, sources: [], }
Create an overlapping consumer: JetStream error: 400 (code multiple non-filtered consumers not allowed on workqueue stream, error code 10099)
created the new consumer? true
published 4 messages
us consumer got: events.us.page_loaded
us consumer got: events.us.input_focused
eu consumer got: events.eu.mouse_clicked
eu consumer got: events.eu.page_loaded
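As a way to read the last four lines of the output, the following sketch models how each published subject pairs with the single filtered consumer whose filter matches it. `subject_matches` and `route` are hypothetical helpers written for illustration; they are not part of async_nats, and the real matching happens inside the NATS server.

```rust
/// Returns true if `subject` (no wildcards) matches `filter`.
/// Assumes the usual NATS rules: `*` matches one token, `>` matches the rest.
fn subject_matches(filter: &str, subject: &str) -> bool {
    let mut f = filter.split('.');
    let mut s = subject.split('.');
    loop {
        match (f.next(), s.next()) {
            // Filter and subject ended at the same token: full match.
            (None, None) => return true,
            // `>` matches one or more remaining subject tokens.
            (Some(">"), Some(_)) => return true,
            // `*` matches any single token; keep comparing.
            (Some("*"), Some(_)) => {}
            // Equal literal tokens; keep comparing.
            (Some(x), Some(y)) if x == y => {}
            // Mismatched literal, or one side ran out of tokens.
            _ => return false,
        }
    }
}

/// Finds the consumer whose filter matches the subject. With disjoint
/// filters, at most one consumer can match.
fn route<'a>(consumers: &[(&'a str, &'a str)], subject: &str) -> Option<&'a str> {
    consumers
        .iter()
        .find(|c| subject_matches(c.1, subject))
        .map(|c| c.0)
}

fn main() {
    // (consumer name, filter subject), mirroring the example above.
    let consumers = [
        ("processor-us", "events.us.>"),
        ("processor-eu", "events.eu.>"),
    ];
    let published = [
        "events.eu.mouse_clicked",
        "events.us.page_loaded",
        "events.us.input_focused",
        "events.eu.page_loaded",
    ];
    for subject in published.iter() {
        println!("{} -> {:?}", subject, route(&consumers, subject));
    }
}
```

A subject that matches no filter, such as a hypothetical `events.ap.page_loaded`, would be routed to no consumer in this model.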