Request-Reply in Messaging
The request-reply pattern allows a client to send a message and expect a reply of some kind. In practice, the request message will either be a command, which is an intention for service to carry out some work that results in a state change, or a query, which is a request for information.
Unlike request-reply constrained protocols like HTTP, NATS is not limited to a strict point-to-point interfaction between a client and server. The request-reply pattern is built on top of the core publish-subscribe model.
By default, this means that any one of subscribers could be a responder and reply to the client. However, because NATS is not limited to point-to-point interactions, the client could indicate to NATS that multiple replies should be allowed.
This example shows the basics of the request-reply pattern including the standard “no responders” error if there are no subscribers available to handle and reply to the requesting message.
$ nbe run messaging/request-reply/rustView the source code or learn how to run this example yourself
Code
use futures::StreamExt;
use std::{env, str::from_utf8, time::Duration};
#[tokio::main]
async fn main() -> Result<(), async_nats::Error> {
Use the NATS_URL env variable if defined, otherwise fallback to the default.
let nats_url = env::var("NATS_URL")
.unwrap_or_else(|_| "nats://localhost:4222".to_string());
let client = async_nats::connect(nats_url).await?;
In addition to vanilla publish-request, NATS supports request-reply interactions as well. Under the covers, this is just an optimized pair of publish-subscribe operations. The requests is just a subscription that responds to a message sent to it. This kind of subscription is called a service.
let mut requests = client.subscribe("greet.*".into()).await.unwrap();
Spawn a new task, so we can respond to incoming requests. Usually request/response happens across clients and network and in such scenarios, you don’t need a separate task.
tokio::spawn({
let client = client.clone();
async move {
Iterate over requests.
while let Some(request) = requests.next().await {
Check if message we got have a reply
to which we can publish the response.
if let Some(reply) = request.reply {
Publish the response.
let name = &request.subject[6..];
client
.publish(reply, format!("hello, {}", name).into())
.await?;
}
}
Ok::<(), async_nats::Error>(())
}
});
As there is a Subscriber
listening to requests, we can sent those.
We’re leaving the payload empty for these examples.
let response = client.request("greet.sue".into(), "".into()).await?;
println!("got a response: {:?}", from_utf8(&response.payload)?);
let response = client.request("greet.john".into(), "".into()).await?;
println!("got a response: {:?}", from_utf8(&response.payload)?);
If we don’t want to endlessly wait until response is returned, we can wrap
it in tokio::time::timeout
.
let response = tokio::time::timeout(
Duration::from_millis(500),
client.request("greet.bob".into(), "".into()),
)
first ?
is Err
if timeout occurs, second is for actual response Result
.
.await??;
println!("got a response: {:?}", from_utf8(&response.payload)?);
Ok(())
}
Output
got a response: "hello, sue" got a response: "hello, john" got a response: "hello, bob"