LrKafkaConsumer.pollMinNoOfRecords

Polls until a minimum number of messages is received from the server, or maxNotFoundMessages polls return nothing.

public static <K, V> ConsumerRecords<K, V> pollMinNoOfRecords(int minimumRecordsPolled, int maxNotFoundMessages, long timeoutMs)

public static <K, V> ConsumerRecords<K, V> pollMinNoOfRecords(int minimumRecordsPolled, int maxNotFoundMessages, long timeoutMs, boolean commitPolls)

public static <K, V> ConsumerRecords<K, V> pollMinNoOfRecords(int minimumRecordsPolled, int maxNotFoundMessages, Duration timeout)

public static <K, V> ConsumerRecords<K, V> pollMinNoOfRecords(int minimumRecordsPolled, int maxNotFoundMessages, Duration timeout, boolean commitPolls)

Arguments

Name                    Description
minimumRecordsPolled    Minimum number of records polled
maxNotFoundMessages     Maximum number of polls that return nothing
timeoutMs               The timeout limit for the poll call in milliseconds, as a long
timeout                 The timeout limit for the poll call as a Duration object
commitPolls             If set to false, consumer records are not committed before they are returned

Return values

This function returns a ConsumerRecords<K, V> object that contains the records polled from the broker.

General information

This is a custom helper function that polls until a minimum number of messages is received from the server, or maxNotFoundMessages polls return nothing.

By default, the function commits the records before returning them. To disable this behavior, use an overload that takes commitPolls as its last parameter and pass false.
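
The stopping rule described above can be illustrated with a small, self-contained sketch. It is not the LrKafkaConsumer implementation: the class PollMinSketch, the methods pollMin and scripted, and the commits counter are hypothetical stand-ins, and a plain Supplier plays the role of the Kafka poll call. The sketch assumes that the empty polls must be consecutive (as the example in this topic suggests) and that a single commit happens just before the records are returned; where and how often the real helper commits is not stated here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical illustration only; not the LrKafkaConsumer implementation.
class PollMinSketch {

    // Counts simulated commits so the effect of commitPolls is visible.
    static int commits = 0;

    // Collects batches from `poll` until at least minimumRecordsPolled records
    // are gathered, or maxNotFoundMessages polls in a row return nothing.
    static <T> List<T> pollMin(Supplier<List<T>> poll, int minimumRecordsPolled,
                               int maxNotFoundMessages, boolean commitPolls) {
        List<T> collected = new ArrayList<>();
        int emptyPolls = 0;
        while (collected.size() < minimumRecordsPolled && emptyPolls < maxNotFoundMessages) {
            List<T> batch = poll.get();
            if (batch.isEmpty()) {
                emptyPolls++;
            } else {
                emptyPolls = 0; // assumption: only consecutive empty polls count
                collected.addAll(batch);
            }
        }
        if (commitPolls) {
            commits++; // assumption: one simulated commit before returning
        }
        return collected;
    }

    // Builds a fake poll source that replays the given batches, then returns
    // empty batches forever.
    static <T> Supplier<List<T>> scripted(List<List<T>> batches) {
        Iterator<List<T>> it = batches.iterator();
        return () -> it.hasNext() ? it.next() : Collections.<T>emptyList();
    }

    public static void main(String[] args) {
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("a", "b"),
                Collections.<String>emptyList(),
                Arrays.asList("c"));
        // Ask for 8 records, give up after 2 empty polls in a row, commit.
        List<String> got = pollMin(scripted(batches), 8, 2, true);
        System.out.println("got " + got.size() + " records");
    }
}
```

With this scripted source, the minimum of 8 is never reached, so the loop ends after two empty polls in a row and returns the three records collected up to that point.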

This function has variants for supplying the poll timeout either as a long number of milliseconds (timeoutMs) or as a Duration object (timeout).
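
The two ways of expressing the timeout describe the same span of time. The sketch below uses only java.time.Duration to show the conversion in both directions; the class name TimeoutForms and the method toTimeoutMs are hypothetical, and whether the helper treats the two overloads identically internally is an assumption.

```java
import java.time.Duration;

// Hypothetical illustration of the two timeout forms; uses only the JDK.
class TimeoutForms {

    // Converts a Duration timeout to the equivalent millisecond value.
    static long toTimeoutMs(Duration timeout) {
        return timeout.toMillis();
    }

    public static void main(String[] args) {
        long timeoutMs = 1000L;                          // value for a timeoutMs overload
        Duration timeout = Duration.ofMillis(timeoutMs); // value for a timeout overload

        System.out.println(toTimeoutMs(timeout) == timeoutMs);     // prints true
        System.out.println(timeout.equals(Duration.ofSeconds(1))); // prints true
    }
}
```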

Example

In the following example, after the KafkaConsumer is initialized, the LrKafkaConsumer.subscribe function is called to subscribe to topics that match a pattern. The script then polls until at least 8 ConsumerRecords have been received, if possible. If 10 polls in a row return no new records, the records polled so far are returned. Because commitPolls is not specified, the consumer also commits the polled records.

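// Kafka and JDK imports used by this fragment; LoadRunner imports are omitted here.
import java.time.Duration;
import java.util.Collection;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

public int init() throws Throwable {
        ...
        LrKafkaConsumer.<Long, String>initConsumer(...);
        ...
}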
public int action() throws Throwable {
        LrKafkaConsumer.subscribe(Pattern.compile("demo[0-1]*"), new ConsumerRebalanceListener() {

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                // callback when partitions are revoked
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                // callback when partitions are assigned
            }
        });
        ConsumerRecords<Long, String> records = LrKafkaConsumer.pollMinNoOfRecords(8, 10, Duration.ofMillis(1000));
        lr.output_message("got " + records.count() + " records");
      
        return 0;
}