[Repost] Kafka Stress Testing in Practice: A Multi-Dimensional Analysis


Simulate stress tests by capping the virtual machines at different network bandwidths.

--------- Kafka data stress testing --------------
----- 1. The company's production Kafka cluster: 500 GB of disk per broker, 3 brokers in total, logs retained for 7 days.
        1.1 Version: 1.1.0
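A quick back-of-the-envelope check of what that hardware can retain (a sketch, not from the original post; replication factor 2 is taken from the test topics used below and is an assumption for the production cluster, and the formula ignores index/segment overhead):

```java
// Rough capacity check: daily ingest a 3 x 500 GB cluster can hold for
// 7 days at replication factor 2. Figures are illustrative.
public class CapacityCheck {
    public static void main(String[] args) {
        double totalDiskGB = 3 * 500.0;   // 3 brokers, 500 GB each
        int retentionDays = 7;
        int replicationFactor = 2;        // assumed, matching the test topics below
        // sustainable daily ingest = total disk / (retention * replication)
        double maxDailyIngestGB = totalDiskGB / (retentionDays * replicationFactor);
        System.out.printf("Max sustainable ingest: %.1f GB/day (%.2f MB/sec)%n",
                maxDailyIngestGB, maxDailyIngestGB * 1024 / 86400);
    }
}
```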

----- 2. Stress-testing Kafka.
        2.1 Use Kafka's bundled benchmark tool bin/kafka-producer-perf-test.sh
            Parameter reference:
                --num-records   : total number of messages to send
                --record-size   : size of each message, in bytes
                --throughput    : messages to send per second; set -1 for unthrottled, which measures the producer's maximum throughput
                --producer-props: bootstrap.servers=s201:9092,s202:9092,s203:9092
                --print-metrics : print all metrics at the end
            Consumer benchmark command: bin/kafka-consumer-perf-test.sh
                --broker-list : list of broker nodes
                --fetch-size  : size of each fetch request, in bytes
                --messages    : total number of messages to consume
                --threads     : number of processing threads, default 10
            Consume (a Java equivalent is sketched after this list): kafka-console-consumer.sh --topic test_producer3 --bootstrap-server s201:9092,s202:9092,s203:9092 --group aa --from-beginning
            Inspect consumer-group info: kafka-consumer-groups.sh --bootstrap-server s201:9092,s202:9092,s203:9092 --group aa --describe
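For reference, a minimal Java equivalent of the console-consumer command above (a sketch: topic, group, and servers match the command; the String deserializers and poll loop are illustrative):

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Minimal Java equivalent of the console-consumer command above:
// group "aa", reading test_producer3 from the beginning (for a new group).
public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "s201:9092,s202:9092,s203:9092");
        props.put("group.id", "aa");
        props.put("auto.offset.reset", "earliest"); // ~ --from-beginning for a group with no committed offsets
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test_producer3"));
            while (true) { // poll(long) matches the 1.1.0 client API used in this post
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> r : records)
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            r.partition(), r.offset(), r.value());
            }
        }
    }
}
```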
        2.2 Simulated scenarios:


            2.2.1 Simulating 4 Mbit/s bandwidth (theoretical: 512 KB/s)


                2.2.1.1 Topic with 3 partitions, replication factor 2; send 100k records at 1,000 records/sec
                    Producer command: bin/kafka-producer-perf-test.sh --topic test_producer3 --num-records 100000 --record-size 1024 --throughput 1000 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 --print-metrics
                    Result: producer-topic-metrics:record-error-total:{client-id=producer-1, topic=test_producer3} : 32080.000 (failed sends: 32,080)
                            producer-topic-metrics:record-send-total:{client-id=producer-1, topic=test_producer3}  : 67920.000 (successful sends: 67,920)
                            3310 records sent, 660.2 records/sec (0.64 MB/sec), 30282.4 ms avg latency, 31107.0 max latency.
                            100000 records sent, 766.442099 records/sec (0.75 MB/sec), 17785.39 ms avg latency, 32738.00 ms max latency, 29960 ms 50th, 30679 ms 95th, 31056 ms 99th, 32285 ms 99.9th.

                            100k records were submitted in total; 32,080 of them timed out and failed. Peak throughput was 0.75 MB/s, average write latency 17,785.39 ms, maximum latency 32,738 ms.
                            Conclusion: the cluster cannot sustain 1,000 records/sec; at 4 Mbit/s of bandwidth the ceiling is a little over 700 records/sec (a worked ceiling check follows the consumer result below).

                          
                    Consumer command: bin/kafka-consumer-perf-test.sh --new-consumer --topic test_producer3 --broker-list s201:9092,s202:9092,s203:9092 --fetch-size 10000 --messages 100000
                    Result (columns: start time, end time, total MB consumed, MB consumed per second, total messages consumed, messages consumed per second, rebalance time in ms, fetch time in ms, fetch MB/sec, fetch messages/sec):
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2021-12-30 23:00:11, 2021-12-30 23:02:50, 66.3281, 0.4187, 67920, 428.7446, 10, 158406, 0.4187, 428.771
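As a cross-check of the conclusion above, a worked calculation converting nominal link speed to a record ceiling (pure arithmetic, matching the post's own "theoretical" figures; it uses 1 MB = 1024 KB, and measured numbers can deviate from the nominal link rate):

```java
// Convert nominal link speed to a per-second record ceiling for 1 KiB records.
// Matches the post's own conversions (4M -> 512 KB/s, 100M -> 12,800 records/sec).
public class BandwidthCeiling {
    public static void main(String[] args) {
        for (int mbit : new int[]{4, 10, 100}) {
            double kbPerSec = mbit * 1024 / 8.0;   // Mbit/s -> KB/s
            // each record is 1 KiB, so records/sec equals KB/s
            System.out.printf("%3d Mbit/s -> %6.0f KB/s -> ~%6.0f records/sec%n",
                    mbit, kbPerSec, kbPerSec);
        }
    }
}
```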

                          
                2.2.1.2 Topic with 3 partitions, replication factor 2; send 100k records at 500 records/sec
                    Command: bin/kafka-producer-perf-test.sh --topic test_producer3_500 --num-records 100000 --record-size 1024 --throughput 500 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 --print-metrics
                    Result: producer-topic-metrics:record-send-total:{client-id=producer-1, topic=test_producer3_500}  : 100000.000
                            100000 records sent, 459.041979 records/sec (0.45 MB/sec), 5634.44 ms avg latency, 18407.00 ms max latency, 4029 ms 50th, 15754 ms 95th, 18142 ms 99th, 18313 ms 99.9th.

                            All 100k records written successfully; throughput 0.45 MB/s, average latency 5,634.44 ms, maximum latency 18,407 ms.
                
                2.2.1.3 Topic with 1 partition, replication factor 1; send 100k records at 500 records/sec
                    Command: bin/kafka-producer-perf-test.sh --topic test_producer1_500 --num-records 100000 --record-size 1024 --throughput 500 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 --print-metrics
                    Result: 50000 records sent, 465.826936 records/sec (0.45 MB/sec), 3798.95 ms avg latency, 7406.00 ms max latency, 3802 ms 50th, 6999 ms 95th, 7305 ms 99th, 7379 ms 99.9th.
                    
          

            2.2.2 Simulating 10 Mbit/s bandwidth (theoretical: 1.25 MB/s)


                2.2.2.1 Topic with 3 partitions, replication factor 2; send 100k records at 1,000 records/sec
                    Producer command: bin/kafka-producer-perf-test.sh --topic test_producer3 --num-records 100000 --record-size 1024 --throughput 1000 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 --print-metrics

                    Result: producer-topic-metrics:record-send-total:{client-id=producer-1, topic=test_producer3}  : 100000.000
                        100000 records sent, 999.880014 records/sec (0.98 MB/sec), 19.48 ms avg latency, 262.00 ms max latency, 11 ms 50th, 54 ms 95th, 61 ms 99th, 123 ms 99.9th.
                        All 100k written successfully; throughput 0.98 MB/s.

                2.2.2.2 Topic with 3 partitions, replication factor 2; send 100k records at 2,000 records/sec
                    Producer command: bin/kafka-producer-perf-test.sh --topic test_producer3 --num-records 100000 --record-size 1024 --throughput 2000 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 --print-metrics

                    Result: producer-topic-metrics:record-error-total:{client-id=producer-1, topic=test_producer3} : 5340.000
                            producer-topic-metrics:record-send-total:{client-id=producer-1, topic=test_producer3}  : 94660.000

                            100000 records sent, 1203.876482 records/sec (1.18 MB/sec), 15891.99 ms avg latency, 30390.00 ms max latency, 12657 ms 50th, 30055 ms 95th, 30279 ms 99th, 30358 ms 99.9th.
                            Only 94,660 records written successfully, 5,340 failed; peak throughput 1.18 MB/s. (Retested at 1,300 records/sec: all 100k written successfully.)

                          
                    Consumer command: bin/kafka-consumer-perf-test.sh --new-consumer --topic test_producer3 --broker-list s201:9092,s202:9092,s203:9092 --fetch-size 10000 --messages 100000
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2021-12-30 23:55:17:093, 2021-12-30 23:56:47:476, 97.6592, 1.0805, 100003, 1106.4359, 32, 90351, 1.0809, 1106.8278
            
            2.2.3 Simulating 100 Mbit/s bandwidth (theoretical: 12.5 MB/s)


                2.2.3.1 Topic with 3 partitions, replication factor 2; send 100k records at 1,000 records/sec
                    Producer command: bin/kafka-producer-perf-test.sh --topic test_producer3 --num-records 100000 --record-size 1024 --throughput 1000 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 --print-metrics
                    Result: producer-topic-metrics:record-send-total:{client-id=producer-1, topic=test_producer3}  : 100000.000
                    100000 records sent, 999.960002 records/sec (0.98 MB/sec), 2.78 ms avg latency, 335.00 ms max latency, 3 ms 50th, 4 ms 95th, 14 ms 99th, 127 ms 99.9th.
                    All 100k written successfully, and the maximum latency was only 335 ms, so there is still headroom. Keep raising the per-second rate until timeouts appear; that point is the maximum (the ramp can also be scripted, as sketched below).
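A sketch of automating that ramp by shelling out to the perf-test tool with increasing --throughput values (the Kafka install path and the doubling schedule are assumptions, not from the original post):

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;

// Sketch: ramp --throughput upward and watch the perf-test output for
// errors/timeouts. KAFKA_HOME and the step schedule are assumptions.
public class ThroughputRamp {
    public static void main(String[] args) throws Exception {
        String kafkaHome = "/opt/kafka"; // hypothetical install path
        for (int tp = 1000; tp <= 16000; tp *= 2) {
            ProcessBuilder pb = new ProcessBuilder(
                    kafkaHome + "/bin/kafka-producer-perf-test.sh",
                    "--topic", "test_producer3",
                    "--num-records", "100000",
                    "--record-size", "1024",
                    "--throughput", String.valueOf(tp),
                    "--producer-props", "bootstrap.servers=s201:9092,s202:9092,s203:9092");
            pb.redirectErrorStream(true);
            Process p = pb.start();
            try (BufferedReader r = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
                String line;
                while ((line = r.readLine()) != null)
                    System.out.println("[tp=" + tp + "] " + line);
            }
            p.waitFor();
        }
    }
}
```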

                    
                    
                    Producer command: bin/kafka-producer-perf-test.sh --topic test_producer3 --num-records 100000 --record-size 1024 --throughput 2000 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 --print-metrics
                    Result: producer-topic-metrics:record-send-total:{client-id=producer-1, topic=test_producer3}  : 100000.000
                    100000 records sent, 1999.880007 records/sec (1.95 MB/sec), 1.78 ms avg latency, 114.00 ms max latency, 1 ms 50th, 4 ms 95th, 6 ms 99th, 29 ms 99.9th.
                    Also fully successful, with a maximum latency of just 114 ms.

                    
                    With 100 Mbit/s of bandwidth the theoretical network throughput is 12.5 MB/s = 12,800 KB/s; each simulated record is 1 KB, so the theoretical maximum is about 12,800 records/sec.
                    

                    Producer command: bin/kafka-producer-perf-test.sh --topic test_producer3 --num-records 100000 --record-size 1024 --throughput 10000 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 --print-metrics
                    Result: producer-topic-metrics:record-send-total:{client-id=producer-1, topic=test_producer3}  : 100000.000
                    100000 records sent, 9998.000400 records/sec (9.76 MB/sec), 5.34 ms avg latency, 117.00 ms max latency, 2 ms 50th, 31 ms 95th, 65 ms 99th, 85 ms 99.9th.
                    All 100k written successfully; throughput 9.76 MB/s.

            
                    Consume: bin/kafka-consumer-perf-test.sh --new-consumer --topic test_producer3 --broker-list s201:9092,s202:9092,s203:9092 --fetch-size 20000 --messages 500000
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec

2021-12-31 00:21:25:976, 2021-12-31 00:22:19:900, 488.2900, 9.0552, 500009, 9272.4761, 31, 53893, 9.0604, 9277.8097

--------------- Sending messages to Kafka from IDEA

--------- 1. Bandwidth 4 Mbit/s, topic with 1 partition; parameters: --topic test_producer --num-records 100000 --record-size 2048 --throughput 500 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 request.timeout.ms=62000 --print-metrics

-------------- Run two copies of the Java class with identical parameters. Result: the bandwidth is split evenly.

Writing to the same topic at the same time, each producer's throughput is halved, to 0.23 MB/s.
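A minimal sketch of the kind of producer class run from IDEA here (topic, record size, record count, servers, and request.timeout.ms match the parameters above; payload content is illustrative, and throttling is omitted — the full throttled driver appears at the end of the post):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Minimal producer matching the test parameters: 100k records of 2 KB
// to test_producer, with request.timeout.ms=62000.
public class IdeaProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "s201:9092,s202:9092,s203:9092");
        props.put("request.timeout.ms", "62000");
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        byte[] payload = new byte[2048];        // --record-size 2048
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100_000; i++) { // --num-records 100000
                producer.send(new ProducerRecord<>("test_producer", payload));
            }
            producer.flush();
        }
    }
}
```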

---------- 2. 3 partitions, replication factor 2; stress-test parameters: --topic test_idea_producer --num-records 100000 --record-size 1024 --throughput 1000 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 request.timeout.ms=62000 --print-metrics

Partition leaders are distributed evenly across the machines.

Conclusion: throughput is more than twice that of a single partition (more partitions raise throughput).

Note: how the partition leaders are distributed is also one of the factors affecting throughput.

The following test uses the same 3 partitions but a different leader distribution: --topic test_producer3_500 --num-records 100000 --record-size 2048 --throughput -1 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 request.timeout.ms=62000 --print-metrics

Here s203 is the leader for partitions 1 and 2, concentrating the I/O load on one broker; the measured result averages only 0.67 MB/s.
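Besides kafka-topics.sh --describe, leader placement can be inspected programmatically; a minimal sketch with the Java AdminClient (topic name taken from the command above):

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;

// Print which broker leads each partition of a topic, to spot skewed
// leader placement like the s203 case described above.
public class LeaderCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "s201:9092,s202:9092,s203:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription desc = admin
                    .describeTopics(Collections.singletonList("test_producer3_500"))
                    .all().get().get("test_producer3_500");
            for (TopicPartitionInfo p : desc.partitions()) {
                System.out.printf("partition %d -> leader %s (broker id %d)%n",
                        p.partition(), p.leader().host(), p.leader().id());
            }
        }
    }
}
```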

This was corrected with Kafka's partition reassignment (see: https://blog.csdn.net/cuichunchi/article/details/120930445).
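For reference, newer clients (Kafka 2.4+) expose reassignment through the AdminClient as well; a sketch under those assumptions (broker ids 1/2/3 and the target layout are hypothetical — on the 1.1.0 cluster in this post you would instead use kafka-reassign-partitions.sh, as in the linked article):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.common.TopicPartition;

import static java.util.Arrays.asList;

// Sketch: spread the replicas (and preferred leaders) of partitions 1 and 2
// away from a single broker. Requires a 2.4+ client and broker.
public class Reassign {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "s201:9092,s202:9092,s203:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            Map<TopicPartition, Optional<NewPartitionReassignment>> plan = new HashMap<>();
            // the first replica in the list becomes the preferred leader
            plan.put(new TopicPartition("test_producer3_500", 1),
                     Optional.of(new NewPartitionReassignment(asList(1, 3))));
            plan.put(new TopicPartition("test_producer3_500", 2),
                     Optional.of(new NewPartitionReassignment(asList(2, 3))));
            admin.alterPartitionReassignments(plan).all().get();
        }
    }
}
```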

Rerunning the stress test with the same parameters shows a clear throughput improvement.

-------------- Conclusion: throughput is now consistent with the tests above.

Test-driver code (lifted from the Kafka source):

```java
package com.kafkaTestJava;

import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.tools.ThroughputThrottler;
import org.apache.kafka.tools.ToolsUtils;

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;

import static net.sourceforge.argparse4j.impl.Arguments.store;
import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;

/**
 * Kafka stress-test driver (adapted from Kafka's ProducerPerformance).
 *
 * @author CUICHUNCHI
 * @date 2022/1/5 21:16
 * @modified By: 2022/1/5
 * @version 1.0
 */
public class TestKafkaProducer2 {

    public static void main(String[] args) throws Exception {
        ArgumentParser parser = argParser();
        try {
            Namespace res = parser.parseArgs(args);

            /* parse args */
            String topicName = res.getString("topic");
            long numRecords = res.getLong("numRecords");
            Integer recordSize = res.getInt("recordSize");
            int throughput = res.getInt("throughput");
            List<String> producerProps = res.getList("producerConfig");
            String producerConfig = res.getString("producerConfigFile");
            String payloadFilePath = res.getString("payloadFile");
            String transactionalId = res.getString("transactionalId");
            boolean shouldPrintMetrics = res.getBoolean("printMetrics");
            long transactionDurationMs = res.getLong("transactionDurationMs");
            boolean transactionsEnabled = 0 < transactionDurationMs;

            // since default value gets printed with the help text, we are escaping \n there and replacing it with correct value here.
            String payloadDelimiter = res.getString("payloadDelimiter").equals("\\n") ? "\n" : res.getString("payloadDelimiter");

            if (producerProps == null && producerConfig == null) {
                throw new ArgumentParserException("Either --producer-props or --producer.config must be specified.", parser);
            }

            List<byte[]> payloadByteList = new ArrayList<>();
            if (payloadFilePath != null) {
                Path path = Paths.get(payloadFilePath);
                System.out.println("Reading payloads from: " + path.toAbsolutePath());
                if (Files.notExists(path) || Files.size(path) == 0) {
                    throw new IllegalArgumentException("File does not exist or empty file provided.");
                }
                String[] payloadList = new String(Files.readAllBytes(path), "UTF-8").split(payloadDelimiter);
                System.out.println("Number of messages read: " + payloadList.length);
                for (String payload : payloadList) {
                    payloadByteList.add(payload.getBytes(StandardCharsets.UTF_8));
                }
            }

            Properties props = new Properties();
            if (producerConfig != null) {
                props.putAll(Utils.loadProps(producerConfig));
            }
            if (producerProps != null)
                for (String prop : producerProps) {
                    String[] pieces = prop.split("=");
                    if (pieces.length != 2)
                        throw new IllegalArgumentException("Invalid property: " + prop);
                    props.put(pieces[0], pieces[1]);
                }

            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
            if (transactionsEnabled)
                props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);

            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);

            if (transactionsEnabled)
                producer.initTransactions();

            /* setup perf test */
            byte[] payload = null;
            Random random = new Random(0);
            if (recordSize != null) {
                payload = new byte[recordSize];
                for (int i = 0; i < payload.length; ++i)
                    payload[i] = (byte) (random.nextInt(26) + 65);
            }
            ProducerRecord<byte[], byte[]> record;
            Stats stats = new Stats(numRecords, 5000);
            long startMs = System.currentTimeMillis();

            ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs);

            int currentTransactionSize = 0;
            long transactionStartTime = 0;
            for (long i = 0; i < numRecords; i++) {
                if (transactionsEnabled && currentTransactionSize == 0) {
                    producer.beginTransaction();
                    transactionStartTime = System.currentTimeMillis();
                }

                if (payloadFilePath != null) {
                    payload = payloadByteList.get(random.nextInt(payloadByteList.size()));
                }
                record = new ProducerRecord<>(topicName, payload);

                long sendStartMs = System.currentTimeMillis();
                Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats);
                producer.send(record, cb);

                currentTransactionSize++;
                if (transactionsEnabled && transactionDurationMs <= (sendStartMs - transactionStartTime)) {
                    producer.commitTransaction();
                    currentTransactionSize = 0;
                }

                if (throttler.shouldThrottle(i, sendStartMs)) {
                    throttler.throttle();
                }
            }

            if (transactionsEnabled && currentTransactionSize != 0)
                producer.commitTransaction();

            if (!shouldPrintMetrics) {
                producer.close();

                /* print final results */
                stats.printTotal();
            } else {
                // Make sure all messages are sent before printing out the stats and the metrics
                // We need to do this in a different branch for now since tests/kafkatest/sanity_checks/test_performance_services.py
                // expects this class to work with older versions of the client jar that don't support flush().
                producer.flush();

                /* print final results */
                stats.printTotal();

                /* print out metrics */
                ToolsUtils.printMetrics(producer.metrics());
                producer.close();
            }
        } catch (ArgumentParserException e) {
            if (args.length == 0) {
                parser.printHelp();
                Exit.exit(0);
            } else {
                parser.handleError(e);
                Exit.exit(1);
            }
        }
    }

    /** Get the command-line argument parser. */
    private static ArgumentParser argParser() {
        ArgumentParser parser = ArgumentParsers
            .newArgumentParser("producer-performance")
            .defaultHelp(true)
            .description("This tool is used to verify the producer performance.");

        MutuallyExclusiveGroup payloadOptions = parser
            .addMutuallyExclusiveGroup()
            .required(true)
            .description("either --record-size or --payload-file must be specified but not both.");

        parser.addArgument("--topic")
            .action(store())
            .required(true)
            .type(String.class)
            .metavar("TOPIC")
            .help("produce messages to this topic");

        parser.addArgument("--num-records")
            .action(store())
            .required(true)
            .type(Long.class)
            .metavar("NUM-RECORDS")
            .dest("numRecords")
            .help("number of messages to produce");

        payloadOptions.addArgument("--record-size")
            .action(store())
            .required(false)
            .type(Integer.class)
            .metavar("RECORD-SIZE")
            .dest("recordSize")
            .help("message size in bytes. Note that you must provide exactly one of --record-size or --payload-file.");

        payloadOptions.addArgument("--payload-file")
            .action(store())
            .required(false)
            .type(String.class)
            .metavar("PAYLOAD-FILE")
            .dest("payloadFile")
            .help("file to read the message payloads from. This works only for UTF-8 encoded text files. " +
                  "Payloads will be read from this file and a payload will be randomly selected when sending messages. " +
                  "Note that you must provide exactly one of --record-size or --payload-file.");

        parser.addArgument("--payload-delimiter")
            .action(store())
            .required(false)
            .type(String.class)
            .metavar("PAYLOAD-DELIMITER")
            .dest("payloadDelimiter")
            .setDefault("\\n")
            .help("provides delimiter to be used when --payload-file is provided. " +
                  "Defaults to new line. " +
                  "Note that this parameter will be ignored if --payload-file is not provided.");

        parser.addArgument("--throughput")
            .action(store())
            .required(true)
            .type(Integer.class)
            .metavar("THROUGHPUT")
            .help("throttle maximum message throughput to *approximately* THROUGHPUT messages/sec. Set this to -1 to disable throttling.");

        parser.addArgument("--producer-props")
            .nargs("+")
            .required(false)
            .metavar("PROP-NAME=PROP-VALUE")
            .type(String.class)
            .dest("producerConfig")
            .help("kafka producer related configuration properties like bootstrap.servers,client.id etc. " +
                  "These configs take precedence over those passed via --producer.config.");

        parser.addArgument("--producer.config")
            .action(store())
            .required(false)
            .type(String.class)
            .metavar("CONFIG-FILE")
            .dest("producerConfigFile")
            .help("producer config properties file.");

        parser.addArgument("--print-metrics")
            .action(storeTrue())
            .type(Boolean.class)
            .metavar("PRINT-METRICS")
            .dest("printMetrics")
            .help("print out metrics at the end of the test.");

        parser.addArgument("--transactional-id")
            .action(store())
            .required(false)
            .type(String.class)
            .metavar("TRANSACTIONAL-ID")
            .dest("transactionalId")
            .setDefault("performance-producer-default-transactional-id")
            .help("The transactionalId to use if transaction-duration-ms is > 0. Useful when testing the performance of concurrent transactions.");

        parser.addArgument("--transaction-duration-ms")
            .action(store())
            .required(false)
            .type(Long.class)
            .metavar("TRANSACTION-DURATION")
            .dest("transactionDurationMs")
            .setDefault(0L)
            .help("The max age of each transaction. The commitTransaction will be called after this time has elapsed. Transactions are only enabled if this value is positive.");

        return parser;
    }

    private static class Stats {
        private long start;
        private long windowStart;
        private int[] latencies;
        private int sampling;
        private int iteration;
        private int index;
        private long count;
        private long bytes;
        private int maxLatency;
        private long totalLatency;
        private long windowCount;
        private int windowMaxLatency;
        private long windowTotalLatency;
        private long windowBytes;
        private long reportingInterval;

        public Stats(long numRecords, int reportingInterval) {
            this.start = System.currentTimeMillis();
            this.windowStart = System.currentTimeMillis();
            this.iteration = 0;
            // sample at most ~500k latencies to bound memory use
            this.sampling = (int) (numRecords / Math.min(numRecords, 500000));
            this.latencies = new int[(int) (numRecords / this.sampling) + 1];
            this.index = 0;
            this.maxLatency = 0;
            this.totalLatency = 0;
            this.windowCount = 0;
            this.windowMaxLatency = 0;
            this.windowTotalLatency = 0;
            this.windowBytes = 0;
            this.totalLatency = 0;
            this.reportingInterval = reportingInterval;
        }

        public void record(int iter, int latency, int bytes, long time) {
            this.count++;
            this.bytes += bytes;
            this.totalLatency += latency;
            this.maxLatency = Math.max(this.maxLatency, latency);
            this.windowCount++;
            this.windowBytes += bytes;
            this.windowTotalLatency += latency;
            this.windowMaxLatency = Math.max(windowMaxLatency, latency);
            if (iter % this.sampling == 0) {
                this.latencies[index] = latency;
                this.index++;
            }
            /* maybe report the recent perf */
            if (time - windowStart >= reportingInterval) {
                printWindow();
                newWindow();
            }
        }

        public Callback nextCompletion(long start, int bytes, Stats stats) {
            Callback cb = new PerfCallback(this.iteration, start, bytes, stats);
            this.iteration++;
            return cb;
        }

        public void printWindow() {
            long elapsed = System.currentTimeMillis() - windowStart;
            double recsPerSec = 1000.0 * windowCount / (double) elapsed;
            double mbPerSec = 1000.0 * this.windowBytes / (double) elapsed / (1024.0 * 1024.0);
            System.out.printf("%d records sent, %.1f records/sec (%.2f MB/sec), %.1f ms avg latency, %.1f ms max latency.%n",
                              windowCount,
                              recsPerSec,
                              mbPerSec,
                              windowTotalLatency / (double) windowCount,
                              (double) windowMaxLatency);
        }

        public void newWindow() {
            this.windowStart = System.currentTimeMillis();
            this.windowCount = 0;
            this.windowMaxLatency = 0;
            this.windowTotalLatency = 0;
            this.windowBytes = 0;
        }

        public void printTotal() {
            long elapsed = System.currentTimeMillis() - start;
            double recsPerSec = 1000.0 * count / (double) elapsed;
            double mbPerSec = 1000.0 * this.bytes / (double) elapsed / (1024.0 * 1024.0);
            int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999);
            System.out.printf("%d records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.%n",
                              count,
                              recsPerSec,
                              mbPerSec,
                              totalLatency / (double) count,
                              (double) maxLatency,
                              percs[0],
                              percs[1],
                              percs[2],
                              percs[3]);
        }

        private static int[] percentiles(int[] latencies, int count, double... percentiles) {
            int size = Math.min(count, latencies.length);
            Arrays.sort(latencies, 0, size);
            int[] values = new int[percentiles.length];
            for (int i = 0; i < percentiles.length; i++) {
                int index = (int) (percentiles[i] * size);
                values[i] = latencies[index];
            }
            return values;
        }
    }

    private static final class PerfCallback implements Callback {
        private final long start;
        private final int iteration;
        private final int bytes;
        private final Stats stats;

        public PerfCallback(int iter, long start, int bytes, Stats stats) {
            this.start = start;
            this.stats = stats;
            this.iteration = iter;
            this.bytes = bytes;
        }

        public void onCompletion(RecordMetadata metadata, Exception exception) {
            long now = System.currentTimeMillis();
            int latency = (int) (now - start);
            this.stats.record(iteration, latency, bytes, now);
            if (exception != null)
                exception.printStackTrace();
        }
    }
}
```
