[转帖]Day63_Kafka(一)

day63,kafka · 浏览次数 : 0

小编点评

#Kafka和Flume整合  上图1-22,显示的是flume采集完毕数据之后,进行的离线处理和实时处理两条业务线,现在再来学习flume和kafka的整合处理。 配置flume.conf文件 #为我们的source channel sink起名a1.sources = r1a1.channels = c1a1.sinks = k1a1.sinks.k1.kafka.topic = testa1.sinks.k1.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092a1.sinks.r1.channels = c1#指定我们的source数据收集策略a1.sources.r1.type = spooldira1.sources.r1.spoolDir = /export/servers/flumedataa1.sources.r1.deletePolicy = nevera1.sources.r1.fileSuffix = .COMPLETEDa1.sources.r1.ignorePattern = ^(.)*\\\\.tmp$a1.sources.r1.inputCharset = UTF-8#指定我们的channel为memory,即表示所有的数据都装进memory当中a1.channels.c1.type = memory#指定我们的sink为kafka sink,并指定我们的sink从哪个channel当中读取数据a1.sinks.k1.channel = c1a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSinka1.sinks.k1.kafka.topic = testa1.sinks.k1.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092a1.sinks.k1.kafka.flumeBatchSize = 20a1.sinks.k1.kafka.producer.acks = 1  启动flume和kafka的整合测试 [offcn@bd-offcn-02 kafka]$ bin/kafka-console-consumer.sh \\ --topic test \\ --bootstrap-server node01:9092,node02:9092,node03:9092 \\ --from-beginning 消费者监听读取的数据 启动flume-agent [root@node01 flume]$ bin/flume-ng agent --conf conf --conf-file conf/flume_kafka.conf --name a1 -Dflume.root.logger=INFO,console 发送与接收数据验证 验证结果: 显示的发送与接收数据,可以说明flume和kafka的整合成功。

正文

第一讲 Kafka基础操作

课程大纲

课程内容

学习效果

掌握目标

Kafka简介

消息队列

掌握

Kafka简介

Kafka分布式环境

Kafka操作

Kafka shell

掌握

Kafka api

Flume整合kafka

一、Kafka简介

(一)消息队列

1、为甚要有消息队列

 

2、消息队列

  1. 消息 Message
  2. 网络中的两台计算机或者两个通讯设备之间传递的数据。例如说:文本、音乐、视频等内容。

  3. 队列 Queue
  4. 一种特殊的线性表(数据元素首尾相接),特殊之处在于只允许在首部删除元素和在尾部追加元素(FIFO)。入队、出队。

  5. 消息队列 MQ
  6. 消息+队列,保存消息的队列。消息的传输过程中的容器;主要提供生产、消费接口供外部调用做数据的存储和获取。

3、消息队列的分类

MQ主要分为两类:点对点(p2p)、发布订阅(Pub/Sub)

(1)点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除) 消息生产者生产消息发送到 Queue 中,然后消息消费者从 Queue 中取出并且消费消息

消息被消费以后,queue 中不再有存储,所以消息消费者不可能消费到已经被消费的消息。 Queue 支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。

(2)发布/订阅模式(一对多,消费者消费数据之后不会清除消息) 消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到 topic 的消息会被所有订阅者消费。 消息可以传给多个消费者

 4、p2p和发布订阅MQ的比较

1、共同点:

消息生产者生产消息发送到queue中,然后消息消费者从queue中读取并且消费消息。

2、不同点:

p2p模型包括:消息队列(Queue)、发送者(Sender)、接收者(Receiver)

一个生产者生产的消息只有一个消费者(Consumer)(即一旦被消费,消息就不在消息队列中)。比如说打电话。

pub/Sub包含:消息队列(Queue)、主题(Topic)、发布者(Publisher)、订阅者(Subscriber)

每个消息可以有多个消费者,彼此互不影响。比如我发布一个微博:关注我的人都能够看到。

