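Simulated load tests, run by limiting the virtual machines to different bandwidths.
--------- Kafka load test --------------
-----1. Production Kafka cluster disks: 500 GB per node, 3 nodes, log retention of 7 days.
1.1 Version: 1.1.0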
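The disk and retention figures above put a ceiling on how much data the cluster can hold at once. The following is a rough sketch of that arithmetic, not a sizing rule: it assumes a replication factor of 2 (the value used by the test topics), treats 500 GB as 500 GiB, and ignores index files, operating-system usage and free-space headroom. The class and method names are made up for illustration.

```java
// Rough capacity estimate under the assumptions stated above.
final class RetentionCapacity {
    /** Sustained ingest rate (MiB/s of unique data) that would fill the cluster in exactly retentionDays. */
    static double maxIngestMiBPerSec(int nodes, double diskGiBPerNode,
                                     int replicationFactor, int retentionDays) {
        // Every byte is stored replicationFactor times, so unique capacity shrinks accordingly.
        double uniqueGiB = nodes * diskGiBPerNode / replicationFactor;
        double retentionSeconds = retentionDays * 24.0 * 3600.0;
        return uniqueGiB * 1024.0 / retentionSeconds;
    }

    public static void main(String[] args) {
        // 3 nodes x 500 GB, 7-day retention; replication factor 2 is an assumption.
        System.out.printf("%.2f MiB/s%n", maxIngestMiBPerSec(3, 500, 2, 7));
    }
}
```

Under these assumptions the ceiling comes out at roughly 1.27 MiB/s of sustained writes, so a real deployment would need to stay well below that.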
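-----2. Load-testing Kafka.
2.1 Use the load-test tool bundled with Kafka: bin/kafka-producer-perf-test.sh
Command parameters:
--num-records: total number of messages to send.
--record-size: size of each message, in bytes.
--throughput: number of messages to send per second. Set it to -1 to disable throttling, which measures the producer's maximum throughput.
--producer-props: producer configuration properties, e.g. bootstrap.servers=s201:9092,s202:9092,s203:9092
--print-metrics: print all producer metrics at the end of the test.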
Consumer load-test command: bin/kafka-consumer-perf-test.sh
--broker-list: list of broker nodes.
--fetch-size: amount of data to fetch in a single request.
--messages: total number of messages to consume.
--threads: number of processing threads (default 10).
Consume: kafka-console-consumer.sh --topic test_producer3 --bootstrap-server s201:9092,s202:9092,s203:9092 --group aa --from-beginning
Show consumer group information: kafka-consumer-groups.sh --bootstrap-server s201:9092,s202:9092,s203:9092 --group aa --describe
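2.2 Simulated scenarios:
2.2.1 Simulating 4M bandwidth (ideal: 512 KB/s)
2.2.1.1 Topic partitions: 3, replicas: 2, 100,000 messages in total, 1000 messages per second
Producer command: bin/kafka-producer-perf-test.sh --topic test_producer3 --num-records 100000 --record-size 1024 --throughput 1000 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 --print-metrics
Result: producer-topic-metrics:record-error-total:{client-id=producer-1, topic=test_producer3} : 32080.000 (failed sends: 32080)
producer-topic-metrics:record-send-total:{client-id=producer-1, topic=test_producer3} : 67920.000 (successful sends: 67920)
3310 records sent, 660.2 records/sec (0.64 MB/sec), 30282.4 ms avg latency, 31107.0 max latency.
100000 records sent, 766.442099 records/sec (0.75 MB/sec), 17785.39 ms avg latency, 32738.00 ms max latency, 29960 ms 50th, 30679 ms 95th, 31056 ms 99th, 32285 ms 99.9th.
Of the 100,000 messages, 32080 failed with send timeouts. The tool reported 0.75 MB/s, an average latency of 17785.39 ms per send and a maximum latency of 32738 ms. The reported rate also counts failed sends (the completion callback records every message, with or without an exception), so the rate of successful writes is lower: 766.44 × 67920 / 100000 ≈ 520 messages per second, about 0.51 MB/s.
Conclusion: with 4M of bandwidth the cluster cannot absorb 1000 messages per second. The tool tops out at a little over 700 per second, of which only about 520 per second are actually written, close to the 512 KB/s ideal.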
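The summary line counts every completed send, including the ones that failed, so the rate of acknowledged writes has to be derived from the record-send-total and record-error-total metrics. A minimal sketch of that derivation follows; the class and method names are illustrative, and the inputs are the figures from the run above.

```java
// Derives the successful write rate from the perf tool's reported totals.
final class EffectiveRate {
    /** Records/sec that were acknowledged, given the reported totals and the failure count. */
    static double successfulRecordsPerSec(long reportedRecords, double reportedRecordsPerSec,
                                          long failedRecords) {
        double elapsedSec = reportedRecords / reportedRecordsPerSec;
        return (reportedRecords - failedRecords) / elapsedSec;
    }

    /** Converts a record rate to MB/s using the tool's convention of 1 MB = 1024 * 1024 bytes. */
    static double toMBPerSec(double recordsPerSec, int recordSizeBytes) {
        return recordsPerSec * recordSizeBytes / (1024.0 * 1024.0);
    }

    public static void main(String[] args) {
        double rate = successfulRecordsPerSec(100_000, 766.442099, 32_080);
        System.out.printf("%.1f records/sec, %.2f MB/sec%n", rate, toMBPerSec(rate, 1024));
    }
}
```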
Consumer command: bin/kafka-consumer-perf-test.sh --new-consumer --topic test_producer3 --broker-list s201:9092,s202:9092,s203:9092 --fetch-size 10000 --messages 100000
Result:
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
Column meanings: start time, end time, total data consumed (MB), average MB consumed per second, total messages consumed, messages consumed per second, rebalance time (ms), fetch time (ms), MB fetched per second, messages fetched per second
2021-12-30 23:00:11, 2021-12-30 23:02:50, 66.3281, 0.4187, 67920, 428.7446, 10, 158406, 0.4187, 428.771
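The fetch columns can be cross-checked from the other fields: in this row, fetch.MB.sec and fetch.nMsg.sec match the consumed totals divided by fetch.time.ms. The sketch below parses one result row and recomputes both values. The class name is made up, and the column positions are taken from the header printed above.

```java
// Parses one kafka-consumer-perf-test result row and recomputes the fetch rates.
final class ConsumerPerfRow {
    final double dataMB;
    final long messages;
    final long fetchTimeMs;

    private ConsumerPerfRow(double dataMB, long messages, long fetchTimeMs) {
        this.dataMB = dataMB;
        this.messages = messages;
        this.fetchTimeMs = fetchTimeMs;
    }

    static ConsumerPerfRow parse(String csvLine) {
        // Columns: 0 start.time, 1 end.time, 2 data.consumed.in.MB, 3 MB.sec,
        // 4 data.consumed.in.nMsg, 5 nMsg.sec, 6 rebalance.time.ms, 7 fetch.time.ms,
        // 8 fetch.MB.sec, 9 fetch.nMsg.sec
        String[] f = csvLine.split(",\\s*");
        return new ConsumerPerfRow(Double.parseDouble(f[2]), Long.parseLong(f[4]), Long.parseLong(f[7]));
    }

    double fetchMBPerSec() {
        return dataMB / (fetchTimeMs / 1000.0);
    }

    double fetchMessagesPerSec() {
        return messages / (fetchTimeMs / 1000.0);
    }

    public static void main(String[] args) {
        ConsumerPerfRow row = parse("2021-12-30 23:00:11, 2021-12-30 23:02:50, 66.3281, 0.4187, "
                + "67920, 428.7446, 10, 158406, 0.4187, 428.771");
        System.out.printf("%.4f MB/sec, %.3f msg/sec%n", row.fetchMBPerSec(), row.fetchMessagesPerSec());
    }
}
```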
2.2.1.2 Topic partitions: 3, replicas: 2, 100,000 messages in total, 500 messages per second
Command: bin/kafka-producer-perf-test.sh --topic test_producer3_500 --num-records 100000 --record-size 1024 --throughput 500 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 --print-metrics
Result: producer-topic-metrics:record-send-total:{client-id=producer-1, topic=test_producer3_500} : 100000.000
100000 records sent, 459.041979 records/sec (0.45 MB/sec), 5634.44 ms avg latency, 18407.00 ms max latency, 4029 ms 50th, 15754 ms 95th, 18142 ms 99th, 18313 ms 99.9th.
All 100,000 messages were written. Throughput was 0.45 MB/s, with an average latency of 5634.44 ms and a maximum latency of 18407 ms.
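When comparing many runs it helps to pull the numbers out of the final summary line programmatically. The following sketch does that with a regular expression. The class name is illustrative, and the pattern only targets the final summary format shown above (the shorter interim lines in the 1000 messages per second run print the maximum latency without a unit and would not match).

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Extracts the headline figures from a kafka-producer-perf-test summary line.
final class ProducerSummary {
    private static final Pattern LINE = Pattern.compile(
            "(\\d+) records sent, ([\\d.]+) records/sec \\(([\\d.]+) MB/sec\\), "
            + "([\\d.]+) ms avg latency, ([\\d.]+) ms max latency");

    final long records;
    final double recordsPerSec;
    final double mbPerSec;
    final double avgLatencyMs;
    final double maxLatencyMs;

    private ProducerSummary(Matcher m) {
        records = Long.parseLong(m.group(1));
        recordsPerSec = Double.parseDouble(m.group(2));
        mbPerSec = Double.parseDouble(m.group(3));
        avgLatencyMs = Double.parseDouble(m.group(4));
        maxLatencyMs = Double.parseDouble(m.group(5));
    }

    static ProducerSummary parse(String line) {
        Matcher m = LINE.matcher(line);
        if (!m.find()) {
            throw new IllegalArgumentException("Not a summary line: " + line);
        }
        return new ProducerSummary(m);
    }

    public static void main(String[] args) {
        ProducerSummary s = parse("100000 records sent, 459.041979 records/sec (0.45 MB/sec), "
                + "5634.44 ms avg latency, 18407.00 ms max latency, 4029 ms 50th");
        System.out.println(s.records + " records at " + s.recordsPerSec + " records/sec");
    }
}
```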
2.2.1.3 Topic partitions: 1, replicas: 1, 100,000 messages in total, 500 messages per second
Command: bin/kafka-producer-perf-test.sh --topic test_producer1_500 --num-records 100000 --record-size 1024 --throughput 500 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 --print-metrics
Result: 50000 records sent, 465.826936 records/sec (0.45 MB/sec), 3798.95 ms avg latency, 7406.00 ms max latency, 3802 ms 50th, 6999 ms 95th, 7305 ms 99th, 7379 ms 99.9th.
2.2.2 Simulating 10M bandwidth (ideal: 1.25 MB/s)
2.2.2.1 Topic partitions: 3, replicas: 2, 100,000 messages in total, 1000 messages per second
Producer command: bin/kafka-producer-perf-test.sh --topic test_producer3 --num-records 100000 --record-size 1024 --throughput 1000 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 --print-metrics
Result: producer-topic-metrics:record-send-total:{client-id=producer-1, topic=test_producer3} : 100000.000
100000 records sent, 999.880014 records/sec (0.98 MB/sec), 19.48 ms avg latency, 262.00 ms max latency, 11 ms 50th, 54 ms 95th, 61 ms 99th, 123 ms 99.9th.
All 100,000 messages were written, at a throughput of 0.98 MB/s.
2.2.2.2 Topic partitions: 3, replicas: 2, 100,000 messages in total, 2000 messages per second
Producer command: bin/kafka-producer-perf-test.sh --topic test_producer3 --num-records 100000 --record-size 1024 --throughput 2000 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 --print-metrics
Result: producer-topic-metrics:record-error-total:{client-id=producer-1, topic=test_producer3} : 5340.000
producer-topic-metrics:record-send-total:{client-id=producer-1, topic=test_producer3} : 94660.000
100000 records sent, 1203.876482 records/sec (1.18 MB/sec), 15891.99 ms avg latency, 30390.00 ms max latency, 12657 ms 50th, 30055 ms 95th, 30279 ms 99th, 30358 ms 99.9th.
Only 94660 messages were written and 5340 failed; the maximum throughput was 1.18 MB/s. (A separate run at 1300 messages per second wrote all 100,000 successfully.)
Consumer command: bin/kafka-consumer-perf-test.sh --new-consumer --topic test_producer3 --broker-list s201:9092,s202:9092,s203:9092 --fetch-size 10000 --messages 100000
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2021-12-30 23:55:17:093, 2021-12-30 23:56:47:476, 97.6592, 1.0805, 100003, 1106.4359, 32, 90351, 1.0809, 1106.8278
2.2.3 Simulating 100M bandwidth (ideal: 12.5 MB/s)
2.2.3.1 Topic partitions: 3, replicas: 2, 100,000 messages in total, 1000 messages per second
Producer command: bin/kafka-producer-perf-test.sh --topic test_producer3 --num-records 100000 --record-size 1024 --throughput 1000 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 --print-metrics
Result: producer-topic-metrics:record-send-total:{client-id=producer-1, topic=test_producer3} : 100000.000
100000 records sent, 999.960002 records/sec (0.98 MB/sec), 2.78 ms avg latency, 335.00 ms max latency, 3 ms 50th, 4 ms 95th, 14 ms 99th, 127 ms 99.9th.
All 100,000 messages were written and the maximum latency was only 335 ms, so there is still headroom. Keep raising the per-second rate until sends start to time out; the last rate before that point is the maximum.
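The "raise the rate until sends time out" procedure can be written as a simple ramp. This is only a sketch of the search loop: the probe function is hypothetical and stands in for one run of kafka-producer-perf-test at the given --throughput, returning the record-error-total it reported. The demo probe in main is a stub that starts failing above 1300 messages per second, loosely mirroring the 10M bandwidth runs.

```java
import java.util.function.IntToLongFunction;

// Step-wise search for the highest send rate that completes without failed records.
final class RateRamp {
    /**
     * Raises the target rate from startRate in increments of step, up to maxRate.
     * Returns the last rate for which the probe reported zero failed records,
     * or 0 if even the first rate produced failures.
     */
    static int highestCleanRate(int startRate, int step, int maxRate,
                                IntToLongFunction failedRecordsAtRate) {
        int best = 0;
        for (int rate = startRate; rate <= maxRate; rate += step) {
            if (failedRecordsAtRate.applyAsLong(rate) > 0) {
                break; // first rate with timeouts: stop ramping
            }
            best = rate;
        }
        return best;
    }

    public static void main(String[] args) {
        // Hypothetical stub probe: pretend every rate above 1300 records/sec produces failures.
        IntToLongFunction stubProbe = rate -> rate > 1300 ? 1L : 0L;
        System.out.println(highestCleanRate(1000, 100, 2000, stubProbe));
    }
}
```

A coarse step followed by a finer one around the first failing rate keeps the number of test runs small.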
Producer command: bin/kafka-producer-perf-test.sh --topic test_producer3 --num-records 100000 --record-size 1024 --throughput 2000 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 --print-metrics
Result: producer-topic-metrics:record-send-total:{client-id=producer-1, topic=test_producer3} : 100000.000
100000 records sent, 1999.880007 records/sec (1.95 MB/sec), 1.78 ms avg latency, 114.00 ms max latency, 1 ms 50th, 4 ms 95th, 6 ms 99th, 29 ms 99.9th.
This also succeeded, with a maximum latency of 114 ms.
With 100M of bandwidth the ideal network throughput is 12.5 MB/s = 12800 KB/s. Each test message is 1 KB, so the ideal maximum is 12800 messages per second.
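The same conversion gives the "ideal" figures quoted for each bandwidth setting. The sketch below follows the convention used in these notes (1 Mbit = 1024 Kbit, 8 bits per byte); link speeds are often quoted in decimal units, in which case the results would be slightly lower. The names are illustrative.

```java
// Converts a link speed in Mbit/s into the ideal payload rate used in these notes.
final class IdealThroughput {
    /** Ideal throughput in KB/s, assuming 1 Mbit = 1024 Kbit and 8 bits per byte. */
    static double idealKBPerSec(int bandwidthMbit) {
        return bandwidthMbit * 1024.0 / 8.0;
    }

    /** Ideal number of fixed-size records per second, ignoring protocol overhead. */
    static long idealRecordsPerSec(int bandwidthMbit, int recordSizeBytes) {
        return (long) (idealKBPerSec(bandwidthMbit) * 1024.0 / recordSizeBytes);
    }

    public static void main(String[] args) {
        for (int mbit : new int[] {4, 10, 100}) {
            System.out.printf("%dM: %.0f KB/s, %d records/sec at 1024 bytes%n",
                    mbit, idealKBPerSec(mbit), idealRecordsPerSec(mbit, 1024));
        }
    }
}
```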
Producer command: bin/kafka-producer-perf-test.sh --topic test_producer3 --num-records 100000 --record-size 1024 --throughput 10000 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 --print-metrics
Result: producer-topic-metrics:record-send-total:{client-id=producer-1, topic=test_producer3} : 100000.000
100000 records sent, 9998.000400 records/sec (9.76 MB/sec), 5.34 ms avg latency, 117.00 ms max latency, 2 ms 50th, 31 ms 95th, 65 ms 99th, 85 ms 99.9th.
All 100,000 messages were written, at a throughput of 9.76 MB/s.
Consumer command: bin/kafka-consumer-perf-test.sh --new-consumer --topic test_producer3 --broker-list s201:9092,s202:9092,s203:9092 --fetch-size 20000 --messages 500000
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2021-12-31 00:21:25:976, 2021-12-31 00:22:19:900, 488.2900, 9.0552, 500009, 9272.4761, 31, 53893, 9.0604, 9277.8097
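--------------- Sending messages to Kafka from IDEA
---------1. 4M bandwidth, topic with 1 partition. Arguments: --topic test_producer --num-records 100000 --record-size 2048 --throughput 500 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 request.timeout.ms=62000 --print-metrics
-------------- Start two instances of the Java class with identical arguments. Result: the bandwidth is split evenly.
With both writing to the same topic at the same time, the throughput is split evenly, at 0.23 MB/s each.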
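----------2. Load test with 3 partitions and 2 replicas. Arguments: --topic test_idea_producer --num-records 100000 --record-size 1024 --throughput 1000 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 request.timeout.ms=62000 --print-metrics
The partition leaders are spread evenly across the machines.
Conclusion: throughput is more than twice that of a single partition, so multiple partitions do raise throughput.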
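Note: the distribution of partition leaders is another factor that affects throughput.
The following test uses the same 3 partitions but a different leader distribution: --topic test_producer3_500 --num-records 100000 --record-size 2048 --throughput -1 --producer-props bootstrap.servers=s201:9092,s202:9092,s203:9092 request.timeout.ms=62000 --print-metrics
Here machine s203 is the leader for partitions 1 and 2, which concentrates the IO load on it; the test averaged 0.67 MB/s.
The layout was then adjusted with a Kafka partition reassignment. Kafka partition reassignment: https://blog.csdn.net/cuichunchi/article/details/120930445
Rerunning the load test with the same arguments showed a clear improvement in throughput.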
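Leader skew of this kind is easy to spot by counting leaders per broker. The sketch below does the counting on a plain map; in practice the partition-to-leader mapping would come from describing the topic. The leader of partition 0 is not stated in these notes, so s201 is an assumption used only to complete the example, and the class name is made up.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Counts how many partition leaders each broker hosts, to reveal an uneven layout.
final class LeaderSkew {
    static Map<String, Integer> leadersPerBroker(Map<Integer, String> leaderByPartition) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String broker : leaderByPartition.values()) {
            counts.merge(broker, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<Integer, String> leaders = new HashMap<>();
        leaders.put(0, "s201"); // assumption: not stated in the notes
        leaders.put(1, "s203");
        leaders.put(2, "s203");
        // s203 shows up with 2 leaders while s202 has none, which is the skew described above.
        System.out.println(leadersPerBroker(leaders));
    }
}
```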
-------------- Conclusion: the throughput is still the same as in the tests above.
Test code (extracted from the Kafka source):
- package com.kafkaTestJava;
-
- import net.sourceforge.argparse4j.ArgumentParsers;
- import net.sourceforge.argparse4j.inf.ArgumentParser;
- import net.sourceforge.argparse4j.inf.ArgumentParserException;
- import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
- import net.sourceforge.argparse4j.inf.Namespace;
- import org.apache.kafka.clients.producer.*;
- import org.apache.kafka.common.utils.Exit;
- import org.apache.kafka.common.utils.Utils;
- import org.apache.kafka.tools.ThroughputThrottler;
- import org.apache.kafka.tools.ToolsUtils;
-
- import java.nio.charset.StandardCharsets;
- import java.nio.file.Files;
- import java.nio.file.Path;
- import java.nio.file.Paths;
- import java.util.*;
-
- import static net.sourceforge.argparse4j.impl.Arguments.store;
- import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
-
-
- /**
- * Kafka load test
- * @author :CUICHUNCHI
- * @date :2022/1/5
- * @time : 21:16
- * @description:
- * @modified By:2022/1/5
- * @version: 1.0
- */
- public class TestKafkaProducer2 {
- public static void main(String[] args) throws Exception {
- ArgumentParser parser = argParser();
-
- try {
- Namespace res = parser.parseArgs(args);
-
- /* parse args */
- String topicName = res.getString("topic");
- long numRecords = res.getLong("numRecords");
- Integer recordSize = res.getInt("recordSize");
- int throughput = res.getInt("throughput");
- List<String> producerProps = res.getList("producerConfig");
- String producerConfig = res.getString("producerConfigFile");
- String payloadFilePath = res.getString("payloadFile");
- String transactionalId = res.getString("transactionalId");
- boolean shouldPrintMetrics = res.getBoolean("printMetrics");
- long transactionDurationMs = res.getLong("transactionDurationMs");
- boolean transactionsEnabled = 0 < transactionDurationMs;
-
- // since default value gets printed with the help text, we are escaping \n there and replacing it with correct value here.
- String payloadDelimiter = res.getString("payloadDelimiter").equals("\\n") ? "\n" : res.getString("payloadDelimiter");
-
- if (producerProps == null && producerConfig == null) {
- throw new ArgumentParserException("Either --producer-props or --producer.config must be specified.", parser);
- }
-
- List<byte[]> payloadByteList = new ArrayList<>();
- if (payloadFilePath != null) {
- Path path = Paths.get(payloadFilePath);
- System.out.println("Reading payloads from: " + path.toAbsolutePath());
- if (Files.notExists(path) || Files.size(path) == 0) {
- throw new IllegalArgumentException("File does not exist or empty file provided.");
- }
-
- String[] payloadList = new String(Files.readAllBytes(path), StandardCharsets.UTF_8).split(payloadDelimiter);
-
- System.out.println("Number of messages read: " + payloadList.length);
-
- for (String payload : payloadList) {
- payloadByteList.add(payload.getBytes(StandardCharsets.UTF_8));
- }
- }
-
- Properties props = new Properties();
- if (producerConfig != null) {
- props.putAll(Utils.loadProps(producerConfig));
- }
- if (producerProps != null)
- for (String prop : producerProps) {
- String[] pieces = prop.split("=");
- if (pieces.length != 2)
- throw new IllegalArgumentException("Invalid property: " + prop);
- props.put(pieces[0], pieces[1]);
- }
-
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- if (transactionsEnabled)
- props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
-
- KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
-
- if (transactionsEnabled)
- producer.initTransactions();
-
- /* setup perf test */
- byte[] payload = null;
- Random random = new Random(0);
- if (recordSize != null) {
- payload = new byte[recordSize];
- for (int i = 0; i < payload.length; ++i)
- payload[i] = (byte) (random.nextInt(26) + 65);
- }
- ProducerRecord<byte[], byte[]> record;
- Stats stats = new Stats(numRecords, 5000);
- long startMs = System.currentTimeMillis();
-
- ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs);
-
- int currentTransactionSize = 0;
- long transactionStartTime = 0;
- for (long i = 0; i < numRecords; i++) {
- if (transactionsEnabled && currentTransactionSize == 0) {
- producer.beginTransaction();
- transactionStartTime = System.currentTimeMillis();
- }
-
-
- if (payloadFilePath != null) {
- payload = payloadByteList.get(random.nextInt(payloadByteList.size()));
- }
- record = new ProducerRecord<>(topicName, payload);
-
- long sendStartMs = System.currentTimeMillis();
- Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats);
- producer.send(record, cb);
-
- currentTransactionSize++;
- if (transactionsEnabled && transactionDurationMs <= (sendStartMs - transactionStartTime)) {
- producer.commitTransaction();
- currentTransactionSize = 0;
- }
-
- if (throttler.shouldThrottle(i, sendStartMs)) {
- throttler.throttle();
- }
- }
-
- if (transactionsEnabled && currentTransactionSize != 0)
- producer.commitTransaction();
-
- if (!shouldPrintMetrics) {
- producer.close();
-
- /* print final results */
- stats.printTotal();
- } else {
- // Make sure all messages are sent before printing out the stats and the metrics
- // We need to do this in a different branch for now since tests/kafkatest/sanity_checks/test_performance_services.py
- // expects this class to work with older versions of the client jar that don't support flush().
- producer.flush();
-
- /* print final results */
- stats.printTotal();
-
- /* print out metrics */
- ToolsUtils.printMetrics(producer.metrics());
- producer.close();
- }
- } catch (ArgumentParserException e) {
- if (args.length == 0) {
- parser.printHelp();
- Exit.exit(0);
- } else {
- parser.handleError(e);
- Exit.exit(1);
- }
- }
-
- }
-
- /** Get the command-line argument parser. */
- private static ArgumentParser argParser() {
- ArgumentParser parser = ArgumentParsers
- .newArgumentParser("producer-performance")
- .defaultHelp(true)
- .description("This tool is used to verify the producer performance.");
-
- MutuallyExclusiveGroup payloadOptions = parser
- .addMutuallyExclusiveGroup()
- .required(true)
- .description("either --record-size or --payload-file must be specified but not both.");
-
- parser.addArgument("--topic")
- .action(store())
- .required(true)
- .type(String.class)
- .metavar("TOPIC")
- .help("produce messages to this topic");
-
- parser.addArgument("--num-records")
- .action(store())
- .required(true)
- .type(Long.class)
- .metavar("NUM-RECORDS")
- .dest("numRecords")
- .help("number of messages to produce");
-
- payloadOptions.addArgument("--record-size")
- .action(store())
- .required(false)
- .type(Integer.class)
- .metavar("RECORD-SIZE")
- .dest("recordSize")
- .help("message size in bytes. Note that you must provide exactly one of --record-size or --payload-file.");
-
- payloadOptions.addArgument("--payload-file")
- .action(store())
- .required(false)
- .type(String.class)
- .metavar("PAYLOAD-FILE")
- .dest("payloadFile")
- .help("file to read the message payloads from. This works only for UTF-8 encoded text files. " +
- "Payloads will be read from this file and a payload will be randomly selected when sending messages. " +
- "Note that you must provide exactly one of --record-size or --payload-file.");
-
- parser.addArgument("--payload-delimiter")
- .action(store())
- .required(false)
- .type(String.class)
- .metavar("PAYLOAD-DELIMITER")
- .dest("payloadDelimiter")
- .setDefault("\\n")
- .help("provides delimiter to be used when --payload-file is provided. " +
- "Defaults to new line. " +
- "Note that this parameter will be ignored if --payload-file is not provided.");
-
- parser.addArgument("--throughput")
- .action(store())
- .required(true)
- .type(Integer.class)
- .metavar("THROUGHPUT")
- .help("throttle maximum message throughput to *approximately* THROUGHPUT messages/sec. Set this to -1 to disable throttling.");
-
- parser.addArgument("--producer-props")
- .nargs("+")
- .required(false)
- .metavar("PROP-NAME=PROP-VALUE")
- .type(String.class)
- .dest("producerConfig")
- .help("kafka producer related configuration properties like bootstrap.servers,client.id etc. " +
- "These configs take precedence over those passed via --producer.config.");
-
- parser.addArgument("--producer.config")
- .action(store())
- .required(false)
- .type(String.class)
- .metavar("CONFIG-FILE")
- .dest("producerConfigFile")
- .help("producer config properties file.");
-
- parser.addArgument("--print-metrics")
- .action(storeTrue())
- .type(Boolean.class)
- .metavar("PRINT-METRICS")
- .dest("printMetrics")
- .help("print out metrics at the end of the test.");
-
- parser.addArgument("--transactional-id")
- .action(store())
- .required(false)
- .type(String.class)
- .metavar("TRANSACTIONAL-ID")
- .dest("transactionalId")
- .setDefault("performance-producer-default-transactional-id")
- .help("The transactionalId to use if transaction-duration-ms is > 0. Useful when testing the performance of concurrent transactions.");
-
- parser.addArgument("--transaction-duration-ms")
- .action(store())
- .required(false)
- .type(Long.class)
- .metavar("TRANSACTION-DURATION")
- .dest("transactionDurationMs")
- .setDefault(0L)
- .help("The max age of each transaction. The commitTransaction will be called after this time has elapsed. Transactions are only enabled if this value is positive.");
-
-
- return parser;
- }
-
- private static class Stats {
- private long start;
- private long windowStart;
- private int[] latencies;
- private int sampling;
- private int iteration;
- private int index;
- private long count;
- private long bytes;
- private int maxLatency;
- private long totalLatency;
- private long windowCount;
- private int windowMaxLatency;
- private long windowTotalLatency;
- private long windowBytes;
- private long reportingInterval;
-
- public Stats(long numRecords, int reportingInterval) {
- this.start = System.currentTimeMillis();
- this.windowStart = System.currentTimeMillis();
- this.iteration = 0;
- this.sampling = (int) (numRecords / Math.min(numRecords, 500000));
- this.latencies = new int[(int) (numRecords / this.sampling) + 1];
- this.index = 0;
- this.maxLatency = 0;
- this.totalLatency = 0;
- this.windowCount = 0;
- this.windowMaxLatency = 0;
- this.windowTotalLatency = 0;
- this.windowBytes = 0;
- this.totalLatency = 0;
- this.reportingInterval = reportingInterval;
- }
-
- public void record(int iter, int latency, int bytes, long time) {
- this.count++;
- this.bytes += bytes;
- this.totalLatency += latency;
- this.maxLatency = Math.max(this.maxLatency, latency);
- this.windowCount++;
- this.windowBytes += bytes;
- this.windowTotalLatency += latency;
- this.windowMaxLatency = Math.max(windowMaxLatency, latency);
- if (iter % this.sampling == 0) {
- this.latencies[index] = latency;
- this.index++;
- }
- /* maybe report the recent perf */
- if (time - windowStart >= reportingInterval) {
- printWindow();
- newWindow();
- }
- }
-
- public Callback nextCompletion(long start, int bytes, Stats stats) {
- Callback cb = new PerfCallback(this.iteration, start, bytes, stats);
- this.iteration++;
- return cb;
- }
-
- public void printWindow() {
- long elapsed = System.currentTimeMillis() - windowStart;
- double recsPerSec = 1000.0 * windowCount / (double) elapsed;
- double mbPerSec = 1000.0 * this.windowBytes / (double) elapsed / (1024.0 * 1024.0);
- System.out.printf("%d records sent, %.1f records/sec (%.2f MB/sec), %.1f ms avg latency, %.1f ms max latency.%n",
- windowCount,
- recsPerSec,
- mbPerSec,
- windowTotalLatency / (double) windowCount,
- (double) windowMaxLatency);
- }
-
- public void newWindow() {
- this.windowStart = System.currentTimeMillis();
- this.windowCount = 0;
- this.windowMaxLatency = 0;
- this.windowTotalLatency = 0;
- this.windowBytes = 0;
- }
-
- public void printTotal() {
- long elapsed = System.currentTimeMillis() - start;
- double recsPerSec = 1000.0 * count / (double) elapsed;
- double mbPerSec = 1000.0 * this.bytes / (double) elapsed / (1024.0 * 1024.0);
- int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999);
- System.out.printf("%d records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.%n",
- count,
- recsPerSec,
- mbPerSec,
- totalLatency / (double) count,
- (double) maxLatency,
- percs[0],
- percs[1],
- percs[2],
- percs[3]);
- }
-
- private static int[] percentiles(int[] latencies, int count, double... percentiles) {
- int size = Math.min(count, latencies.length);
- Arrays.sort(latencies, 0, size);
- int[] values = new int[percentiles.length];
- for (int i = 0; i < percentiles.length; i++) {
- int index = (int) (percentiles[i] * size);
- values[i] = latencies[index];
- }
- return values;
- }
- }
-
- private static final class PerfCallback implements Callback {
- private final long start;
- private final int iteration;
- private final int bytes;
- private final Stats stats;
-
- public PerfCallback(int iter, long start, int bytes, Stats stats) {
- this.start = start;
- this.stats = stats;
- this.iteration = iter;
- this.bytes = bytes;
- }
-
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- long now = System.currentTimeMillis();
- int latency = (int) (now - start);
- this.stats.record(iteration, latency, bytes, now);
- if (exception != null)
- exception.printStackTrace();
- }
- }
-
- }
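The listing relies on Kafka's ThroughputThrottler to hold the send loop to the --throughput target. The sketch below shows the general idea of such a rate check in isolation. It is a simplified stand-in written for illustration, not Kafka's implementation: the class name, method signatures and the sleep calculation are all assumptions.

```java
// Simplified rate limiter: compares the average send rate so far against a target.
final class SimpleThrottle {
    private final long targetPerSec;
    private final long startMs;

    SimpleThrottle(long targetPerSec, long startMs) {
        this.targetPerSec = targetPerSec;
        this.startMs = startMs;
    }

    /** True when the average rate so far exceeds the target; a negative target disables throttling. */
    boolean shouldThrottle(long sentSoFar, long nowMs) {
        if (targetPerSec < 0) {
            return false;
        }
        double elapsedSec = (nowMs - startMs) / 1000.0;
        return elapsedSec > 0 && sentSoFar / elapsedSec > targetPerSec;
    }

    /** Milliseconds to wait so that sentSoFar records average out to the target rate. */
    long sleepMs(long sentSoFar, long nowMs) {
        if (targetPerSec <= 0) {
            return 0;
        }
        long dueMs = startMs + sentSoFar * 1000 / targetPerSec;
        return Math.max(0, dueMs - nowMs);
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        SimpleThrottle throttle = new SimpleThrottle(1000, start);
        for (long sent = 1; sent <= 200; sent++) {
            long now = System.currentTimeMillis();
            if (throttle.shouldThrottle(sent, now)) {
                Thread.sleep(throttle.sleepMs(sent, now));
            }
        }
        System.out.println("elapsed ms: " + (System.currentTimeMillis() - start));
    }
}
```

Passing -1 as the target mirrors the --throughput -1 setting: the check never fires and the loop runs as fast as the producer allows.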