Skip to content

Apache Kafka Connector Configuration

Example of connector configuration in sv-lab.json for unauthenticated broker connection:

"connector": [
  {
    "id": "connector",
    "connectorType": "kafka",
    "properties": {
      "kafka.bootstrap.servers": "localhost:9092",
      "kafka.group.id": "demo"
    }
  }
],

Example of connector configuration in sv-lab.json for access to broker using SASL_PLAINTEXT authentication:

"connector": [
  {
    "id": "connector",
    "connectorType": "kafka",
    "properties": {
      "kafka.bootstrap.servers": "localhost:9092",
      "kafka.group.id": "demo",
      "kafka.sasl.mechanism": "PLAIN",
      "kafka.security.protocol": "SASL_PLAINTEXT",
      "kafka.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"kafkaclient1\" password=\"secret\";"
    }
  }
],

Kafka connector configuration properties

The Kafka connector is configured with the Kafka's producer and consumer configuration properties prepended with the "kafka." prefix.

There are the most common properties listed in following table. The mandatory properties are bold, other properties are optional. When a property is set in connector it affects all endpoints unless it is overriden in endpoint.

Property Description
kafka.bootstrap.servers (string) A list of host/port pairs for initial connection to the Kafka cluster, i.e. localhost:9092
kafka.group.id (string) A unique consumer group identifier.
kafka.client.id (string) Client identification when making requests, included in Kafka server logs.
kafka.auto.offset.reset (string) Specifies what to do when there is no initial offset in kafka, one of "earliest", "latest" or none (see Kafka documentation)
kafka.security.protocol (string) Protocol used to communicate with broker, one of "PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL".
kafka.sasl.mechanism (string) SASL mechanism used by client, one of "GSSAPI", "OAUTHBEARER, "SCRAM", "PLAIN", ...
kafka.sasl.jaas.config (string) Login context parameters for SASL connections in the JAAS configuration file format

For complete list of configuration properties see the Producer Configs and Consumer Configs chapters in Kafka documentation.

Kafka endpoint configuration properties

In each endpoint you can configure either request, response or both. Since Kafka is one-way, not request-response, the endpoint request/response mapping just affects how endpoint behaves in different service modes (see also Spring correlation below).

  • FORWARD_TO_REAL_SERVICE mode:
    • SV will listen on virtual request topic and send the received messages to real request topic.
    • SV will also listen on real response topic and send the received messages to virtual response topic.
    • In both cases it will learn the messages.
  • SIMULATE_SERVICE mode:
    • SV will listen on virtual request topic and pass received messages to simulator.
    • Any messages generated by simulator will be sent to virtual response topic..
  • INVOKE_REAL_SERVICE mode:
    • SV will send messages generated by simulator to real request topic.
    • SV will listen for messages from real service on real response topic.

As can be seen, REQUEST topics work from client to real service and RESPONSE topics work in reverse. This way you can simulate service which sends messages to some topic(s) and receives them on another topic(s). There is no correlation between request and response topics or messages (but see Spring correlation below) so you can use separate endpoints with request-only and response-only configuration as well.

Usually, you specify just topic name, which means the SV will subscribe to topic and send messages to topic. However, you may need to specify exact partition to use. In that case, SV will consume messages only from that single partition or send messages to single partition.

Note

When you use specific partition for receiving messages, you must also specify unique group ID (kafka.group.id). If you use the same group ID as any other listener, there will be exceptions thrown from Kafka (Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.) during message commit.

When you are learning the service, you should also specify content types of key, value and headers of the messages with kafkaKeyContentType, kafkaValueContentType and kafkaHeadersContentTypes properties. Without this specification, they all would be learned as binary. During learning, the content type is written to service interface.

Currently supported content types are application/xml, application/json, text/plain and application/octet-stream.

Note

It is important to set name of endpoint as the endpoint is directly bound to operation. There is one to one mapping between endpoint and operation so the endpoint name must match the operation name. During learning, the operation will be named according to endpoint from which the message came. During simulation and invocation, operation name is used to determined where to send the message - SV uses the endpoint named after the operation. The only exception is reply-to topic specified in request as used by Spring correlation below.

Mandatory properties are bold. Other properties are optional.

Property Default Value Description
displayName (string) The endpoint name used in operation definition in service interface.
kafkaVirtualRequestTopic (string) The topic for receiving requests from clients.
kafkaVirtualRequestPartition -1 (integer) The partition for receiving requests from clients. (1)
kafkaVirtualResponseTopic (string) The topic where clients receive responses. Optional, the client can provide response topic with replyTo header.
kafkaVirtualResponsePartition -1 (integer) The partition where clients receive responses. (1)
kafkaRealRequestTopic (string) The topic where service receives requests. Optional, the client can provide a topic with replyTo header for solicit response in invocation mode.
kafkaRealRequestPartition -1 (integer) The partition where service receives requests. (1)
kafkaRealResponseTopic (string) The topic to receive responses from service.
kafkaRealResponsePartition -1 (integer) The partition to receive responses from service. (1)
kafkaKeyContentType (string) Content type of key.
kafkaValueContentType (string) Content type of value.
kafkaHeadersContentTypes (string) Content types for headers using the content-type: header1, header2, ...; content-type: header3, ... syntax specifying list of headers for each content type. Example: text/plain: SessionId, UserName; application/json: CallerAddress.

Notes:

  • [1] The default value -1 means no partition.

Spring correlation

Spring provides request/response pairing for Kafka using the ReplyingKafkaTemplate. The implementation uses specific message headers to store correlation ID into message and to specify topic where to send response (reply-to). SV will automatically detect these properties and learn/simulate the behavior. However, it requires endpoint to be configured with both request and response topics to be able to correctly send messages.

How it works:

  • FORWARD_TO_REAL_SERVICE mode:
    • Request is received by SV on virtual request topic, it's reply-to is remembered, message is learned and sent to real request topic with reply-to set to configured real response topic.
    • The real service then sends response back to real response topic where SV receives it, learns it and sends to the remembered reply-to from request.
    • In this case the virtual response topic is not needed.
  • SIMULATE_SERVICE mode:
    • In this mode SV receives request on virtual request topic, generates response and sends it to reply-to topic specified in request.
    • There is only virtual request topic required in this case.
  • INVOKE_REAL_SERVICE mode:
    • SV generates a message with correlation ID and reply-to set to real response topic. The message is sent to real request topic.
    • Only the real part of endpoint configuration is needed.

Virtual response topic is not needed in any of the modes since the Spring client sends the reply-to information with every request.

Note

The real response topic must have only one consumer, i.e. you cannot run SV multiple times to use the same real response topic. This is because SV does not check the exact subscribed partition and thus the response could be received by different process. If you need multiple consumers and partitions, specify exact partition in endpoint configuration. SV will set both topic name and partition in reply-to.

Note

When service is implemented using VSL instead of learning, the operation must be marked as supporting Spring correlation. Without this mark, SV wouldn't generate correlation ID and reply-to in the INVOKE_REAL_SERVICE mode and the virtual service would not work in this mode. The marking can be achieved by using withSpringCorrelation() in the operation definition.