5、消息系统的使用场景

  1. 解耦 各系统之间通过消息系统这个统一的接口交换数据,无须了解彼此的存在
  2. 冗余 部分消息系统具有消息持久化能力,可规避消息处理前丢失的风险
  3. 扩展 消息系统是统一的数据接口,各系统可独立扩展
  4. 峰值处理能力 消息系统可顶住峰值流量,业务系统可根据处理能力从消息系统中获取并处理对应量的请求
  5. 可恢复性 系统中部分键失效并不会影响整个系统,它恢复会仍然可从消息系统中获取并处理数据
  6. 异步通信 在不需要立即处理请求的场景下,可以将请求放入消息系统,合适的时候再处理

6、常见的消息系统

  1. RabbitMQ Erlang编写,支持多协议AMQP,XMPP,SMTP,STOMP。支持负载均衡、数据持久化。同时支持Peer-to-Peer和发布/订阅模式。
  2. Redis 基于Key-Value对的NoSQL数据库,同时支持MQ功能,可做轻量级队列服务使用。就入队操作而言,Redis对短消息(小于10kb)的性能比RabbitMQ好,长消息性能比RabbitMQ差。
  3. ZeroMQ 轻量级,不需要单独的消息服务器或中间件,应用程序本身扮演该角色,Peer-to-Peer。它实质上是一个库,需要开发人员自己组合多种技术,使用复杂度高。
  4. ActiveMQ JMS实现,Peer-to-Peer,支持持久化、XA(分布式)事务
  5. Kafka/Jafka 高性能跨语言的分布式发布/订阅消息系统,数据持久化,全分布式,同时支持在线和离线处理
  6. MetaQ/RocketMQ 纯Java实现,发布/订阅消息系统,支持本地事务和XA分布式事务

 (二)Kafka简介

1、Kafka简介

Kafka是分布式的发布—订阅消息系统。它最初由LinkedIn(领英)公司发布,使用Scala语言编写,于2010年12月份开源,成为Apache的顶级项目。Kafka是一个高吞吐量的、持久性的、分布式发布订阅消息系统。它主要用于处理活跃live的数据(登录、浏览、点击、分享、喜欢等用户行为产生的数据)。如下图1-3所示,很好的显示了Kafka的应用与组成。

 

2、特点

高吞吐量

可以满足每秒百万级别消息的生产和消费——生产消费。

持久性

有一套完善的消息存储机制,确保数据的高效安全的持久化——中间存储。

分布式

基于分布式的扩展和容错机制;Kafka的数据都会复制到几台服务器上。当某一台故障失效时,生产者和消费者转而使用其它的机器——整体

健壮性。

3、设计目标

  1. 高吞吐率 在廉价的商用机器上单机可支持每秒100万条消息的读写
  2. 消息持久化 所有消息均被持久化到磁盘,无消息丢失,支持消息重放
  3. 完全分布式 Producer,Broker,Consumer均支持水平扩展
  4. 同时适应在线流处理和离线批处理

4、Kafka核心概念

一个MQ需要哪些部分?生产、消费、消息类别、存储等等。

 对于kafka而言,kafka服务就像是一个大的水池。不断的生产、存储、消费着各种类别的消息。那么kafka由何组成呢?

Kafka服务

  • Topic:主题,Kafka处理的消息的不同分类
  • Broker消息服务器代理,Kafka集群中的一个kafka服务节点称为一个broker,主要存储消息数据。存在硬盘中。每个topic都是有分区的。
  • PartitionTopic物理上的分组一个topic在broker中被分为1个或者多个partition,分区在创建topic的时候指定。
  • Message消息,是通信的基本单位,每个消息都属于一个partition

Kafka服务相关

  • Producer:消息和数据的生产者,向Kafka的一个topic发布消息。
  • Consumer:消息和数据的消费者,定于topic并处理其发布的消息。
  • Zookeeper:协调kafka的正常运行。

二、Kafka分布式环境安装

(一)版本说明

安装包和源码包分别如下:

http://archive.apache.org/dist/kafka/1.1.1/kafka_2.11-1.1.1.tgz     

