LrKafkaConsumer.<K, V>initConsumer

Initializes a KafkaConsumer instance for the current Vuser.

public static <K, V> void initConsumer(String filePath)

public static <K, V> void initConsumer(Properties properties)

public static <K, V> void initConsumer(Map<String, Object> configs)

public static <Key, Value> void initConsumer(Map<String, Object> configs, Deserializer<Key> keyDeserializer, Deserializer<Value> valueDeserializer)

public static <Key, Value> void initConsumer(Properties configs, Deserializer<Key> keyDeserializer, Deserializer<Value> valueDeserializer)


filePathPath of .properties file to load
propertiesProperties object that contains the configuration information for the broker
KeySerializerSerializer object for the key type
ValueSerializerSerializer object for the value type
configsMap of consumer configs. Values can be either strings or objects of the appropriate type for the config key, for example: integers for numeric configurations.

Return values

This function does not return any values.

General information

This function supports both formats that are supported by the original KafkaConsumer. For details, see the Apache Kafka documentation. In addition, it supports an option where you can indicate the path of a .properties file to load. Properties and MapEntries that are passed to this function must comply with the format specified by the Kafka documentation.

This method must be called before using any other KafkaConsumer function, to ensure that a KafkaConsumer instance is created. Otherwise, an error will be printed in the output and the script execution will fail. It is recommended to call this function inside the Vuser init method.

Each Vuser can instantiate only one KafkaConsumer at a time. Even if this method is called more than once, the Vuser uses the initial KafkaConsumer instance and an error is displayed in output file, without terminating the current script execution.


In the following example, KafkaConsumer is initialized in the Vuser init method. The properties passed to the function call are created using the class ConsumerConfig and its attributes. They are passed as a Map<String Object>, and the key and value deserializers are passed separately.

Copy code
public int init() throws Throwable {
         Map<String, Object> configs = new HashMap<>();
         Deserializer<Long> keyDeserializer = new LongDeserializer();
         Deserializer<String> valueDeserializer = new StringDeserializer();
         //kafka server
         configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKERS);
         //consumer group id
         configs.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group-id");
         //Max records in one poll
         configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 20);
         //Auto commit after new offset after polling
         configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
         //When no offeset available go to "earliest" point
         configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        LrKafkaConsumer.<Long, String>initConsumer(configs, keyDeserializer, valueDeserializer);