LrKafkaConsumer.<K, V>initConsumer
Initializes a KafkaConsumer instance for the current Vuser.
public static <K, V> void initConsumer(String filePath)
public static <K, V> void initConsumer(Properties properties)
public static <K, V> void initConsumer(Map<String, Object> configs)
public static <Key, Value> void initConsumer(Map<String, Object> configs, Deserializer<Key> keyDeserializer, Deserializer<Value> valueDeserializer)
public static <Key, Value> void initConsumer(Properties configs, Deserializer<Key> keyDeserializer, Deserializer<Value> valueDeserializer)
Arguments
Name | Description |
---|---|
filePath | Path of .properties file to load |
properties | Properties object that contains the configuration information for the broker |
keyDeserializer | Deserializer object for the key type |
valueDeserializer | Deserializer object for the value type |
configs | Map of consumer configs. Values can be either strings or objects of the appropriate type for the config key, for example: integers for numeric configurations. |
Return values
This function does not return any values.
General information
This function supports both configuration formats accepted by the original KafkaConsumer: a Properties object and a Map<String, Object>. For details, see the Apache Kafka documentation. In addition, it accepts the path of a .properties file to load. Properties and map entries that are passed to this function must comply with the format specified by the Kafka documentation.
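As a rough illustration of the .properties option, the sketch below writes a small consumer configuration to a temporary file and reads it back with java.util.Properties, the standard Java reader for this file format. The class name ConsumerPropertiesFileSketch, the broker address localhost:9092, and the group id are assumptions made for this example; the keys are standard Kafka consumer settings. The deserializer class names are included on the assumption that no Deserializer objects are passed alongside the file. The initConsumer calls appear only as comments because they require the LoadRunner runtime.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Illustrative helper only; not part of the LoadRunner API.
class ConsumerPropertiesFileSketch {

    // Writes a sample consumer configuration file and returns its path.
    public static Path writeSampleFile() {
        List<String> lines = Arrays.asList(
            "# Kafka consumer settings (values are placeholders)",
            "bootstrap.servers=localhost:9092",
            "group.id=my-group-id",
            "max.poll.records=20",
            "enable.auto.commit=false",
            "auto.offset.reset=earliest",
            "key.deserializer=org.apache.kafka.common.serialization.LongDeserializer",
            "value.deserializer=org.apache.kafka.common.serialization.StringDeserializer");
        try {
            Path file = Files.createTempFile("consumer", ".properties");
            Files.write(file, lines, StandardCharsets.UTF_8);
            return file;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Loads the file into a Properties object, one key=value pair per line.
    public static Properties load(Path file) {
        Properties props = new Properties();
        try (Reader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            props.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Path file = writeSampleFile();
        Properties props = load(file);
        System.out.println(props.getProperty("group.id")); // prints "my-group-id"
        // In a Vuser script, either form could then be handed to the function:
        // LrKafkaConsumer.<Long, String>initConsumer(file.toString());
        // LrKafkaConsumer.<Long, String>initConsumer(props);
        file.toFile().delete();
    }
}
```

Note that every value read from a .properties file is a string (for example "20" for max.poll.records), whereas a Map<String, Object> can carry typed values such as integers.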
This method must be called before any other KafkaConsumer function, so that a KafkaConsumer instance exists. Otherwise, an error is printed to the output and the script execution fails. It is recommended to call this function inside the Vuser init method. For details, see "Manually create and edit a Kafka script" in the VuGen help center.
Each Vuser can instantiate only one KafkaConsumer at a time. If this method is called more than once, the Vuser keeps using the initial KafkaConsumer instance; an error is written to the output file, but the current script execution is not terminated.
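The two rules above (initialize before any other call, and only the first initialization counts) can be pictured with a small stand-alone model. This is not LoadRunner's implementation: the class SingleConsumerSlot and its methods are hypothetical, a plain String stands in for the consumer instance, and a thread stands in for a Vuser.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the documented rules; NOT the LoadRunner implementation.
class SingleConsumerSlot {
    // One slot per thread; a thread plays the role of a Vuser here.
    private static final ThreadLocal<String> SLOT = new ThreadLocal<>();
    // Messages that a real runtime would write to the output log.
    private static final ThreadLocal<List<String>> ERRORS =
        ThreadLocal.withInitial(ArrayList::new);

    // The first call stores the instance; later calls record an error and keep the first.
    public static void init(String consumerName) {
        if (SLOT.get() != null) {
            ERRORS.get().add("consumer already initialized; keeping " + SLOT.get());
            return;
        }
        SLOT.set(consumerName);
    }

    // Any other operation fails if init was never called on this thread.
    public static String current() {
        String consumer = SLOT.get();
        if (consumer == null) {
            throw new IllegalStateException("initConsumer must be called first");
        }
        return consumer;
    }

    // Errors recorded so far for the calling thread.
    public static List<String> errors() {
        return ERRORS.get();
    }

    public static void main(String[] args) {
        init("first");
        init("second");                      // ignored; an error is recorded
        System.out.println(current());       // prints "first"
        System.out.println(errors().size()); // prints 1
    }
}
```

In this model the second call is a logged no-op rather than a failure, which mirrors the documented behavior of continuing the script with the initial instance.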
Example
In the following example, KafkaConsumer is initialized in the Vuser init method. The configuration entries passed to the function call are keyed by the constants of the ConsumerConfig class. They are passed as a Map<String, Object>, and the key and value deserializers are passed separately.
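// Imports needed at the top of the script file (JDK and Kafka classes only):
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public int init() throws Throwable {
    Map<String, Object> configs = new HashMap<>();
    Deserializer<Long> keyDeserializer = new LongDeserializer();
    Deserializer<String> valueDeserializer = new StringDeserializer();
    // Kafka server(s); KAFKA_BROKERS is a String defined elsewhere in the script
    configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKERS);
    // Consumer group ID
    configs.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group-id");
    // Maximum number of records returned by a single poll
    configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 20);
    // Disable automatic offset commits after polling
    configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    // When no committed offset is available, start from the earliest offset
    configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    LrKafkaConsumer.<Long, String>initConsumer(configs, keyDeserializer, valueDeserializer);
    return 0;
}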