http://archive.apache.org/dist/kafka/1.1.1/kafka-1.1.1-src.tgz

(二)安装配置

1、安装与配置

解压

[root@node01 ~]$ tar -zxvf kafka_2.11-1.0.0.tgz -C /home/offcn/apps/

重命名

[root@node01 ~]$ mv kafka_2.11-1.1.1 kafka

bd-offcn-01执行以下命令创建数据文件存放目录

[root@node01 ~]$ mkdir -p  /home/offcn/logs/kafka

修改配置文件

修改$KAFKA_HOME/config/server.properties

  1. ## 当前kafka实例的id,必须为整数,一个集群中不可重复
  2. broker.id=0
  3. ## 生产到kafka中的数据存储的目录,目录需要手动创建
  4. log.dirs=/export/servers/kafka/logs
  5. ## kafka数据在zk中的存储目录
  6. zookeeper.connect=node01:2181,node02:2181,node03:2181
  7. ##添加配置,用来删除topic
  8. delete.topic.enable=true
  9. host.name=node01

同步到其他机器

  1. [root@node01 servers]$ scp -r kafka/ bd-offcn-02:$PWD
  2. [root@node01 servers]$ scp -r kafka/ bd-offcn-03:$PWD

修改broker.id

  1. ##修改broker.id
  2. broker.id=1
  3. broker.id=2
  4. ##修改host.name
  5. host.name=bd-offcn-02
  6. host.name=bd-offcn-03

2、 服务启动

 服务启动:每台都要运行此命令

[root@node01 kafka]$ nohup bin/kafka-server-start.sh config/server.properties 2>&1 &

三、Kafka基本操作

(一)、Kafka的topic操作

topic是kafka非常重要的核心概念,是用来存储各种类型的数据的,所以最基本的就需要学会如何在kafka中创建、修改、删除的topic,以及如何向topic生产消费数据。

关于topic的操作脚本:kafka-topics.sh

1、创建topic

  1. [root@node01 kafka]# bin/kafka-topics.sh --create \
  2. --topic hadoop \ ## 指定要创建的topic的名称
  3. --zookeeper node01:2181,node02:2181,node03:2181/kafka \
  4. ##指定kafka关联的zk地址
  5. --partitions 3 \    ##指定该topic的分区个数
  6. --replication-factor 3 ##指定副本因子
  7. bin/kafka-topics.sh --create --topic hadoop --zookeeper bd-offcn-01:2181,bd-offcn-02:2181,bd-offcn-03:2181 --partitions 3 --replication-factor 3
  8. bin/kafka-topics.sh --create --topic test --zookeeper bd-offcn-01:2181,bd-offcn-02:2181,bd-offcn-03:2181 --partitions 3 --replication-factor 3

注意:指定副本因子的时候,不能大于broker实例个数,否则报错,如下图1-6所示:

 当使用正确的方式,即将replication-factor设置为3,之后执行脚本命令,创建topic成功,如下图1-7所示。

 

 与此同时,在kafka数据目录data.dir=/export/servers/kafka/logs中有了新变化,如下图1-8所示。

2、查看topic列表

[root@node01 kafka]$ bin/kafka-topics.sh --list \

--zookeeper node01:2181,node02:2181,node03:2181

 

3、查看每一个topic的信息

[root@node01 kafka]$bin/kafka-topics.sh --describe --topic hadoop -zookeeper bd-offcn-01:2181,bd-offcn-02:2181,bd-offcn-03:2181

其中partition,replicas,leader,isr代表的是什么意思呢,下面做一下解释。

  • Partition: 当前topic对应的分区编号
  • Replicas:  副本因子,当前kafka对应的partition所在的broker实例的broker.id的列表
  • Leader:  该partition的所有副本中的leader领导者,处理所有kafka该partition读写请求
  • ISR:  该partition的存活的副本对应的broker实例的broker.id的列表

但是注意:partition个数,只能增加,不能减少,如下图1-12所示。

4、 删除一个topic

[root@node01 kafka]$ bin/kafka-topics.sh --delete \

--topic test \

