LrKafkaConsumer.pollMinNoOfRecords
Polls until a minimum number of messages is received from the server, or maxNotFoundMessages polls return nothing.
public static <K, V> ConsumerRecords<K, V> pollMinNoOfRecords(int minimumRecordsPolled, int maxNotFoundMessages, long timeoutMs)
public static <K, V> ConsumerRecords<K, V> pollMinNoOfRecords(int minimumRecordsPolled, int maxNotFoundMessages, long timeoutMs, boolean commitPolls)
public static <K, V> ConsumerRecords<K, V> pollMinNoOfRecords(int minimumRecordsPolled, int maxNotFoundMessages, Duration timeout)
public static <K, V> ConsumerRecords<K, V> pollMinNoOfRecords(int minimumRecordsPolled, int maxNotFoundMessages, Duration timeout, boolean commitPolls)
Arguments
Name | Description |
---|---|
minimumRecordsPolled | Minimum number of records polled |
maxNotfoundMessages | Maximum number of polls that return nothing |
timeoutMs | The timeout limit for the poll call in milliseconds, as an integer |
timeout | The timeout limit for the poll call as a Duration object |
commitPolls | If set to false, consumer records are not committed before they are returned |
Return values
This function returns records available from the broker.
General information
This is a custom helper function that polls until a minimum number of messages is received from the server, or maxNotFoundMessages polls return nothing.
By default, it commits the records before returning them. This behavior can be disabled by adding an extra parameter, commitPolls, to the end of the method.
This function has variants for supplying the poll timeout either in milliseconds as an integer, or as a duration object.
Example
In the following example, after initialization of KafkaConsumer, the LrKafkaConsumer.subscribe function is called to subscribe to topics that match a pattern. Then, polls are executed with a minimum of 8 ConsumerRecords, if possible. If after 10 tries in a row, no new records are received, all of the polled records are returned. In this implementation, the consumer also commits the polled records.
public int init() throws Throwable {
...
LrKafkaConsumer.<Long, String>initConsumer(...);
...
}
public int action() throws Throwable {
LrKafkaConsumer.subscribe(Pattern.compile(“demo[0-1]*”), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
//callback when partitions are revoked
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
//callback when partitions are assigned
}
});
ConsumerRecords<Long, String> records = LrKafkaConsumer.pollMinNoOfRecords(8, 10, Duration.ofMillis(1000));
lr.output_message("got " + records.count() + " records");
return 0;
}