https://github.com/rollno748/di-kafkameter/wiki#producer-elements
DI-Kafkameter is a JMeter plugin that allows you to test and measure the performance of Apache Kafka.
DI-Kafkameter comprises two sets of components: producer elements and consumer elements.
To publish (send) a message to a Kafka topic, add the producer components to the test plan.
Right-click on Test Plan -> Add -> Config Element -> Kafka Producer Config
Provide a variable name to export the connection object (it will be referenced by the sampler element)
Provide the Kafka connection configs (a comma-separated list of brokers)
Provide a Client ID (make it unique, so it identifies where the messages are sent from)
Select the security type used to connect to the brokers (this depends entirely on how security is configured on the Kafka cluster)
For JAAS security, add the following key and value to the Additional Properties
Config key: sasl.jaas.config
Config value: org.apache.kafka.common.security.scram.ScramLoginModule required username="<USERNAME>" password="<PASSWORD>";
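
The fields above correspond to standard Kafka client property keys. As a rough illustration only, the sketch below assembles the same settings with `java.util.Properties`. It assumes a cluster that uses SCRAM-SHA-512 over SASL_SSL; the class name `ProducerConfigSketch`, the broker addresses, and the client ID are hypothetical, and this is not the plugin's own code.

```java
import java.util.Properties;

// Hypothetical helper, not part of DI-Kafkameter.
class ProducerConfigSketch {

    // Builds the properties that the Kafka Producer Config fields would translate to.
    static Properties build(String brokers, String clientId, String username, String password) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers);   // comma-separated host:port pairs
        props.setProperty("client.id", clientId);          // identifies the sender
        // Assumption: the cluster is secured with SCRAM-SHA-512 over TLS.
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "SCRAM-SHA-512");
        // Same value as the "Config value" line above, with the placeholders filled in.
        props.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                + "username=\"" + username + "\" password=\"" + password + "\";");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("broker1:9092,broker2:9092", "jmeter-producer-01",
                "<USERNAME>", "<PASSWORD>");
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Note that the JAAS value must end with a semicolon and keep the double quotes around the username and password.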
Right-click on Test Plan -> Add -> Sampler -> Kafka Producer Sampler
Use the same variable name that was defined in the config element
Define the name of the topic to send the message to (case sensitive)
Kafka Message - The message payload to publish to the topic
Partition String (Optional) - Posts the message to a specific partition, identified by its partition number
Message Headers (Optional) - Adds headers to the published message (more than one header is supported)
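
To make the two optional sampler fields concrete, here is a minimal sketch of how an optional partition string and a set of headers could be represented. How the plugin itself parses these fields is not documented here, so `ProducerSamplerFieldsSketch`, `parsePartition`, and the header names are purely illustrative assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.OptionalInt;

// Hypothetical helper, not part of DI-Kafkameter.
class ProducerSamplerFieldsSketch {

    // A blank partition string means "no explicit partition"; otherwise it must be a
    // non-negative integer.
    static OptionalInt parsePartition(String partitionString) {
        if (partitionString == null || partitionString.trim().isEmpty()) {
            return OptionalInt.empty();
        }
        int partition = Integer.parseInt(partitionString.trim());
        if (partition < 0) {
            throw new IllegalArgumentException("Partition number must not be negative: " + partition);
        }
        return OptionalInt.of(partition);
    }

    public static void main(String[] args) {
        // More than one header may be attached to a message (example names are made up).
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("correlation-id", "abc-123");
        headers.put("source", "jmeter");

        System.out.println("blank partition present? " + parsePartition("").isPresent());
        System.out.println("partition from \"2\": " + parsePartition("2").getAsInt());
        System.out.println("headers: " + headers);
    }
}
```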
To consume (read) a message from a Kafka topic, add the consumer components to the test plan.
Right-click on Test Plan -> Add -> Config Element -> Kafka Consumer Config
Provide a variable name to export the connection object (it will be referenced by the sampler element)
Provide the Kafka connection configs (a comma-separated list of brokers)
Provide a Group ID (make it unique, so it identifies the group your consumer belongs to)
Define the name of the topic to read messages from (case sensitive)
No Of Messages to Poll - The number of messages to read within a request (defaults to 1)
Auto Commit - Marks the offset as read once the message is consumed
Select the security type used to connect to the brokers (this depends entirely on how security is configured on the Kafka cluster)
For JAAS security, add the following key and value to the Additional Properties
Config key: sasl.jaas.config
Config value: org.apache.kafka.common.security.scram.ScramLoginModule required username="<USERNAME>" password="<PASSWORD>";
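
The consumer-specific fields can be pictured the same way, as plain Kafka client properties. In the sketch below, mapping "No Of Messages to Poll" to `max.poll.records` and "Auto Commit" to `enable.auto.commit` is an assumption based on the property names, not something the plugin documentation confirms; `ConsumerConfigSketch`, the broker address, and the group ID are hypothetical.

```java
import java.util.Properties;

// Hypothetical helper, not part of DI-Kafkameter.
class ConsumerConfigSketch {

    static Properties build(String brokers, String groupId, int messagesToPoll, boolean autoCommit) {
        if (messagesToPoll < 1) {
            throw new IllegalArgumentException("messagesToPoll must be at least 1");
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers);  // comma-separated host:port pairs
        props.setProperty("group.id", groupId);           // consumer group membership
        // Assumed mapping: "No Of Messages to Poll" -> max.poll.records
        props.setProperty("max.poll.records", Integer.toString(messagesToPoll));
        // Assumed mapping: "Auto Commit" -> enable.auto.commit
        props.setProperty("enable.auto.commit", Boolean.toString(autoCommit));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("broker1:9092", "jmeter-consumer-group", 1, true);
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```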
Right-click on Test Plan -> Add -> Sampler -> Kafka Consumer Sampler
Use the same variable name that was defined in the config element
Poll timeout - The polling timeout for the consumer when reading from the topic (defaults to 100 ms)
Commit Type - The commit type to use (Sync/Async)
Supported producer properties that can be added to the Additional Properties field:
Property | Available Options | Default |
---|---|---|
acks | [0, 1, -1] | 1 |
batch.size | positive integer | 16384 |
bootstrap.servers | comma-separated host:port pairs | localhost:9092 |
buffer.memory | positive long | 33554432 |
client.id | string | "" |
compression.type | [none, gzip, snappy, lz4, zstd] | none |
connections.max.idle.ms | positive long | 540000 |
delivery.timeout.ms | positive long | 120000 |
enable.idempotence | [true, false] | false |
interceptor.classes | fully-qualified class names | [] |
key.serializer | fully-qualified class name | org.apache.kafka.common.serialization.StringSerializer |
linger.ms | non-negative integer | 0 |
max.block.ms | non-negative long | 60000 |
max.in.flight.requests.per.connection | positive integer | 5 |
max.request.size | positive integer | 1048576 |
metadata.fetch.timeout.ms | positive long | 60000 |
metadata.max.age.ms | positive long | 300000 |
partitioner.class | fully-qualified class name | org.apache.kafka.clients.producer.internals.DefaultPartitioner |
receive.buffer.bytes | positive integer | 32768 |
reconnect.backoff.ms | non-negative long | 50 |
request.timeout.ms | positive integer | 30000 |
retries | non-negative integer | 0 |
sasl.jaas.config | string | null |
sasl.kerberos.kinit.cmd | string | /usr/bin/kinit |
sasl.kerberos.min.time.before.relogin | positive long | 60000 |
sasl.kerberos.service.name | string | null |
sasl.mechanism | [GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512] | GSSAPI |
security.protocol | [PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL] | PLAINTEXT |
sender.flush.timeout.ms | non-negative long | 0 |
send.buffer.bytes | positive integer | 131072 |
value.serializer | fully-qualified class name | org.apache.kafka.common.serialization.StringSerializer |
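
As an illustration of how entries in the Additional Properties field relate to the defaults in the table above, the following sketch overlays a few overrides on a subset of those defaults. The override values (`linger.ms=20`, `batch.size=65536`, `compression.type=lz4`) are arbitrary examples for a throughput-oriented test, and `AdditionalPropertiesSketch` is a hypothetical class rather than the plugin's merge logic.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper, not part of DI-Kafkameter.
class AdditionalPropertiesSketch {

    // A subset of the producer defaults listed in the table above.
    static Properties producerDefaults() {
        Properties defaults = new Properties();
        defaults.setProperty("acks", "1");
        defaults.setProperty("batch.size", "16384");
        defaults.setProperty("linger.ms", "0");
        defaults.setProperty("compression.type", "none");
        return defaults;
    }

    // Entries in "additional" win over the defaults; everything else is kept.
    static Properties withAdditional(Properties defaults, Map<String, String> additional) {
        Properties effective = new Properties();
        effective.putAll(defaults);
        effective.putAll(additional);
        return effective;
    }

    public static void main(String[] args) {
        Map<String, String> additional = new LinkedHashMap<>();
        additional.put("linger.ms", "20");          // wait briefly so batches can fill
        additional.put("batch.size", "65536");      // larger batches per partition
        additional.put("compression.type", "lz4");  // compress batches on the wire

        Properties effective = withAdditional(producerDefaults(), additional);
        effective.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```
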
Supported consumer properties that can be added to the Additional Properties field:
Property | Available Options | Default |
---|---|---|
auto.commit.interval.ms | positive integer | 5000 |
auto.offset.reset | [earliest, latest, none] | latest |
bootstrap.servers | comma-separated host:port pairs | localhost:9092 |
check.crcs | [true, false] | true |
client.id | string | "" |
connections.max.idle.ms | positive long | 540000 |
enable.auto.commit | [true, false] | true |
exclude.internal.topics | [true, false] | true |
fetch.max.bytes | positive long | 52428800 |
fetch.max.wait.ms | non-negative integer | 500 |
fetch.min.bytes | non-negative integer | 1 |
group.id | string | "" |
heartbeat.interval.ms | positive integer | 3000 |
interceptor.classes | fully-qualified class names | [] |
isolation.level | [read_uncommitted, read_committed] | read_uncommitted |
key.deserializer | fully-qualified class name | org.apache.kafka.common.serialization.StringDeserializer |
max.partition.fetch.bytes | positive integer | 1048576 |
max.poll.interval.ms | positive long | 300000 |
max.poll.records | positive integer | 500 |
metadata.max.age.ms | positive long | 300000 |
metadata.fetch.timeout.ms | positive long | 60000 |
receive.buffer.bytes | positive integer | 32768 |
reconnect.backoff.ms | non-negative long | 50 |
request.timeout.ms | positive integer | 30000 |
retry.backoff.ms | non-negative long | 100 |
sasl.jaas.config | string | null |
sasl.kerberos.kinit.cmd | string | /usr/bin/kinit |
sasl.kerberos.min.time.before.relogin | positive long | 60000 |
sasl.kerberos.service.name | string | null |
sasl.mechanism | [GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512] | GSSAPI |
security.protocol | [PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL] | PLAINTEXT |
send.buffer.bytes | positive integer | 131072 |
session.timeout.ms | positive integer | 10000 |
value.deserializer | fully-qualified class name | org.apache.kafka.common.serialization.StringDeserializer |
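
Several consumer properties in the table accept only a fixed set of values. A small pre-flight check, sketched below under the assumption that you want to catch typos before starting a test run, can compare an intended value against the options listed above. `ConsumerOptionCheckSketch` and `isAllowed` are hypothetical names, and properties with free-form values are simply passed through.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of DI-Kafkameter.
class ConsumerOptionCheckSketch {

    // Enumerated options copied from the consumer table above.
    static final Map<String, Set<String>> ALLOWED = Map.of(
            "auto.offset.reset", Set.of("earliest", "latest", "none"),
            "isolation.level", Set.of("read_uncommitted", "read_committed"),
            "security.protocol", Set.of("PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"),
            "sasl.mechanism", Set.of("GSSAPI", "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"));

    // Returns true when the key has no enumerated options or the value is one of them.
    static boolean isAllowed(String key, String value) {
        Set<String> options = ALLOWED.get(key);
        return options == null || options.contains(value);
    }

    public static void main(String[] args) {
        System.out.println(isAllowed("auto.offset.reset", "earliest"));   // listed option
        System.out.println(isAllowed("auto.offset.reset", "beginning"));  // not a listed option
        System.out.println(isAllowed("group.id", "any-string"));          // free-form property
    }
}
```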