--zookeeper node01:2181

 5、修改一个topic

[root@node01 kafka]$ bin/kafka-topics.sh --alter \

--topic hadoop \

--partitions 4 \

--zookeeper node01:2181

执行结果如下图1-11所示,可以看出partition由原先的3个变成了4个。

 但是注意:partition个数,只能增加,不能减少,如下图1-12所示。

(二)Kafka终端数据生产与消费

在$KAFKA_HOME/bin目录下面提供了很多脚本,其中kafka-console-producer.sh和kafka-console-consumer.sh分别用来在终端模拟生产和消费数据,即作为kafka topic的生产者和消费者存在。

1、生产数据

生产数据,执行以下的命令:

  1. [root@node01 kafka]$ bin/kafka-console-producer.sh \
  2. --topic hadoop \
  3. --broker-list bd-offcn-01:9092,bd-offcn-02:9092,bd-offcn-03:9092

 

2、消费数据

类似的,消费刚刚生产的数据需要执行以下命令:

  1. [root@node02 kafka]$ bin/kafka-console-consumer.sh \
  2. --topic hadoop \
  3. --bootstrap-server bd-offcn-01:9092,bd-offcn-02:9092,bd-offcn-03:9092

但遗憾的是,我们并没有看到刚刚生产的数据,这很好理解,比如新闻联播每晚7点开始了,结果你7点15才打开电视看新闻,自然7点到7点15之间的新闻你就会错过,如果你想要看这之间的新闻,那么就需要其提供回放的功能,幸运的是kafka不仅提供了从头开始回放数据的功能,还可以做到从任意的位置开始回放或者读取数据,这点功能是非常强大的。

那么此时重新在生产端生产数据,比如4,5,6,再看消费端,如下图1-16所示,就可以看到有数据产生了。

 那么我想要读取1,2,3的数据,那该怎么办呢?此时只需要添加一个参数--from-beginning从最开始读取数据即可,如下图所示1-17:

 3、Kafka的数据消费的总结

kafka消费者在消费数据的时候,都是分组别的。不同组的消费不受影响,相同组内的消费,需要注意,如果partition有3个,消费者有3个,那么便是每一个消费者消费其中一个partition对应的数据;如果有2个消费者,此时一个消费者消费其中一个partition数据,另一个消费者消费2个partition的数据。如果有超过3个的消费者,同一时间只能最多有3个消费者能消费得到数据,如下图1-18所示。

 如下命令查看不同分区中产生的数据:

  1. 第一个消费者:
  2. [root@node02 kafka]$ bin/kafka-console-consumer.sh \
  3. --topic hadoop \
  4. --bootstrap-server node01:9092,node02:9092,node03:9092 \
  5. --partition 0 \
  6. --offset earliest
  7. 第二个消费者:
  8. [root@node02 kafka]$ bin/kafka-console-consumer.sh \
  9. --topic hadoop \
  10. --bootstrap-server node01:9092,node02:9092,node03:9092 \
  11. --partition 1 \
  12. --offset earliest
  13. 第三个消费者:
  14. [root@node02 kafka]$ bin/kafka-console-consumer.sh \
  15. --topic hadoop \
  16. --bootstrap-server node01:9092,node02:9092,node03:9092 \
  17. --partition 2 \
  18. --offset earliest

执行如下图1-19所示:

 offset是kafka的topic中的partition中的每一条消息的标识,如何区分该条消息在kafka对应的partition的位置,就是用该偏移量。offset的数据类型是Long,8个字节长度。offset在分区内是有序的,分区间是不一定有序。如果想要kafka中的数据全局有序,就需要把producer发送到一个分区中,如下图1-20所示。

 在组内,kafka的topic的partition个数,代表了kafka的topic的并行度,同一时间最多可以有多个线程来消费topic的数据,所以如果要想提高kafka的topic的消费能力,应该增大partition的个数。

(三)Kafka编程api

1、创建Kafka项目

