LrKafkaConsumer.pollFor

Polls topics for a set number of seconds and returns the resulting ConsumerRecords.

public static <K, V> ConsumerRecords<K, V> pollFor(int seconds, Duration timeout)

public static <K, V> ConsumerRecords<K, V> pollFor(int seconds, Duration timeout, boolean commitPolls)

public static <K, V> ConsumerRecords<K, V> pollFor(int seconds, long timeoutMs)

public static <K, V> ConsumerRecords<K, V> pollFor(int seconds, long timeoutMs, boolean commitPolls)

Arguments

Name          Description
seconds       The total number of seconds to poll for.
timeoutMs     The timeout limit for the poll in milliseconds, as a long.
timeout       The timeout limit for the poll, as a Duration object.
commitPolls   Optional. If set to false, consumer records are not committed before they are returned. Records are committed by default.

Return values

This function returns the ConsumerRecords<K, V> that were available from the broker during the polling period.

General information

This is a custom helper function that polls topics for a set number of seconds and returns the resulting ConsumerRecords.

By default, it commits the records before returning them. To disable this behavior, pass false as the final argument, commitPolls.

This function has variants for supplying the poll timeout either as a long value in milliseconds or as a Duration object.
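The relationship between the total polling time, the per-poll timeout, and the commit flag can be illustrated with a small stand-alone sketch. It does not use Kafka and is not the LrKafkaConsumer implementation: the Source interface, the TickSource class, and the PollForSketch class are hypothetical names introduced only for illustration, and the point at which the real helper commits (after each poll or once before returning) is an assumption here.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class PollForSketch {

    /** Hypothetical stand-in for a consumer; not part of the LrKafkaConsumer API. */
    interface Source<T> {
        List<T> poll(Duration timeout); // may block for up to the given timeout
        void commit();                  // acknowledges what has been polled so far
    }

    /** Polls repeatedly until the given number of seconds has elapsed, collecting every batch. */
    static <T> List<T> pollFor(Source<T> source, int seconds, Duration timeout, boolean commitPolls) {
        long deadline = System.nanoTime() + Duration.ofSeconds(seconds).toNanos();
        List<T> collected = new ArrayList<>();
        while (deadline - System.nanoTime() > 0) {
            List<T> batch = source.poll(timeout);
            collected.addAll(batch);
            // Assumption: commit after each non-empty poll; the real helper may commit once at the end.
            if (commitPolls && !batch.isEmpty()) {
                source.commit();
            }
        }
        return collected;
    }

    /** Millisecond variant; it simply delegates to the Duration variant. */
    static <T> List<T> pollFor(Source<T> source, int seconds, long timeoutMs, boolean commitPolls) {
        return pollFor(source, seconds, Duration.ofMillis(timeoutMs), commitPolls);
    }

    /** Demo source: each poll waits for the full timeout, then yields one record. */
    static class TickSource implements Source<String> {
        int polls = 0;
        int commits = 0;

        @Override
        public List<String> poll(Duration timeout) {
            try {
                Thread.sleep(timeout.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            polls++;
            return Collections.singletonList("record-" + polls);
        }

        @Override
        public void commit() {
            commits++;
        }
    }

    public static void main(String[] args) {
        TickSource source = new TickSource();
        // Poll for 1 second in total, waiting up to 100 ms per poll, without committing.
        List<String> records = pollFor(source, 1, 100L, false);
        System.out.println("got " + records.size() + " records, commits: " + source.commits);
    }
}
```

In this sketch, seconds bounds the whole call while the timeout bounds each individual poll, so the final poll may finish slightly after the deadline. Passing false for commitPolls leaves the commit counter at zero.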

Example

In the following example, after the KafkaConsumer is initialized, the LrKafkaConsumer.subscribe function is called to subscribe to a single topic named “demo”. Polling then continues for 15 seconds with a poll timeout of 1000 milliseconds, after which the returned ConsumerRecords can be used. Because commitPolls is set to false, the records are not committed after the poll.

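// Imports needed by this snippet (placed at the top of the script file)
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public int init() throws Throwable {
    // ...
    LrKafkaConsumer.<Long, String>initConsumer();
    // ...
}

public int action() throws Throwable {
    // Subscribe to the single topic "demo"
    LrKafkaConsumer.subscribe(Collections.singletonList("demo"));
    // Poll for 15 seconds with a 1000 ms poll timeout; false = do not commit the records
    ConsumerRecords<Long, String> records = LrKafkaConsumer.pollFor(15, Duration.ofMillis(1000), false);
    lr.output_message("got " + records.count() + " records");
    return 0;
}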