Consumer - Fetch Messages in JetStream
The new JetStream API provides simplified semantics for JetStream asset
management and message consumption. It removes the complexity of Subscribe()
in favor of more explicit separation of creating consumers and consuming messages.
This example demonstrates how to Fetch messages with the new API.
$ nbe run jetstream/consumer-fetch-messages/javaView the source code or learn how to run this example yourself
Code
package example;
import io.nats.client.*;
import io.nats.client.api.*;
import java.io.IOException;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
public class Main {
  public static void main(String[] args) {
    String natsURL = System.getenv("NATS_URL");
    if (natsURL == null) {
      natsURL = "nats://127.0.0.1:4222";
    }
    try (Connection conn = Nats.connect(natsURL)) {
      System.out.println("\nA. Prepare Example Stream and Consumers");
        The JetStream and JetStreamManagement
The JetStreamManagement context provides the ability
to create and manage streams.
The JetStream context provides the ability
to publish messages.
      JetStream js = conn.jetStream();
      JetStreamManagement jsm = conn.jetStreamManagement();
        Create a stream and populate the stream with a few messages.
      String streamName = "fetch";
      jsm.addStream(StreamConfiguration.builder()
          .name(streamName)
          .storageType(StorageType.Memory)
          .subjects("events.>")
          .build());
        publish some messages to the stream
      js.publish("events.1", "e1m1".getBytes());
      js.publish("events.2", "e2m1".getBytes());
      js.publish("events.1", "e1m2".getBytes());
      js.publish("events.2", "e2m2".getBytes());
        Although you can make consumers on the fly, typically consumers will be created ahead of time.
      ConsumerConfiguration cc = ConsumerConfiguration.builder().name("onlyEvents1").filterSubject("events.1").build();
      jsm.addOrUpdateConsumer(streamName, cc);
      cc = ConsumerConfiguration.builder().name("allEvents").filterSubject("events.*").build();
      jsm.addOrUpdateConsumer(streamName, cc);
        Simplified JetStream API
The simplified API has a StreamContext for accessing existing
streams, creating consumers, and getting a ConsumerContext.
The StreamContext can be created from the Connection similar to
the legacy API.
      System.out.println("\nB. Use Simplification StreamContext");
      StreamContext streamContext = conn.getStreamContext(streamName);
      StreamInfo streamInfo = streamContext.getStreamInfo(StreamInfoOptions.allSubjects());
      System.out.println("   Stream Name: " + streamInfo.getConfiguration().getName());
      System.out.println("   Stream Subjects: " + streamInfo.getStreamState().getSubjects());
      System.out.println("   Stream Message Count: " + streamInfo.getStreamState().getMsgCount());
        Creating a consumer from the stream context
To create an ephemeral consumer, the createOrUpdateConsumer method
can be used with a bare ConsumerConfiguration object.
Getting a consumer from the stream context
If your consumer already exists as a durable, you can create a
ConsumerContext for that consumer from the stream context or directly
from the connection by providing the stream and consumer name.
      System.out.println("\nC. Simplification Consumer Context");
      ConsumerContext consumerContext1 = streamContext.getConsumerContext("onlyEvents1");
      ConsumerInfo consumerInfo1 = consumerContext1.getCachedConsumerInfo();
      System.out.println("   The ConsumerContext for \"" + consumerInfo1.getName() + "\" was loaded from the StreamContext for \"" + consumerInfo1.getStreamName() + "\"");
      System.out.println("   The consumer has " + consumerInfo1.getNumPending() + " messages available.");
      ConsumerContext consumerContext2 = streamContext.getConsumerContext("allEvents");
      ConsumerInfo consumerInfo2 = consumerContext2.getCachedConsumerInfo();
      System.out.println("\n   The ConsumerContext for \"" + consumerInfo2.getName() + "\" was loaded from the StreamContext for \"" + consumerInfo2.getStreamName() + "\"");
      System.out.println("   The consumer has " + consumerInfo2.getNumPending() + " messages available.");
        Retrieving messages on demand with fetch
FetchConsumer
A FetchConsumer is returned when you call the fetch methods on ConsumerContext.
You will use that object to call nextMessage.
Notice there is no stop on the FetchConsumer interface, the fetch stops by itself.
The new version of fetch is very similar to the old iterate, as it does not block
before returning the entire batch.
      System.out.println("\nD. FetchConsumer");
      System.out.println("   The consumer name is \"" + consumerInfo1.getName() + "\".");
      System.out.println("   The consumer has " + consumerInfo1.getNumPending() + " messages available.");
      long start = System.currentTimeMillis();
      long elapsed;
      try (FetchConsumer fetchConsumer = consumerContext1.fetchMessages(2)) {
        elapsed = System.currentTimeMillis() - start;
        System.out.println("   The 'fetch' method call returned in " + elapsed + "ms.");
        fetch will return null once there are no more messages to consume.
        try {
          Message msg = fetchConsumer.nextMessage();
          while (msg != null) {
            String data = new String(msg.getData());
            System.out.println("   Processing " + msg.getSubject() + " '" + data);
            msg.ack();
            msg = fetchConsumer.nextMessage();
          }
        }
        catch (JetStreamStatusCheckedException se) {
          System.out.println("   JetStreamStatusCheckedException: " + se.getMessage());
        }
      }
      catch (Exception e) {
        throw new RuntimeException(e);
      }
      elapsed = System.currentTimeMillis() - start;
      System.out.println("   Fetch complete in " + elapsed + "ms.");
      System.out.println("\n   The consumer name is \"" + consumerInfo2.getName() + "\".");
      System.out.println("   The consumer has " + consumerInfo2.getNumPending() + " messages available.");
      start = System.currentTimeMillis();
      try (FetchConsumer fetchConsumer = consumerContext2.fetchMessages(2)) {
        elapsed = System.currentTimeMillis() - start;
        System.out.println("   The 'fetch' method call returned in " + elapsed + "ms.");
        fetch will return null once there are no more messages to consume.
        try {
          Message msg = fetchConsumer.nextMessage();
          while (msg != null) {
            elapsed = System.currentTimeMillis() - start;
            String data = new String(msg.getData());
            System.out.println("   Processing " + msg.getSubject() + " '" + data);
            msg.ack();
            msg = fetchConsumer.nextMessage();
          }
        }
        catch (JetStreamStatusCheckedException se) {
          System.out.println("   JetStreamStatusCheckedException: " + se.getMessage());
        }
      }
      catch (Exception e) {
        throw new RuntimeException(e);
      }
      elapsed = System.currentTimeMillis() - start;
      System.out.println("   Fetch complete in " + elapsed + "ms.");
    }
    catch (JetStreamApiException | IOException | InterruptedException e) {
        - JetStreamApiException: the stream or consumer did not exist
 - IOException: problem making the connection
 - InterruptedException: thread interruption in the body of the example
 
      System.err.println(e);
    }
  }
}
        Output
A. Prepare Example Stream and Consumers
B. Use Simplification StreamContext
   Stream Name: fetch
   Stream Subjects: [Subject{name='events.1', count=2}, Subject{name='events.2', count=2}]
   Stream Message Count: 4
C. Simplification Consumer Context
   The ConsumerContext for "onlyEvents1" was loaded from the StreamContext for "fetch"
   The consumer has 2 messages available.
   The ConsumerContext for "allEvents" was loaded from the StreamContext for "fetch"
   The consumer has 4 messages available.
D. FetchConsumer
   The consumer name is "onlyEvents1".
   The consumer has 2 messages available.
   The 'fetch' method call returned in 7ms.
   Processing events.1 'e1m1
   Processing events.1 'e1m2
   Fetch complete in 16ms.
   The consumer name is "allEvents".
   The consumer has 4 messages available.
   The 'fetch' method call returned in 2ms.
   Processing events.1 'e1m1
   Processing events.2 'e2m1
   Fetch complete in 5ms.