指定项目存储位置和maven坐标,如下图1-21所示

 指定maven依赖信息:

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.apache.kafka</groupId>
  4. <artifactId>kafka_2.11</artifactId>
  5. <version>1.1.1</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.apache.kafka</groupId>
  9. <artifactId>kafka-clients</artifactId>
  10. <version>1.1.1</version>
  11. </dependency>
  12. </dependencies>

实际上kafka_2.11中已经包含了kafka-client,因此只导入上面的kafka_2.11也是可以的。

2、Kaka生产者的api操作

入口类:Producer

(1)入门案例:

  1. public class OrderProducer {
  2. public static void main(String[] args) {
  3. Properties props = new Properties();
  4. props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
  5. props.put("acks", "all");
  6. props.put("retries", 0);
  7. props.put("batch.size", 16384);
  8. props.put("linger.ms", 1);
  9. props.put("buffer.memory", 33554432);
  10. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  11. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  12. KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
  13. for (int i = 0;i<100;i++){
  14. kafkaProducer.send(new ProducerRecord<String, String>("test","这是第"+i+"条数据"));
  15. }
  16. kafkaProducer.close();
  17. }
  18. }

(2)创建producer时需要指定的配置信息

  1. ## kafka的服务器
  2. bootstrap.servers=bd-offcn-01:9092,bd-offcn-02:9092,bd-offcn-03:9092
  3. ##Key的序列化器
  4. key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
  5. ##value的序列化器
  6. value.serializer=org.apache.kafka.common.serialization.StringSerializer
  7. acks=[0|-1|1|all] ##消息确认机制
  8. 0: 不做确认,直管发送消息即可
  9. -1|all: 不仅leader需要将数据写入本地磁盘,并确认,还需要同步的等待其它followers进行确认
  10. 1:只需要leader进行消息确认即可,后期follower可以从leader进行同步
  11. batch.size=1024 #每个分区内的用户缓存未发送record记录的空间大小
  12. ## 如果缓存区中的数据,没有占满,也就是任然有未用的空间,那么也会将请求发送出去,为了较少请求次数,我们可以配置linger.ms大于0
  13. linger.ms=10 ## 不管缓冲区是否被占满,延迟10ms发送request
  14. buffer.memory=10240 #控制的是一个producer中的所有的缓存空间
  15. retries=0 #发送消息失败之后的重试次数

 (3)修改配置查看生产数据情况

  1. bootstrap.servers=node01:9092,node02:9092,node03:9092
  2. # specify the compression codec for all data generated: none, gzip, snappy, lz4
  3. compression.type=none
  4. # name of the partitioner class for partitioning events; default partition spreads data randomly
  5. # 输入进入分区的方式
  6. #partitioner.class=
  7. # the maximum amount of time the client will wait for the response of a request
  8. # 请求超时时间
  9. #request.timeout.ms=
  10. # how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
  11. # 使用send方法最大消息阻塞时间
  12. #max.block.ms=
  13. # the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
  14. linger.ms=5000
  15. # the maximum size of a request in bytes
  16. ## 最大的请求大小
  17. #max.request.size=
  18. # the default batch size in bytes when batching multiple records sent to a partition
  19. batch.size=1024
  20. buffer.memory=10240
  21. key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
  22. value.serializer=org.apache.kafka.common.serialization.StringSerializer

生产数据代码:

  1. public class OrderProducer {
  2. public static void main(String[] args) {
  3. Properties props = new Properties();
  4. props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
  5. props.put("acks", "all");
  6. props.put("retries", 0);
  7. props.put("batch.size", 16384);
  8. props.put("linger.ms", 1);
  9. props.put("buffer.memory", 33554432);
  10. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  11. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  12. KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
  13. for (int i = 0;i<100;i++){
  14. kafkaProducer.send(new ProducerRecord<String, String>("test","这是第"+i+"条数据"));
  15. }
  16. kafkaProducer.close();
  17. }
  18. }

3.Kaka消费者api

入口类:Consumer

