Simulating Kafka load tests by capping the VM network bandwidth
--------------- Kafka load testing ---------------
----- 1. Production Kafka cluster: 3 brokers, 500 GB disk each, logs retained for 7 days.
1.1 Kafka version: 1.1.0
----- 2. Load-testing Kafka.
2.1 Producer benchmark, using Kafka's bundled tool: bin/kafka-producer-perf-test.sh
Parameter reference:
--num-records: total number of messages to send.
--record-size: size of each message, in bytes.
--throughput: messages to send per second; -1 disables throttling, which is how you measure the producer's maximum throughput.
--producer-props: producer configuration, e.g. bootstrap.servers=s201:9092,s202:9092,s203:9092
--print-metrics: print all producer metrics at the end of the run.
Consumer benchmark: bin/kafka-consumer-perf-test.sh
--broker-list: list of broker nodes
--fetch-size: size of each fetch request
--messages: total number of messages to consume
--threads: number of processing threads, 10 by default
Console consumer: kafka-console-consumer.sh --topic test_producer3 --bootstrap-server s201:9092,s202:9092,s203:9092 --group aa --from-beginning
Inspect the consumer group: kafka-consumer-groups.sh --bootstrap-server s201:9092,s202:9092,s203:9092 --group aa --describe
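If you prefer to consume from Java rather than the console tool, here is a minimal sketch (topic, group, and bootstrap servers taken from the commands above; the class name is mine). It uses the poll(long) overload, which is what kafka-clients 1.1.0 provides:

package com.kafkaTestJava;

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsoleConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "s201:9092,s202:9092,s203:9092");
        props.put("group.id", "aa");
        // Same effect as --from-beginning when the group has no committed offsets yet.
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test_producer3"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
            }
        }
    }
}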
2.2 Simulated scenarios:
2.2.1 Simulating a 4 Mbit link (ideal: 512 KB/s)
2.2.1.1 Topic with 3 partitions, replication factor 2; 100k messages total at 1,000 messages/s
Producer command: bin/kafka-producer-perf-test.sh --topic test_producer3 --num-records 100000 --record-size 1024 --throughput 1000 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 --print-metrics
Result: producer-topic-metrics:record-error-total:{client-id=producer-1, topic=test_producer3} : 32080.000 (failed sends: 32,080)
producer-topic-metrics:record-send-total:{client-id=producer-1, topic=test_producer3} : 67920.000 (successful sends: 67,920)
3310 records sent, 660.2 records/sec (0.64 MB/sec), 30282.4 ms avg latency, 31107.0 max latency.
100000 records sent, 766.442099 records/sec (0.75 MB/sec), 17785.39 ms avg latency, 32738.00 ms max latency, 29960 ms 50th, 30679 ms 95th, 31056 ms 99th, 32285 ms 99.9th.
Of the 100k messages submitted, 32,080 timed out and failed. Peak throughput was 0.75 MB/s, average write latency 17,785.39 ms, maximum latency 32,738 ms.
Conclusion: on a 4 Mbit link the cluster cannot sustain 1,000 messages/s; the practical ceiling is roughly 700-odd messages/s. (The "ideal" figures quoted in the headings are derived in the sketch below.)
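A sketch of the bandwidth-to-throughput arithmetic behind those ideal numbers (following this note's convention of 1 Mbit = 1024 Kbit; the helper is mine, not part of any Kafka tool):

public class BandwidthCeiling {
    // Convert a link bandwidth in Mbit/s to the ideal records/s for a given record size.
    static long maxRecordsPerSec(int bandwidthMbit, int recordSizeBytes) {
        long bytesPerSec = (long) bandwidthMbit * 1024 * 1024 / 8; // 4 Mbit -> 512 KB/s
        return bytesPerSec / recordSizeBytes;
    }

    public static void main(String[] args) {
        System.out.println(maxRecordsPerSec(4, 1024));   // 512 records/s
        System.out.println(maxRecordsPerSec(10, 1024));  // 1280 records/s
        System.out.println(maxRecordsPerSec(100, 1024)); // 12800 records/s
    }
}

These ceilings ignore protocol overhead, batching, and replication traffic, so treat them as rough guides rather than hard limits.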
Consumer command: bin/kafka-consumer-perf-test.sh --new-consumer --topic test_producer3 --broker-list s201:9092,s202:9092,s203:9092 --fetch-size 10000 --messages 100000
Result:
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
(start time, end time, total MB consumed, MB/s, total messages consumed, messages/s, rebalance time ms, fetch time ms, fetch MB/s, fetch messages/s)
2021-12-30 23:00:11, 2021-12-30 23:02:50, 66.3281, 0.4187, 67920, 428.7446, 10, 158406, 0.4187, 428.771
(67,920 messages consumed: exactly the number produced successfully above)
2.2.1.2 Topic with 3 partitions, replication factor 2; 100k messages total at 500 messages/s
Command: bin/kafka-producer-perf-test.sh --topic test_producer3_500 --num-records 100000 --record-size 1024 --throughput 500 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 --print-metrics
Result: producer-topic-metrics:record-send-total:{client-id=producer-1, topic=test_producer3_500} : 100000.000
100000 records sent, 459.041979 records/sec (0.45 MB/sec), 5634.44 ms avg latency, 18407.00 ms max latency, 4029 ms 50th, 15754 ms 95th, 18142 ms 99th, 18313 ms 99.9th.
All 100k messages were written successfully: throughput 0.45 MB/s, average latency 5,634.44 ms, maximum latency 18,407 ms.
2.2.1.3 Topic with 1 partition, replication factor 1; 100k messages total at 500 messages/s
Command: bin/kafka-producer-perf-test.sh --topic test_producer1_500 --num-records 100000 --record-size 1024 --throughput 500 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 --print-metrics
Result: 50000 records sent, 465.826936 records/sec (0.45 MB/sec), 3798.95 ms avg latency, 7406.00 ms max latency, 3802 ms 50th, 6999 ms 95th, 7305 ms 99th, 7379 ms 99.9th.
Throughput matches the 3-partition case at 0.45 MB/s, but latency is markedly lower (3,798.95 ms avg vs 5,634.44 ms), presumably because a single unreplicated partition generates no inter-broker replication traffic over the constrained link.
2.2.2 Simulating a 10 Mbit link (ideal: 1.25 MB/s)
2.2.2.1 Topic with 3 partitions, replication factor 2; 100k messages total at 1,000 messages/s
Producer command: bin/kafka-producer-perf-test.sh --topic test_producer3 --num-records 100000 --record-size 1024 --throughput 1000 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 --print-metrics
Result: producer-topic-metrics:record-send-total:{client-id=producer-1, topic=test_producer3} : 100000.000
100000 records sent, 999.880014 records/sec (0.98 MB/sec), 19.48 ms avg latency, 262.00 ms max latency, 11 ms 50th, 54 ms 95th, 61 ms 99th, 123 ms 99.9th.
All 100k messages written successfully, throughput 0.98 MB/s.
2.2.2.2 Topic with 3 partitions, replication factor 2; 100k messages total at 2,000 messages/s
Producer command: bin/kafka-producer-perf-test.sh --topic test_producer3 --num-records 100000 --record-size 1024 --throughput 2000 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 --print-metrics
Result: producer-topic-metrics:record-error-total:{client-id=producer-1, topic=test_producer3} : 5340.000
producer-topic-metrics:record-send-total:{client-id=producer-1, topic=test_producer3} : 94660.000
100000 records sent, 1203.876482 records/sec (1.18 MB/sec), 15891.99 ms avg latency, 30390.00 ms max latency, 12657 ms 50th, 30055 ms 95th, 30279 ms 99th, 30358 ms 99.9th.
Only 94,660 messages were written successfully, 5,340 failed; peak throughput 1.18 MB/s. (Retested at 1,300 messages/s: all 100k were written successfully.)
Consumer command: bin/kafka-consumer-perf-test.sh --new-consumer --topic test_producer3 --broker-list s201:9092,s202:9092,s203:9092 --fetch-size 10000 --messages 100000
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2021-12-30 23:55:17:093, 2021-12-30 23:56:47:476, 97.6592, 1.0805, 100003, 1106.4359, 32, 90351, 1.0809, 1106.8278
2.2.3 Simulating a 100 Mbit link (ideal: 12.5 MB/s)
2.2.3.1 Topic with 3 partitions, replication factor 2; 100k messages total at 1,000 messages/s
Producer command: bin/kafka-producer-perf-test.sh --topic test_producer3 --num-records 100000 --record-size 1024 --throughput 1000 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 --print-metrics
Result: producer-topic-metrics:record-send-total:{client-id=producer-1, topic=test_producer3} : 100000.000
100000 records sent, 999.960002 records/sec (0.98 MB/sec), 2.78 ms avg latency, 335.00 ms max latency, 3 ms 50th, 4 ms 95th, 14 ms 99th, 127 ms 99.9th.
All 100k written successfully, with a maximum latency of only 335 ms, so there is still headroom; keep raising the target rate until sends start timing out to find the ceiling.
Producer command: bin/kafka-producer-perf-test.sh --topic test_producer3 --num-records 100000 --record-size 1024 --throughput 2000 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 --print-metrics
Result: producer-topic-metrics:record-send-total:{client-id=producer-1, topic=test_producer3} : 100000.000
100000 records sent, 1999.880007 records/sec (1.95 MB/sec), 1.78 ms avg latency, 114.00 ms max latency, 1 ms 50th, 4 ms 95th, 6 ms 99th, 29 ms 99.9th.
Also fully successful, with a maximum latency of 114 ms.
With 100 Mbit of bandwidth the ideal network throughput is 12.5 MB/s = 12,800 KB/s; at 1 KB per message the ideal ceiling is therefore 12,800 messages/s.
Producer command: bin/kafka-producer-perf-test.sh --topic test_producer3 --num-records 100000 --record-size 1024 --throughput 10000 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 --print-metrics
Result: producer-topic-metrics:record-send-total:{client-id=producer-1, topic=test_producer3} : 100000.000
100000 records sent, 9998.000400 records/sec (9.76 MB/sec), 5.34 ms avg latency, 117.00 ms max latency, 2 ms 50th, 31 ms 95th, 65 ms 99th, 85 ms 99.9th.
All 100k written successfully, throughput 9.76 MB/s.
Consumer command: bin/kafka-consumer-perf-test.sh --new-consumer --topic test_producer3 --broker-list s201:9092,s202:9092,s203:9092 --fetch-size 20000 --messages 500000
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2021-12-31 00:21:25:976, 2021-12-31 00:22:19:900, 488.2900, 9.0552, 500009, 9272.4761, 31, 53893, 9.0604, 9277.8097
--------------- Sending messages to Kafka from IDEA (Java client)
--------- 1. 4 Mbit bandwidth, topic with 1 partition; parameters: --topic test_producer --num-records 100000 --record-size 2048 --throughput 500 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 request.timeout.ms=62000 --print-metrics
-------------- Two copies of the Java class were started with identical parameters. Result: the bandwidth is split evenly.
Writing concurrently to the same topic, each producer's throughput was halved to 0.23 MB/s (a config sketch follows below).
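In code, the producer configuration behind these IDEA runs looks roughly like this (a sketch; only bootstrap.servers and request.timeout.ms=62000 come from the parameters above, the rest is standard boilerplate and the class name is mine):

package com.kafkaTestJava;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class IdeaProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "s201:9092,s202:9092,s203:9092");
        // Raised from the 30 s default so sends queued behind a throttled link
        // are not failed prematurely with timeouts (value from the tests above).
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "62000");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test_producer", new byte[2048])); // 2048-byte payload as in the test
        }
    }
}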
---------- 2. Topic with 3 partitions, replication factor 2; parameters: --topic test_idea_producer --num-records 100000 --record-size 1024 --throughput 1000 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 request.timeout.ms=62000 --print-metrics
Partition leaders were distributed evenly across the brokers.
Conclusion: throughput was more than double the single-partition case, showing that more partitions raise throughput (a topic-creation sketch follows below).
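The test topic can also be created from Java; a minimal AdminClient sketch (topic name and layout taken from the test above, class name mine):

package com.kafkaTestJava;

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTestTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "s201:9092,s202:9092,s203:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, replication factor 2, matching the benchmark topic.
            NewTopic topic = new NewTopic("test_idea_producer", 3, (short) 2);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}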
Note: the placement of partition leaders also affects throughput.
The test below uses the same 3 partitions but a skewed leader distribution: --topic test_producer3_500 --num-records 100000 --record-size 2048 --throughput -1 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 request.timeout.ms=62000 --print-metrics
Broker s203 was the leader for partitions 1 and 2, concentrating the I/O load on one machine; the measured average was only 0.67 MB/s.
The leaders were then rebalanced using Kafka partition reassignment (see https://blog.csdn.net/cuichunchi/article/details/120930445).
Rerunning the test with the same parameters showed a clear throughput improvement.
-------------- Conclusion: after the reassignment, throughput was back in line with the tests above (a leader-inspection sketch follows below).
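To check the leader skew for yourself before and after the reassignment, a small AdminClient sketch that prints each partition's leader (assumes kafka-clients on the classpath; class name mine):

package com.kafkaTestJava;

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;

public class ShowPartitionLeaders {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "s201:9092,s202:9092,s203:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription desc = admin.describeTopics(Collections.singletonList("test_producer3_500"))
                    .all().get().get("test_producer3_500");
            // A healthy layout spreads leaders across brokers; skew concentrates I/O on one node.
            for (TopicPartitionInfo p : desc.partitions())
                System.out.printf("partition %d -> leader %s%n", p.partition(), p.leader());
        }
    }
}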
Test driver code (lifted from the Kafka source):
package com.kafkaTestJava;

import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.tools.ThroughputThrottler;
import org.apache.kafka.tools.ToolsUtils;

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;

import static net.sourceforge.argparse4j.impl.Arguments.store;
import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;

/**
 * Kafka load test (adapted from Kafka's ProducerPerformance source)
 * @author :CUICHUNCHI
 * @date :2022/1/5
 * @time : 21:16
 * @description:
 * @modified By:2022/1/5
 * @version: 1.0
 */
public class TestKafkaProducer2 {

    public static void main(String[] args) throws Exception {
        ArgumentParser parser = argParser();

        try {
            Namespace res = parser.parseArgs(args);

            /* parse args */
            String topicName = res.getString("topic");
            long numRecords = res.getLong("numRecords");
            Integer recordSize = res.getInt("recordSize");
            int throughput = res.getInt("throughput");
            List<String> producerProps = res.getList("producerConfig");
            String producerConfig = res.getString("producerConfigFile");
            String payloadFilePath = res.getString("payloadFile");
            String transactionalId = res.getString("transactionalId");
            boolean shouldPrintMetrics = res.getBoolean("printMetrics");
            long transactionDurationMs = res.getLong("transactionDurationMs");
            boolean transactionsEnabled = 0 < transactionDurationMs;

            // since default value gets printed with the help text, we are escaping \n there and replacing it with correct value here.
            String payloadDelimiter = res.getString("payloadDelimiter").equals("\\n") ? "\n" : res.getString("payloadDelimiter");

            if (producerProps == null && producerConfig == null) {
                throw new ArgumentParserException("Either --producer-props or --producer.config must be specified.", parser);
            }

            List<byte[]> payloadByteList = new ArrayList<>();
            if (payloadFilePath != null) {
                Path path = Paths.get(payloadFilePath);
                System.out.println("Reading payloads from: " + path.toAbsolutePath());
                if (Files.notExists(path) || Files.size(path) == 0) {
                    throw new IllegalArgumentException("File does not exist or empty file provided.");
                }
                String[] payloadList = new String(Files.readAllBytes(path), "UTF-8").split(payloadDelimiter);
                System.out.println("Number of messages read: " + payloadList.length);
                for (String payload : payloadList) {
                    payloadByteList.add(payload.getBytes(StandardCharsets.UTF_8));
                }
            }

            Properties props = new Properties();
            if (producerConfig != null) {
                props.putAll(Utils.loadProps(producerConfig));
            }
            if (producerProps != null)
                for (String prop : producerProps) {
                    String[] pieces = prop.split("=");
                    if (pieces.length != 2)
                        throw new IllegalArgumentException("Invalid property: " + prop);
                    props.put(pieces[0], pieces[1]);
                }

            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
            if (transactionsEnabled)
                props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);

            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);

            if (transactionsEnabled)
                producer.initTransactions();

            /* setup perf test */
            byte[] payload = null;
            Random random = new Random(0);
            if (recordSize != null) {
                payload = new byte[recordSize];
                for (int i = 0; i < payload.length; ++i)
                    payload[i] = (byte) (random.nextInt(26) + 65);
            }
            ProducerRecord<byte[], byte[]> record;
            Stats stats = new Stats(numRecords, 5000);
            long startMs = System.currentTimeMillis();

            ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs);

            int currentTransactionSize = 0;
            long transactionStartTime = 0;
            for (long i = 0; i < numRecords; i++) {
                if (transactionsEnabled && currentTransactionSize == 0) {
                    producer.beginTransaction();
                    transactionStartTime = System.currentTimeMillis();
                }

                if (payloadFilePath != null) {
                    payload = payloadByteList.get(random.nextInt(payloadByteList.size()));
                }
                record = new ProducerRecord<>(topicName, payload);

                long sendStartMs = System.currentTimeMillis();
                Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats);
                producer.send(record, cb);

                currentTransactionSize++;
                if (transactionsEnabled && transactionDurationMs <= (sendStartMs - transactionStartTime)) {
                    producer.commitTransaction();
                    currentTransactionSize = 0;
                }

                if (throttler.shouldThrottle(i, sendStartMs)) {
                    throttler.throttle();
                }
            }

            if (transactionsEnabled && currentTransactionSize != 0)
                producer.commitTransaction();

            if (!shouldPrintMetrics) {
                producer.close();

                /* print final results */
                stats.printTotal();
            } else {
                // Make sure all messages are sent before printing out the stats and the metrics
                // We need to do this in a different branch for now since tests/kafkatest/sanity_checks/test_performance_services.py
                // expects this class to work with older versions of the client jar that don't support flush().
                producer.flush();

                /* print final results */
                stats.printTotal();

                /* print out metrics */
                ToolsUtils.printMetrics(producer.metrics());
                producer.close();
            }
        } catch (ArgumentParserException e) {
            if (args.length == 0) {
                parser.printHelp();
                Exit.exit(0);
            } else {
                parser.handleError(e);
                Exit.exit(1);
            }
        }
    }

    /** Get the command-line argument parser. */
    private static ArgumentParser argParser() {
        ArgumentParser parser = ArgumentParsers
                .newArgumentParser("producer-performance")
                .defaultHelp(true)
                .description("This tool is used to verify the producer performance.");

        MutuallyExclusiveGroup payloadOptions = parser
                .addMutuallyExclusiveGroup()
                .required(true)
                .description("either --record-size or --payload-file must be specified but not both.");

        parser.addArgument("--topic")
                .action(store())
                .required(true)
                .type(String.class)
                .metavar("TOPIC")
                .help("produce messages to this topic");

        parser.addArgument("--num-records")
                .action(store())
                .required(true)
                .type(Long.class)
                .metavar("NUM-RECORDS")
                .dest("numRecords")
                .help("number of messages to produce");

        payloadOptions.addArgument("--record-size")
                .action(store())
                .required(false)
                .type(Integer.class)
                .metavar("RECORD-SIZE")
                .dest("recordSize")
                .help("message size in bytes. Note that you must provide exactly one of --record-size or --payload-file.");

        payloadOptions.addArgument("--payload-file")
                .action(store())
                .required(false)
                .type(String.class)
                .metavar("PAYLOAD-FILE")
                .dest("payloadFile")
                .help("file to read the message payloads from. This works only for UTF-8 encoded text files. " +
                        "Payloads will be read from this file and a payload will be randomly selected when sending messages. " +
                        "Note that you must provide exactly one of --record-size or --payload-file.");

        parser.addArgument("--payload-delimiter")
                .action(store())
                .required(false)
                .type(String.class)
                .metavar("PAYLOAD-DELIMITER")
                .dest("payloadDelimiter")
                .setDefault("\\n")
                .help("provides delimiter to be used when --payload-file is provided. " +
                        "Defaults to new line. " +
                        "Note that this parameter will be ignored if --payload-file is not provided.");

        parser.addArgument("--throughput")
                .action(store())
                .required(true)
                .type(Integer.class)
                .metavar("THROUGHPUT")
                .help("throttle maximum message throughput to *approximately* THROUGHPUT messages/sec. Set this to -1 to disable throttling.");

        parser.addArgument("--producer-props")
                .nargs("+")
                .required(false)
                .metavar("PROP-NAME=PROP-VALUE")
                .type(String.class)
                .dest("producerConfig")
                .help("kafka producer related configuration properties like bootstrap.servers,client.id etc. " +
                        "These configs take precedence over those passed via --producer.config.");

        parser.addArgument("--producer.config")
                .action(store())
                .required(false)
                .type(String.class)
                .metavar("CONFIG-FILE")
                .dest("producerConfigFile")
                .help("producer config properties file.");

        parser.addArgument("--print-metrics")
                .action(storeTrue())
                .type(Boolean.class)
                .metavar("PRINT-METRICS")
                .dest("printMetrics")
                .help("print out metrics at the end of the test.");

        parser.addArgument("--transactional-id")
                .action(store())
                .required(false)
                .type(String.class)
                .metavar("TRANSACTIONAL-ID")
                .dest("transactionalId")
                .setDefault("performance-producer-default-transactional-id")
                .help("The transactionalId to use if transaction-duration-ms is > 0. Useful when testing the performance of concurrent transactions.");

        parser.addArgument("--transaction-duration-ms")
                .action(store())
                .required(false)
                .type(Long.class)
                .metavar("TRANSACTION-DURATION")
                .dest("transactionDurationMs")
                .setDefault(0L)
                .help("The max age of each transaction. The commitTransaction will be called after this time has elapsed. Transactions are only enabled if this value is positive.");

        return parser;
    }

    private static class Stats {
        private long start;
        private long windowStart;
        private int[] latencies;
        private int sampling;
        private int iteration;
        private int index;
        private long count;
        private long bytes;
        private int maxLatency;
        private long totalLatency;
        private long windowCount;
        private int windowMaxLatency;
        private long windowTotalLatency;
        private long windowBytes;
        private long reportingInterval;

        public Stats(long numRecords, int reportingInterval) {
            this.start = System.currentTimeMillis();
            this.windowStart = System.currentTimeMillis();
            this.iteration = 0;
            this.sampling = (int) (numRecords / Math.min(numRecords, 500000));
            this.latencies = new int[(int) (numRecords / this.sampling) + 1];
            this.index = 0;
            this.maxLatency = 0;
            this.totalLatency = 0;
            this.windowCount = 0;
            this.windowMaxLatency = 0;
            this.windowTotalLatency = 0;
            this.windowBytes = 0;
            this.totalLatency = 0;
            this.reportingInterval = reportingInterval;
        }

        public void record(int iter, int latency, int bytes, long time) {
            this.count++;
            this.bytes += bytes;
            this.totalLatency += latency;
            this.maxLatency = Math.max(this.maxLatency, latency);
            this.windowCount++;
            this.windowBytes += bytes;
            this.windowTotalLatency += latency;
            this.windowMaxLatency = Math.max(windowMaxLatency, latency);
            if (iter % this.sampling == 0) {
                this.latencies[index] = latency;
                this.index++;
            }
            /* maybe report the recent perf */
            if (time - windowStart >= reportingInterval) {
                printWindow();
                newWindow();
            }
        }

        public Callback nextCompletion(long start, int bytes, Stats stats) {
            Callback cb = new PerfCallback(this.iteration, start, bytes, stats);
            this.iteration++;
            return cb;
        }

        public void printWindow() {
            long ellapsed = System.currentTimeMillis() - windowStart;
            double recsPerSec = 1000.0 * windowCount / (double) ellapsed;
            double mbPerSec = 1000.0 * this.windowBytes / (double) ellapsed / (1024.0 * 1024.0);
            System.out.printf("%d records sent, %.1f records/sec (%.2f MB/sec), %.1f ms avg latency, %.1f ms max latency.%n",
                    windowCount,
                    recsPerSec,
                    mbPerSec,
                    windowTotalLatency / (double) windowCount,
                    (double) windowMaxLatency);
        }

        public void newWindow() {
            this.windowStart = System.currentTimeMillis();
            this.windowCount = 0;
            this.windowMaxLatency = 0;
            this.windowTotalLatency = 0;
            this.windowBytes = 0;
        }

        public void printTotal() {
            long elapsed = System.currentTimeMillis() - start;
            double recsPerSec = 1000.0 * count / (double) elapsed;
            double mbPerSec = 1000.0 * this.bytes / (double) elapsed / (1024.0 * 1024.0);
            int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999);
            System.out.printf("%d records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.%n",
                    count,
                    recsPerSec,
                    mbPerSec,
                    totalLatency / (double) count,
                    (double) maxLatency,
                    percs[0],
                    percs[1],
                    percs[2],
                    percs[3]);
        }

        private static int[] percentiles(int[] latencies, int count, double... percentiles) {
            int size = Math.min(count, latencies.length);
            Arrays.sort(latencies, 0, size);
            int[] values = new int[percentiles.length];
            for (int i = 0; i < percentiles.length; i++) {
                int index = (int) (percentiles[i] * size);
                values[i] = latencies[index];
            }
            return values;
        }
    }

    private static final class PerfCallback implements Callback {
        private final long start;
        private final int iteration;
        private final int bytes;
        private final Stats stats;

        public PerfCallback(int iter, long start, int bytes, Stats stats) {
            this.start = start;
            this.stats = stats;
            this.iteration = iter;
            this.bytes = bytes;
        }

        public void onCompletion(RecordMetadata metadata, Exception exception) {
            long now = System.currentTimeMillis();
            int latency = (int) (now - start);
            this.stats.record(iteration, latency, bytes, now);
            if (exception != null)
                exception.printStackTrace();
        }
    }
}