[Repost] Kafka Stress Testing: Multi-Dimensional Analysis in Practice




 

 

Simulate the stress tests by configuring the virtual machines with different bandwidth limits.
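For reference, one common way to cap a VM's NIC bandwidth on Linux is a tc token-bucket filter. This is only a minimal sketch: the interface name (eth0) and the exact rates are assumptions, not details taken from the original test environment; run as root and adjust the rate per scenario (4mbit / 10mbit / 100mbit):
    tc qdisc add dev eth0 root tbf rate 4mbit burst 32kbit latency 400ms   # cap egress to roughly 4 Mbit/s
    tc qdisc del dev eth0 root                                             # remove the cap once the scenario is done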

--------- Kafka data stress test --------------
----- 1. The company's production Kafka cluster: 500 GB disk per broker, 3 brokers in total, logs retained for 7 days.
        1.1 Version: 1.1.0

----- 2. Stress-testing Kafka.
        2.1 Use Kafka's bundled benchmarking tool bin/kafka-producer-perf-test.sh
            Parameter reference:
                --num-records   : total number of messages to send
                --record-size   : size of each message, in bytes
                --throughput    : messages to send per second; set to -1 to disable throttling and measure the producer's maximum throughput
                --producer-props: bootstrap.servers=s201:9092,s202:9092,s203:9092
                --print-metrics : print all metrics at the end of the run
            Consumer benchmark command: bin/kafka-consumer-perf-test.sh
                --broker-list : list of broker nodes
                --fetch-size  : amount of data fetched per request
                --messages    : total number of messages to consume
                --threads     : number of processing threads, default 10
            Console consumption: kafka-console-consumer.sh --topic test_producer3 --bootstrap-server s201:9092,s202:9092,s203:9092 --group aa --from-beginning
            Inspect the consumer group: kafka-consumer-groups.sh --bootstrap-server s201:9092,s202:9092,s203:9092 --group aa --describe
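            The scenario topics below (3 partitions / 2 replicas and 1 partition / 1 replica) are assumed to have been created beforehand; a minimal sketch for Kafka 1.1.0, assuming ZooKeeper runs at s201:2181 (the ZooKeeper address is not given in the original):
                bin/kafka-topics.sh --create --zookeeper s201:2181 --topic test_producer3 --partitions 3 --replication-factor 2
                bin/kafka-topics.sh --create --zookeeper s201:2181 --topic test_producer1_500 --partitions 1 --replication-factor 1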
        2.2 Simulated scenarios:


            2.2.1 Simulation with 4 Mbit bandwidth (theoretical maximum: 512 KB/s)


                2.2.1.1 Topic partitions: 3, replicas: 2, 100k messages in total, 1,000 messages/s
                    Producer command: bin/kafka-producer-perf-test.sh --topic test_producer3 --num-records 100000 --record-size 1024 --throughput 1000 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 --print-metrics
                    Results: producer-topic-metrics:record-error-total:{client-id=producer-1, topic=test_producer3} : 32080.000 (failed sends: 32080)
                             producer-topic-metrics:record-send-total:{client-id=producer-1, topic=test_producer3}  : 67920.000 (successful sends: 67920)
                             3310 records sent, 660.2 records/sec (0.64 MB/sec), 30282.4 ms avg latency, 31107.0 ms max latency.
                             100000 records sent, 766.442099 records/sec (0.75 MB/sec), 17785.39 ms avg latency, 32738.00 ms max latency, 29960 ms 50th, 30679 ms 95th, 31056 ms 99th, 32285 ms 99.9th.

                             100k messages were submitted in total; some sends timed out, with 32,080 failures. The maximum throughput was 0.75 MB/s, the average write latency was 17,785.39 ms, and the maximum latency was 32,738 ms.
                             Conclusion: the cluster cannot sustain 1,000 messages/s. With 4 Mbit of bandwidth, the maximum is only a little over 700 messages/s.

                          
                    Consumer command: bin/kafka-consumer-perf-test.sh --new-consumer --topic test_producer3 --broker-list s201:9092,s202:9092,s203:9092 --fetch-size 10000 --messages 100000
                    Results:
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
(start time, end time, total MB consumed, MB consumed per second, total messages consumed, messages consumed per second, rebalance time in ms, fetch time in ms, fetch MB per second, fetch messages per second)
2021-12-30 23:00:11, 2021-12-30 23:02:50, 66.3281, 0.4187, 67920, 428.7446, 10, 158406, 0.4187, 428.771

                          
                2.2.1.2 Topic partitions: 3, replicas: 2, 100k messages in total, 500 messages/s
                    Command: bin/kafka-producer-perf-test.sh --topic test_producer3_500 --num-records 100000 --record-size 1024 --throughput 500 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 --print-metrics
                    Results: producer-topic-metrics:record-send-total:{client-id=producer-1, topic=test_producer3_500}  : 100000.000
                             100000 records sent, 459.041979 records/sec (0.45 MB/sec), 5634.44 ms avg latency, 18407.00 ms max latency, 4029 ms 50th, 15754 ms 95th, 18142 ms 99th, 18313 ms 99.9th.

                             All 100k messages were written successfully, with a throughput of 0.45 MB/s, an average latency of 5,634.44 ms, and a maximum latency of 18,407 ms.
                
                2.2.1.3 Topic partitions: 1, replicas: 1, 100k messages in total, 500 messages/s
                    Command: bin/kafka-producer-perf-test.sh --topic test_producer1_500 --num-records 100000 --record-size 1024 --throughput 500 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 --print-metrics
                    Results: 50000 records sent, 465.826936 records/sec (0.45 MB/sec), 3798.95 ms avg latency, 7406.00 ms max latency, 3802 ms 50th, 6999 ms 95th, 7305 ms 99th, 7379 ms 99.9th.
                    
          

            2.2.2 Simulation with 10 Mbit bandwidth (theoretical maximum: 1.25 MB/s)


                2.2.2.1 Topic partitions: 3, replicas: 2, 100k messages in total, 1,000 messages/s
                    Producer command: bin/kafka-producer-perf-test.sh --topic test_producer3 --num-records 100000 --record-size 1024 --throughput 1000 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 --print-metrics

                    Results: producer-topic-metrics:record-send-total:{client-id=producer-1, topic=test_producer3}  : 100000.000
                        100000 records sent, 999.880014 records/sec (0.98 MB/sec), 19.48 ms avg latency, 262.00 ms max latency, 11 ms 50th, 54 ms 95th, 61 ms 99th, 123 ms 99.9th.
                        All 100k messages were written successfully, with a throughput of 0.98 MB/s.

                2.2.2.2 Topic partitions: 3, replicas: 2, 100k messages in total, 2,000 messages/s
                    Producer command: bin/kafka-producer-perf-test.sh --topic test_producer3 --num-records 100000 --record-size 1024 --throughput 2000 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 --print-metrics

                    Results: producer-topic-metrics:record-error-total:{client-id=producer-1, topic=test_producer3} : 5340.000
                             producer-topic-metrics:record-send-total:{client-id=producer-1, topic=test_producer3}  : 94660.000

                             100000 records sent, 1203.876482 records/sec (1.18 MB/sec), 15891.99 ms avg latency, 30390.00 ms max latency, 12657 ms 50th, 30055 ms 95th, 30279 ms 99th, 30358 ms 99.9th.
                             Only 94,660 messages were written successfully; 5,340 failed. The maximum throughput was 1.18 MB/s. (When tested at 1,300 messages/s, all 100k were written successfully.)
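                             The 1,300 messages/s run mentioned in parentheses above is not shown in the original; it would have been issued with the same tool, for example (command reconstructed, not copied from the original output):
                                 bin/kafka-producer-perf-test.sh --topic test_producer3 --num-records 100000 --record-size 1024 --throughput 1300 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 --print-metrics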

                          
                    Consumer command: bin/kafka-consumer-perf-test.sh --new-consumer --topic test_producer3 --broker-list s201:9092,s202:9092,s203:9092 --fetch-size 10000 --messages 100000
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2021-12-30 23:55:17:093, 2021-12-30 23:56:47:476, 97.6592, 1.0805, 100003, 1106.4359, 32, 90351, 1.0809, 1106.8278
            
            2.2.3 Simulation with 100 Mbit bandwidth (theoretical maximum: 12.5 MB/s)


                2.2.3.1 Topic partitions: 3, replicas: 2, 100k messages in total, 1,000 messages/s
                    Producer command: bin/kafka-producer-perf-test.sh --topic test_producer3 --num-records 100000 --record-size 1024 --throughput 1000 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 --print-metrics
                    Results: producer-topic-metrics:record-send-total:{client-id=producer-1, topic=test_producer3}  : 100000.000
                    100000 records sent, 999.960002 records/sec (0.98 MB/sec), 2.78 ms avg latency, 335.00 ms max latency, 3 ms 50th, 4 ms 95th, 14 ms 99th, 127 ms 99.9th.
                    All 100k messages were written successfully, and the maximum latency was only 335 ms, so there is still headroom. Keep raising the per-second throughput until sends start timing out; that point marks the maximum.
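                    A small loop like the following could automate that probing (a sketch only; the throughput steps are illustrative, and of these only the 2,000/s and 10,000/s runs shown below were actually executed in the original tests):
                        for tput in 2000 5000 10000 12800; do
                            bin/kafka-producer-perf-test.sh --topic test_producer3 --num-records 100000 --record-size 1024 \
                                --throughput "$tput" --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 --print-metrics
                        done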

                    
                    
                    Producer command: bin/kafka-producer-perf-test.sh --topic test_producer3 --num-records 100000 --record-size 1024 --throughput 2000 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 --print-metrics
                    Results: producer-topic-metrics:record-send-total:{client-id=producer-1, topic=test_producer3}  : 100000.000
                    100000 records sent, 1999.880007 records/sec (1.95 MB/sec), 1.78 ms avg latency, 114.00 ms max latency, 1 ms 50th, 4 ms 95th, 6 ms 99th, 29 ms 99.9th.
                    This run also succeeded, with a maximum latency of only 114 ms.

                    
                    With 100 Mbit of bandwidth, the theoretical network throughput is 12.5 MB/s = 12,800 KB/s; since each simulated record is 1 KB, the theoretical maximum is about 12,800 records per second.
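                    A quick sanity check of that ceiling, following the article's convention of 1 MB = 1024 KB (a throwaway shell calculation, not part of the original tests):
                        echo $((100 * 1024 / 8))   # prints 12800: 100 Mbit/s ÷ 8 = 12.5 MB/s = 12800 KB/s, i.e. ~12800 one-KB records per second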
                    

                    Producer command: bin/kafka-producer-perf-test.sh --topic test_producer3 --num-records 100000 --record-size 1024 --throughput 10000 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 --print-metrics
                    Results: producer-topic-metrics:record-send-total:{client-id=producer-1, topic=test_producer3}  : 100000.000
                    100000 records sent, 9998.000400 records/sec (9.76 MB/sec), 5.34 ms avg latency, 117.00 ms max latency, 2 ms 50th, 31 ms 95th, 65 ms 99th, 85 ms 99.9th.
                    All 100k messages were written successfully, with a throughput of 9.76 MB/s.

            
                    Consumer command: bin/kafka-consumer-perf-test.sh --new-consumer --topic test_producer3 --broker-list s201:9092,s202:9092,s203:9092 --fetch-size 20000 --messages 500000
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec

2021-12-31 00:21:25:976, 2021-12-31 00:22:19:900, 488.2900, 9.0552, 500009, 9272.4761, 31, 53893, 9.0604, 9277.8097
 

 

 

--------------- Sending messages to Kafka from IDEA

--------- 1. Bandwidth 4 Mbit, topic with 1 partition. Parameters: --topic test_producer --num-records 100000 --record-size 2048 --throughput 500 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 request.timeout.ms=62000 --print-metrics

-------------- Start two instances of the Java class with identical parameters. Result: the bandwidth is split evenly between them.

Writing to the same topic at the same time, each instance's throughput is halved, to about 0.23 MB/s.
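The same two-instance experiment can also be reproduced without IDEA by running two copies of the bundled tool in parallel (a sketch, reusing the parameters from point 1 above):
    bin/kafka-producer-perf-test.sh --topic test_producer --num-records 100000 --record-size 2048 --throughput 500 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 request.timeout.ms=62000 &
    bin/kafka-producer-perf-test.sh --topic test_producer --num-records 100000 --record-size 2048 --throughput 500 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 request.timeout.ms=62000 &
    wait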

---------- 2. Partitions: 3, replicas: 2. Stress-test parameters: --topic test_idea_producer --num-records 100000 --record-size 1024 --throughput 1000 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 request.timeout.ms=62000 --print-metrics

The partition leaders are distributed evenly across the brokers.

Conclusion: throughput is more than twice that of the single-partition case, which shows that more partitions can raise throughput.

Note: how the partition leaders are distributed is also one of the factors affecting throughput; the leader placement can be checked as shown below.
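A sketch of inspecting leader placement with the bundled topic tool on Kafka 1.1.0 (the ZooKeeper address s201:2181 is an assumption, as above):
    bin/kafka-topics.sh --describe --zookeeper s201:2181 --topic test_producer3_500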

The following test uses the same 3 partitions but a different leader distribution: --topic test_producer3_500 --num-records 100000 --record-size 2048 --throughput -1 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 request.timeout.ms=62000 --print-metrics

Here broker s203 is the leader for partitions 1 and 2, which concentrates the I/O load on that machine and drags the measured average down to 0.67 MB/s.

This was corrected with a Kafka partition reassignment; see https://blog.csdn.net/cuichunchi/article/details/120930445 for the reassignment procedure.
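A minimal sketch of such a reassignment on Kafka 1.1.0 (the broker ids 1/2/3, the replica layout, and the ZooKeeper address s201:2181 are assumptions for illustration, not the article's actual plan). First a reassign.json describing the desired replica placement:
    {"version":1,"partitions":[
        {"topic":"test_producer3_500","partition":0,"replicas":[1,2]},
        {"topic":"test_producer3_500","partition":1,"replicas":[2,3]},
        {"topic":"test_producer3_500","partition":2,"replicas":[3,1]}]}
Then execute and verify it:
    bin/kafka-reassign-partitions.sh --zookeeper s201:2181 --reassignment-json-file reassign.json --execute
    bin/kafka-reassign-partitions.sh --zookeeper s201:2181 --reassignment-json-file reassign.json --verify
If only the leadership needs to move back to the preferred replicas, a preferred-replica election may be enough:
    bin/kafka-preferred-replica-election.sh --zookeeper s201:2181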

Running the stress test again with the same parameters shows a clear improvement in throughput.

-------------- Conclusion: after the reassignment, throughput is back in line with the earlier tests above.

Benchmark code (extracted from the Kafka source):

package com.kafkaTestJava;

import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.tools.ThroughputThrottler;
import org.apache.kafka.tools.ToolsUtils;

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;

import static net.sourceforge.argparse4j.impl.Arguments.store;
import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;

/**
 * Kafka producer stress test (adapted from Kafka's ProducerPerformance tool).
 * @author CUICHUNCHI
 * @date 2022/1/5 21:16
 * @version 1.0
 */
public class TestKafkaProducer2 {

    public static void main(String[] args) throws Exception {
        ArgumentParser parser = argParser();

        try {
            Namespace res = parser.parseArgs(args);

            /* parse args */
            String topicName = res.getString("topic");
            long numRecords = res.getLong("numRecords");
            Integer recordSize = res.getInt("recordSize");
            int throughput = res.getInt("throughput");
            List<String> producerProps = res.getList("producerConfig");
            String producerConfig = res.getString("producerConfigFile");
            String payloadFilePath = res.getString("payloadFile");
            String transactionalId = res.getString("transactionalId");
            boolean shouldPrintMetrics = res.getBoolean("printMetrics");
            long transactionDurationMs = res.getLong("transactionDurationMs");
            boolean transactionsEnabled = 0 < transactionDurationMs;

            // since default value gets printed with the help text, we are escaping \n there and replacing it with correct value here.
            String payloadDelimiter = res.getString("payloadDelimiter").equals("\\n") ? "\n" : res.getString("payloadDelimiter");

            if (producerProps == null && producerConfig == null) {
                throw new ArgumentParserException("Either --producer-props or --producer.config must be specified.", parser);
            }

            List<byte[]> payloadByteList = new ArrayList<>();
            if (payloadFilePath != null) {
                Path path = Paths.get(payloadFilePath);
                System.out.println("Reading payloads from: " + path.toAbsolutePath());
                if (Files.notExists(path) || Files.size(path) == 0) {
                    throw new IllegalArgumentException("File does not exist or empty file provided.");
                }

                String[] payloadList = new String(Files.readAllBytes(path), "UTF-8").split(payloadDelimiter);

                System.out.println("Number of messages read: " + payloadList.length);

                for (String payload : payloadList) {
                    payloadByteList.add(payload.getBytes(StandardCharsets.UTF_8));
                }
            }

            Properties props = new Properties();
            if (producerConfig != null) {
                props.putAll(Utils.loadProps(producerConfig));
            }
            if (producerProps != null)
                for (String prop : producerProps) {
                    String[] pieces = prop.split("=");
                    if (pieces.length != 2)
                        throw new IllegalArgumentException("Invalid property: " + prop);
                    props.put(pieces[0], pieces[1]);
                }

            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
            if (transactionsEnabled)
                props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);

            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);

            if (transactionsEnabled)
                producer.initTransactions();

            /* setup perf test */
            byte[] payload = null;
            Random random = new Random(0);
            if (recordSize != null) {
                payload = new byte[recordSize];
                for (int i = 0; i < payload.length; ++i)
                    payload[i] = (byte) (random.nextInt(26) + 65);
            }
            ProducerRecord<byte[], byte[]> record;
            Stats stats = new Stats(numRecords, 5000);
            long startMs = System.currentTimeMillis();

            ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs);

            int currentTransactionSize = 0;
            long transactionStartTime = 0;
            for (long i = 0; i < numRecords; i++) {
                if (transactionsEnabled && currentTransactionSize == 0) {
                    producer.beginTransaction();
                    transactionStartTime = System.currentTimeMillis();
                }

                if (payloadFilePath != null) {
                    payload = payloadByteList.get(random.nextInt(payloadByteList.size()));
                }
                record = new ProducerRecord<>(topicName, payload);

                long sendStartMs = System.currentTimeMillis();
                Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats);
                producer.send(record, cb);

                currentTransactionSize++;
                if (transactionsEnabled && transactionDurationMs <= (sendStartMs - transactionStartTime)) {
                    producer.commitTransaction();
                    currentTransactionSize = 0;
                }

                if (throttler.shouldThrottle(i, sendStartMs)) {
                    throttler.throttle();
                }
            }

            if (transactionsEnabled && currentTransactionSize != 0)
                producer.commitTransaction();

            if (!shouldPrintMetrics) {
                producer.close();

                /* print final results */
                stats.printTotal();
            } else {
                // Make sure all messages are sent before printing out the stats and the metrics
                // We need to do this in a different branch for now since tests/kafkatest/sanity_checks/test_performance_services.py
                // expects this class to work with older versions of the client jar that don't support flush().
                producer.flush();

                /* print final results */
                stats.printTotal();

                /* print out metrics */
                ToolsUtils.printMetrics(producer.metrics());
                producer.close();
            }
        } catch (ArgumentParserException e) {
            if (args.length == 0) {
                parser.printHelp();
                Exit.exit(0);
            } else {
                parser.handleError(e);
                Exit.exit(1);
            }
        }
    }

    /** Get the command-line argument parser. */
    private static ArgumentParser argParser() {
        ArgumentParser parser = ArgumentParsers
                .newArgumentParser("producer-performance")
                .defaultHelp(true)
                .description("This tool is used to verify the producer performance.");

        MutuallyExclusiveGroup payloadOptions = parser
                .addMutuallyExclusiveGroup()
                .required(true)
                .description("either --record-size or --payload-file must be specified but not both.");

        parser.addArgument("--topic")
                .action(store())
                .required(true)
                .type(String.class)
                .metavar("TOPIC")
                .help("produce messages to this topic");

        parser.addArgument("--num-records")
                .action(store())
                .required(true)
                .type(Long.class)
                .metavar("NUM-RECORDS")
                .dest("numRecords")
                .help("number of messages to produce");

        payloadOptions.addArgument("--record-size")
                .action(store())
                .required(false)
                .type(Integer.class)
                .metavar("RECORD-SIZE")
                .dest("recordSize")
                .help("message size in bytes. Note that you must provide exactly one of --record-size or --payload-file.");

        payloadOptions.addArgument("--payload-file")
                .action(store())
                .required(false)
                .type(String.class)
                .metavar("PAYLOAD-FILE")
                .dest("payloadFile")
                .help("file to read the message payloads from. This works only for UTF-8 encoded text files. " +
                        "Payloads will be read from this file and a payload will be randomly selected when sending messages. " +
                        "Note that you must provide exactly one of --record-size or --payload-file.");

        parser.addArgument("--payload-delimiter")
                .action(store())
                .required(false)
                .type(String.class)
                .metavar("PAYLOAD-DELIMITER")
                .dest("payloadDelimiter")
                .setDefault("\\n")
                .help("provides delimiter to be used when --payload-file is provided. " +
                        "Defaults to new line. " +
                        "Note that this parameter will be ignored if --payload-file is not provided.");

        parser.addArgument("--throughput")
                .action(store())
                .required(true)
                .type(Integer.class)
                .metavar("THROUGHPUT")
                .help("throttle maximum message throughput to *approximately* THROUGHPUT messages/sec. Set this to -1 to disable throttling.");

        parser.addArgument("--producer-props")
                .nargs("+")
                .required(false)
                .metavar("PROP-NAME=PROP-VALUE")
                .type(String.class)
                .dest("producerConfig")
                .help("kafka producer related configuration properties like bootstrap.servers,client.id etc. " +
                        "These configs take precedence over those passed via --producer.config.");

        parser.addArgument("--producer.config")
                .action(store())
                .required(false)
                .type(String.class)
                .metavar("CONFIG-FILE")
                .dest("producerConfigFile")
                .help("producer config properties file.");

        parser.addArgument("--print-metrics")
                .action(storeTrue())
                .type(Boolean.class)
                .metavar("PRINT-METRICS")
                .dest("printMetrics")
                .help("print out metrics at the end of the test.");

        parser.addArgument("--transactional-id")
                .action(store())
                .required(false)
                .type(String.class)
                .metavar("TRANSACTIONAL-ID")
                .dest("transactionalId")
                .setDefault("performance-producer-default-transactional-id")
                .help("The transactionalId to use if transaction-duration-ms is > 0. Useful when testing the performance of concurrent transactions.");

        parser.addArgument("--transaction-duration-ms")
                .action(store())
                .required(false)
                .type(Long.class)
                .metavar("TRANSACTION-DURATION")
                .dest("transactionDurationMs")
                .setDefault(0L)
                .help("The max age of each transaction. The commitTransaction will be called after this time has elapsed. Transactions are only enabled if this value is positive.");

        return parser;
    }

    private static class Stats {
        private long start;
        private long windowStart;
        private int[] latencies;
        private int sampling;
        private int iteration;
        private int index;
        private long count;
        private long bytes;
        private int maxLatency;
        private long totalLatency;
        private long windowCount;
        private int windowMaxLatency;
        private long windowTotalLatency;
        private long windowBytes;
        private long reportingInterval;

        public Stats(long numRecords, int reportingInterval) {
            this.start = System.currentTimeMillis();
            this.windowStart = System.currentTimeMillis();
            this.iteration = 0;
            this.sampling = (int) (numRecords / Math.min(numRecords, 500000));
            this.latencies = new int[(int) (numRecords / this.sampling) + 1];
            this.index = 0;
            this.maxLatency = 0;
            this.totalLatency = 0;
            this.windowCount = 0;
            this.windowMaxLatency = 0;
            this.windowTotalLatency = 0;
            this.windowBytes = 0;
            this.totalLatency = 0;
            this.reportingInterval = reportingInterval;
        }

        public void record(int iter, int latency, int bytes, long time) {
            this.count++;
            this.bytes += bytes;
            this.totalLatency += latency;
            this.maxLatency = Math.max(this.maxLatency, latency);
            this.windowCount++;
            this.windowBytes += bytes;
            this.windowTotalLatency += latency;
            this.windowMaxLatency = Math.max(windowMaxLatency, latency);
            if (iter % this.sampling == 0) {
                this.latencies[index] = latency;
                this.index++;
            }
            /* maybe report the recent perf */
            if (time - windowStart >= reportingInterval) {
                printWindow();
                newWindow();
            }
        }

        public Callback nextCompletion(long start, int bytes, Stats stats) {
            Callback cb = new PerfCallback(this.iteration, start, bytes, stats);
            this.iteration++;
            return cb;
        }

        public void printWindow() {
            long ellapsed = System.currentTimeMillis() - windowStart;
            double recsPerSec = 1000.0 * windowCount / (double) ellapsed;
            double mbPerSec = 1000.0 * this.windowBytes / (double) ellapsed / (1024.0 * 1024.0);
            System.out.printf("%d records sent, %.1f records/sec (%.2f MB/sec), %.1f ms avg latency, %.1f ms max latency.%n",
                    windowCount,
                    recsPerSec,
                    mbPerSec,
                    windowTotalLatency / (double) windowCount,
                    (double) windowMaxLatency);
        }

        public void newWindow() {
            this.windowStart = System.currentTimeMillis();
            this.windowCount = 0;
            this.windowMaxLatency = 0;
            this.windowTotalLatency = 0;
            this.windowBytes = 0;
        }

        public void printTotal() {
            long elapsed = System.currentTimeMillis() - start;
            double recsPerSec = 1000.0 * count / (double) elapsed;
            double mbPerSec = 1000.0 * this.bytes / (double) elapsed / (1024.0 * 1024.0);
            int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999);
            System.out.printf("%d records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.%n",
                    count,
                    recsPerSec,
                    mbPerSec,
                    totalLatency / (double) count,
                    (double) maxLatency,
                    percs[0],
                    percs[1],
                    percs[2],
                    percs[3]);
        }

        private static int[] percentiles(int[] latencies, int count, double... percentiles) {
            int size = Math.min(count, latencies.length);
            Arrays.sort(latencies, 0, size);
            int[] values = new int[percentiles.length];
            for (int i = 0; i < percentiles.length; i++) {
                int index = (int) (percentiles[i] * size);
                values[i] = latencies[index];
            }
            return values;
        }
    }

    private static final class PerfCallback implements Callback {
        private final long start;
        private final int iteration;
        private final int bytes;
        private final Stats stats;

        public PerfCallback(int iter, long start, int bytes, Stats stats) {
            this.start = start;
            this.stats = stats;
            this.iteration = iter;
            this.bytes = bytes;
        }

        public void onCompletion(RecordMetadata metadata, Exception exception) {
            long now = System.currentTimeMillis();
            int latency = (int) (now - start);
            this.stats.record(iteration, latency, bytes, now);
            if (exception != null)
                exception.printStackTrace();
        }
    }
}