配置文件:必要条件

  1. #1、地址
  2. bootstrap.servers=node01:9092
  3. #2、序列化
  4. key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerializer
  5. #3、主题(topic) 需要制定具体的某个topic(order)即可。
  6. #4、消费者组 group.id=test
  7. public class OrderConsumer {
  8. public static void main(String[] args) {
  9. // 1\连接集群
  10. Properties props = new Properties();
  11. props.put("bootstrap.servers", "node01:9092");
  12. props.put("group.id", "test");
  13. //以下两行代码 ---消费者自动提交offset值
  14. props.put("enable.auto.commit", "true");
  15. props.put("auto.commit.interval.ms", "1000");
  16. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  17. props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  18. KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
  19. kafkaConsumer.subscribe(Arrays.asList("test"));
  20. while (true) {
  21. ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
  22. for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
  23. String value = consumerRecord.value();
  24. int partition = consumerRecord.partition();
  25. long offset = consumerRecord.offset();
  26. String key = consumerRecord.key();
  27. System.out.println("key:" + key + "value:" + value + "partition:" + partition + "offset:" + offset);
  28. }
  29. }
  30. }
  31. }

4.指定分区数据进行消费

1、如果进程正在维护与该分区关联的某种本地状态(如本地磁盘上的键值存储),那么它应该只获取它在磁盘上维护的分区的记录。

2、如果进程本身具有高可用性,并且如果失败则将重新启动(可能使用YARN,Mesos或AWS工具等集群管理框架,或作为流处理框架的一部分)。在这种情况下,Kafka不需要检测故障并重新分配分区,因为消耗过程将在另一台机器上重新启动。

  1. public static void main(String[] args) {
  2. Properties props = new Properties();
  3. props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
  4. props.put("group.id", "test");
  5. props.put("enable.auto.commit", "true");
  6. props.put("auto.commit.interval.ms", "1000");
  7. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  8. props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  9. KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
  10. TopicPartition topicPartition = new TopicPartition("test", 0);
  11. TopicPartition topicPartition1 = new TopicPartition("test", 1);
  12. kafkaConsumer.assign(Arrays.asList(topicPartition, topicPartition1));
  13. while (true) {
  14. ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
  15. for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
  16. String value = consumerRecord.value();
  17. int partition = consumerRecord.partition();
  18. long offset = consumerRecord.offset();
  19. String key = consumerRecord.key();
  20. System.out.println("key:" + key + "value:" + value + "partition:" + partition + "offset:" + offset);
  21. }
  22. kafkaConsumer.commitSync();
  23. }
  24. } }

(四)Kafka和Flume整合 

flume主要是做日志数据(离线或实时)地采集。

 上图1-22,显示的是flume采集完毕数据之后,进行的离线处理和实时处理两条业务线,现在再来学习flume和kafka的整合处理。

配置flume.conf文件

  1. #为我们的source channel sink起名
  2. a1.sources = r1
  3. a1.channels = c1
  4. a1.sinks = k1
  5. #指定我们的source收集到的数据发送到哪个管道
  6. a1.sources.r1.channels = c1
  7. #指定我们的source数据收集策略
  8. a1.sources.r1.type = spooldir
  9. a1.sources.r1.spoolDir = /export/servers/flumedata
  10. a1.sources.r1.deletePolicy = never
  11. a1.sources.r1.fileSuffix = .COMPLETED
  12. a1.sources.r1.ignorePattern = ^(.)*\\.tmp$
  13. a1.sources.r1.inputCharset = UTF-8
  14. #指定我们的channel为memory,即表示所有的数据都装进memory当中
  15. a1.channels.c1.type = memory
  16. #指定我们的sink为kafka sink,并指定我们的sink从哪个channel当中读取数据
  17. a1.sinks.k1.channel = c1
  18. a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
  19. a1.sinks.k1.kafka.topic = test
  20. a1.sinks.k1.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092
  21. a1.sinks.k1.kafka.flumeBatchSize = 20
  22. a1.sinks.k1.kafka.producer.acks = 1

 启动flume和kafka的整合测试

  1. [offcn@bd-offcn-02 kafka]$ bin/kafka-console-consumer.sh \
  2. --topic test \
  3. --bootstrap-server node01:9092,node02:9092,node03:9092 \
  4. --from-beginning

