Apache Kafka Connector Configuration
Example of connector configuration in sv-lab.json for unauthenticated broker connection:
"connector": [
{
"id": "connector",
"connectorType": "kafka",
"properties": {
"kafka.bootstrap.servers": "localhost:9092",
"kafka.group.id": "demo"
}
}
],
Example of connector configuration in sv-lab.json for access to broker using SASL_PLAINTEXT authentication:
"connector": [
{
"id": "connector",
"connectorType": "kafka",
"properties": {
"kafka.bootstrap.servers": "localhost:9092",
"kafka.group.id": "demo",
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_PLAINTEXT",
"kafka.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"kafkaclient1\" password=\"secret\";"
}
}
],
Kafka connector configuration properties
The Kafka connector is configured with the Kafka's producer and consumer configuration properties prepended with the "kafka." prefix.
There are the most common properties listed in following table. The mandatory properties are bold, other properties are optional. When a property is set in connector it affects all endpoints unless it is overriden in endpoint.
Property | Description |
---|---|
kafka.bootstrap.servers | (string) A list of host/port pairs for initial connection to the Kafka cluster, i.e. localhost:9092 |
kafka.group.id | (string) A unique consumer group identifier. |
kafka.client.id | (string) Client identification when making requests, included in Kafka server logs. |
kafka.auto.offset.reset | (string) Specifies what to do when there is no initial offset in kafka, one of "earliest", "latest" or none (see Kafka documentation) |
kafka.security.protocol | (string) Protocol used to communicate with broker, one of "PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL". |
kafka.sasl.mechanism | (string) SASL mechanism used by client, one of "GSSAPI", "OAUTHBEARER, "SCRAM", "PLAIN", ... |
kafka.sasl.jaas.config | (string) Login context parameters for SASL connections in the JAAS configuration file format |
For complete list of configuration properties see the Producer Configs and Consumer Configs chapters in Kafka documentation.
Kafka endpoint configuration properties
In each endpoint you can configure either request, response or both. Since Kafka is one-way, not request-response, the endpoint request/response mapping just affects how endpoint behaves in different service modes (see also Spring correlation below).
- FORWARD_TO_REAL_SERVICE mode:
- SV will listen on virtual request topic and send the received messages to real request topic.
- SV will also listen on real response topic and send the received messages to virtual response topic.
- In both cases it will learn the messages.
- SIMULATE_SERVICE mode:
- SV will listen on virtual request topic and pass received messages to simulator.
- Any messages generated by simulator will be sent to virtual response topic..
- INVOKE_REAL_SERVICE mode:
- SV will send messages generated by simulator to real request topic.
- SV will listen for messages from real service on real response topic.
As can be seen, REQUEST topics work from client to real service and RESPONSE topics work in reverse. This way you can simulate service which sends messages to some topic(s) and receives them on another topic(s). There is no correlation between request and response topics or messages (but see Spring correlation below) so you can use separate endpoints with request-only and response-only configuration as well.
Usually, you specify just topic name, which means the SV will subscribe to topic and send messages to topic. However, you may need to specify exact partition to use. In that case, SV will consume messages only from that single partition or send messages to single partition.
Note
When you use specific partition for receiving messages, you must also
specify unique group ID (kafka.group.id
). If you use the same group ID as
any other listener, there will be exceptions thrown from Kafka (Commit
cannot be completed since the group has already rebalanced and assigned the
partitions to another member.) during message commit.
When you are learning the service, you should also specify content types of key,
value and headers of the messages with kafkaKeyContentType
,
kafkaValueContentType
and kafkaHeadersContentTypes
properties. Without this
specification, they all would be learned as binary. During learning, the content
type is written to service interface.
Currently supported content types are application/xml
, application/json
,
text/plain
and application/octet-stream
.
Note
It is important to set name of endpoint as the endpoint is directly bound to operation. There is one to one mapping between endpoint and operation so the endpoint name must match the operation name. During learning, the operation will be named according to endpoint from which the message came. During simulation and invocation, operation name is used to determined where to send the message - SV uses the endpoint named after the operation. The only exception is reply-to topic specified in request as used by Spring correlation below.
Mandatory properties are bold. Other properties are optional.
Property | Default Value | Description |
---|---|---|
displayName | (string) The endpoint name used in operation definition in service interface. | |
kafkaVirtualRequestTopic | (string) The topic for receiving requests from clients. | |
kafkaVirtualRequestPartition | -1 | (integer) The partition for receiving requests from clients. (1) |
kafkaVirtualResponseTopic | (string) The topic where clients receive responses. Optional, the client can provide response topic with replyTo header. |
|
kafkaVirtualResponsePartition | -1 | (integer) The partition where clients receive responses. (1) |
kafkaRealRequestTopic | (string) The topic where service receives requests. Optional, the client can provide a topic with replyTo header for solicit response in invocation mode. |
|
kafkaRealRequestPartition | -1 | (integer) The partition where service receives requests. (1) |
kafkaRealResponseTopic | (string) The topic to receive responses from service. | |
kafkaRealResponsePartition | -1 | (integer) The partition to receive responses from service. (1) |
kafkaKeyContentType | (string) Content type of key. | |
kafkaValueContentType | (string) Content type of value. | |
kafkaHeadersContentTypes | (string) Content types for headers using the content-type: header1, header2, ...; content-type: header3, ... syntax specifying list of headers for each content type. Example: text/plain: SessionId, UserName; application/json: CallerAddress . |
Notes:
- [1] The default value -1 means no partition.
Spring correlation
Spring provides request/response pairing for Kafka using the
ReplyingKafkaTemplate
.
The implementation uses specific message headers to store correlation ID into
message and to specify topic where to send response (reply-to). SV will
automatically detect these properties and learn/simulate the behavior. However,
it requires endpoint to be configured with both request and response topics to
be able to correctly send messages.
How it works:
- FORWARD_TO_REAL_SERVICE mode:
- Request is received by SV on virtual request topic, it's
reply-to
is remembered, message is learned and sent to real request topic withreply-to
set to configured real response topic. - The real service then sends response back to real response topic where SV
receives it, learns it and sends to the remembered
reply-to
from request. - In this case the virtual response topic is not needed.
- Request is received by SV on virtual request topic, it's
- SIMULATE_SERVICE mode:
- In this mode SV receives request on virtual request topic, generates response and sends it to reply-to topic specified in request.
- There is only virtual request topic required in this case.
- INVOKE_REAL_SERVICE mode:
- SV generates a message with correlation ID and
reply-to
set to real response topic. The message is sent to real request topic. - Only the real part of endpoint configuration is needed.
- SV generates a message with correlation ID and
Virtual response topic is not needed in any of the modes since the Spring client
sends the reply-to
information with every request.
Note
The real response topic must have only one consumer, i.e. you cannot run SV
multiple times to use the same real response topic. This is because SV does
not check the exact subscribed partition and thus the response could be
received by different process. If you need multiple consumers and
partitions, specify exact partition in endpoint configuration. SV will set
both topic name and partition in reply-to
.
Note
When service is implemented using VSL instead of learning, the operation
must be marked as supporting Spring correlation. Without this mark, SV
wouldn't generate correlation ID and reply-to
in the INVOKE_REAL_SERVICE
mode and the virtual service would not work in this mode. The marking can be
achieved by using withSpringCorrelation()
in the
operation definition.