消费者监听读取的数据

  • 启动flume-agent
[root@node01 flume]$ bin/flume-ng agent --conf conf --conf-file conf/flume_kafka.conf --name a1 -Dflume.root.logger=INFO,console
  • 发送与接收数据验证

验证结果:

显示的发送与接收数据,可以说明flume和kafka的整合成功。

文章知识点与官方知识档案匹配,可进一步学习相关知识
云原生入门技能树首页概览13403 人正在系统学习中

与[转帖]Day63_Kafka(一)相似的内容:

[转帖]Day63_Kafka(一)

第一讲 Kafka基础操作 课程大纲 课程内容 学习效果 掌握目标 Kafka简介 消息队列 掌握 Kafka简介 Kafka分布式环境 Kafka操作 Kafka shell 掌握 Kafka api Flume整合kafka 一、Kafka简介 (一)消息队列 1、为甚要有消息队列 2、消息队列

[转帖]Day64_Kafka(二)

第二讲 Kafka架构 课程大纲 课程内容 学习效果 掌握目标 Kafka架构 Kafka就 掌握 Kafka ack Exactly once Kafka log Kafka log 掌握 Kafka log合并 Flume消息flush和Retention Kafka Leader Electi

[转帖]

Linux ubuntu20.04 网络配置(图文教程) 因为我是刚装好的最小系统,所以很多东西都没有,在开始配置之前需要做下准备 环境准备 系统:ubuntu20.04网卡:双网卡 网卡一:供连接互联网使用网卡二:供连接内网使用(看情况,如果一张网卡足够,没必要做第二张网卡) 工具: net-to

[转帖]

https://cloud.tencent.com/developer/article/2168105?areaSource=104001.13&traceId=zcVNsKTUApF9rNJSkcCbB 前言 Redis作为高性能的内存数据库,在大数据量的情况下也会遇到性能瓶颈,日常开发中只有时刻

[转帖]ISV 、OSV、 SIG 概念

ISV 、OSV、 SIG 概念 2022-10-14 12:29530原创大杂烩 本文链接:https://www.cndba.cn/dave/article/108699 1. ISV: Independent Software Vendors “独立软件开发商”,特指专门从事软件的开发、生产、

[转帖]Redis 7 参数 修改 说明

2022-06-16 14:491800原创Redis 本文链接:https://www.cndba.cn/dave/article/108066 在之前的博客我们介绍了Redis 7 的安装和配置,如下: Linux 7.8 平台 Redis 7 安装并配置开机自启动 操作手册https://ww

[转帖]HTTPS中间人攻击原理

https://www.zhihu.com/people/bei-ji-85/posts 背景 前一段时间,公司北京地区上线了一个HTTPS防火墙,用来监听HTTPS流量。防火墙上线之前,邮件通知给管理层,我从我老大那里听说这个事情的时候,说这个有风险,然后意外地发现,很多人原来都不知道HTTPS防

[转帖]关于字节序(大小端)的一点想法

https://www.zhihu.com/people/bei-ji-85/posts 今天在一个技术群里有人问起来了,当时有一些讨论(不完全都是我个人的观点),整理一下: 为什么网络字节序(多数情况下)是大端? 早年设备的缓存很小,先接收高字节能快速的判断报文信息:包长度(需要准备多大缓存)、地

[转帖]awk提取某一行某一列的数据

https://www.jianshu.com/p/dbcb7fe2da56 1、提取文件中第1列数据 awk '{print $1}' filename > out.txt 2、提取前2列的文件 awk `{print $1,$2}' filename > out.txt 3、打印完第一列,然后打

[转帖]awk 中 FS的用法

https://www.cnblogs.com/rohens-hbg/p/5510890.html 在openwrt文件 ar71xx.sh中 查询设备类型时,有这么一句, machine=$(awk 'BEGIN{FS="[ \t]+:[ \t]"} /machine/ {print $2}' /