[转帖]一、Kafka Tool使用

kafka,tool,使用 · 浏览次数 : 0

小编点评

**1. 配置Kafka Tool** ``` # 设置集群名 cluster_name="cluster2" # 开启SASL认证 security_protocol="SASL_PLAINTEXT" # 设置SASL认证用户名和密码 sasl_plain_username="kafka" sasl_plain_password="123456" # 设置SASL机制为PLAIN sasl.mechanism="PLAIN" # 设置高级设置 highlighter_nixorg.apache.kafka.common.security.plain.PlainLoginModule required_username="kafka" password="123456" ``` **2. 修改生产者脚本** ```python # 创建Kafka生产者 highlighter_iniproducer = KafkaProducer( sasl_mechanism="PLAIN", security_protocol="SASL_PLAINTEXT", sasl_plain_username=self.username, sasl_plain_password=self.password, bootstrap_servers=self.bootstrap_servers, value_serializer=lambda m: json.dumps(m).encode() ) ``` **3. 修改消费者脚本** ```python # 创建Kafka消费者 highlighter_routerosconsumer = KafkaConsumer( self.topic, sasl_mechanism="PLAIN", security_protocol="SASL_PLAINTEXT", sasl_plain_username=self.username, sasl_plain_password=self.password, bootstrap_servers=self.bootstrap_servers, consumer_timeout_ms=5000, group_id=group_id, auto_offset_reset=auto_offset_reset, enable_auto_commit=enable_auto_commit ) ``` **4. 添加排版代码** ```python # 打印数据读取完成的信息 log.warning("数据读取完成,数据读取超时,自动断开连接") # 返回数据列表 return lk_kafka ```

正文

一、Kafka Tool使用

1、添加cluster

2、开启SASL_PLAINTEXT

如果kafka 开启SASL_PLAINTEXT认证(用户名和密码认证)

3、高级设置

如果设置的是SASL Plaintext,则必须将sasl.mechanism客户端属性更改为PLAIN。可以在“高级”部分下的“ SASL机制”文本字段中输入此属性。

4、JAAS配置

highlighter- nix
org.apache.kafka.common.security.plain.PlainLoginModule required username = "kafka" password = "123456";

二、脚本修改

相应的脚本也需要对应的进行修改

1、生产者

highlighter- ini
producer = KafkaProducer(
            sasl_mechanism="PLAIN",
            security_protocol='SASL_PLAINTEXT',
            sasl_plain_username=self.username,
            sasl_plain_password=self.password,
            bootstrap_servers=self.bootstrap_servers,
            value_serializer=lambda m: json.dumps(m).encode())

2、消费者

highlighter- routeros
consumer = KafkaConsumer(self.topic,
                                 sasl_mechanism="PLAIN",
                                 security_protocol='SASL_PLAINTEXT',
                                 sasl_plain_username=self.username,
                                 sasl_plain_password=self.password,
                                 bootstrap_servers=self.bootstrap_servers,
                                 consumer_timeout_ms=5000,
                                 group_id=group_id,
                                 auto_offset_reset=auto_offset_reset,
                                 enable_auto_commit=enable_auto_commit
                                 )
        if consumer.bootstrap_connected():
            lk_kafka = []
            for message in consumer:
                msg = message.value.decode()
                lk_kafka.append(msg)
            log.warning("数据读取完成,数据读取超时,自动断开连接")
            return lk_kafka
        else:
            log.error("连接kafka失败,请确认连接信息是否正确")

与[转帖]一、Kafka Tool使用相似的内容:

[转帖]一、Kafka Tool使用

一、Kafka Tool使用 1、添加cluster 2、开启SASL_PLAINTEXT 如果kafka 开启SASL_PLAINTEXT认证(用户名和密码认证) 3、高级设置 如果设置的是SASL Plaintext,则必须将sasl.mechanism客户端属性更改为PLAIN。可以在“高级”

[转帖]Day63_Kafka(一)

第一讲 Kafka基础操作 课程大纲 课程内容 学习效果 掌握目标 Kafka简介 消息队列 掌握 Kafka简介 Kafka分布式环境 Kafka操作 Kafka shell 掌握 Kafka api Flume整合kafka 一、Kafka简介 (一)消息队列 1、为甚要有消息队列 2、消息队列

[转帖]Kafka 核心技术与实战学习笔记(七)kafka集群参数配置(上)

一.Broker 端参数 Broke存储信息配置 log.dirs:非常重要,指定Broker需要使用的若干文件目录路径,没有默认值必须亲自指定。log.dir:他只能表示单个路径,补充上一个参数用。 如何设置: 只要设置log.dirs,不要设置log.dir线上环境一定要为log.dirs配置多

[转帖]Kafka 核心技术与实战学习笔记(八)kafka集群参数配置(下)

一.Topic级别参数 Topic的优先级: 如果同时设置Topic级别参数和全局Broker参数,那么Topic级别优先 消息保存方面: retention.ms:规定Topic消息保存时长。默认是7天。一旦设置将覆盖掉Broker端的全局参数值。 retention.bytes:规定为该Topi

[转帖]Kafka 核心技术与实战学习笔记(六)kafka线上集群部署方案

一.操作系统-Linux Kafka是JVM系的大数据框架kafka由Scala语言和Java语言编写而成,编译之后的源代码就是普通的".class"文件 使用Linux kafka客户端底层使用Java的selector,selector在Linux上的实现机制是epoll,由于在windows上

[转帖]无需 zookeeper 安装 kafka 集群 (kakfa3.0 版本)

https://xie.infoq.cn/article/7769ef4576a165f7bdf142aa3 一、kafka 集群实例角色规划 在 kafka3.0 中已经可以将 zookeeper 去掉,使用 kraft 机制实现 controller 主控制器的选举。所以我们先简单了解下 kaf

[转帖]Kafka 性能优化与问题深究

Kafka 性能优化与问题深究 一.Kafka深入探究 1.1 kafka整体介绍 1. 1.1 Kafka 如何做到高吞吐、低延迟的呢? Kafka是一个分布式高吞吐量的消息系统,这里提下 Kafka 写数据的大致方式:先写操作系统的页缓存(Page Cache),然后由操作系统自行决定何时刷到磁

[转帖]无需 zookeeper 安装 kafka 集群 (kakfa3.0 版本)

https://xie.infoq.cn/article/7769ef4576a165f7bdf142aa3 一、kafka 集群实例角色规划 在 kafka3.0 中已经可以将 zookeeper 去掉,使用 kraft 机制实现 controller 主控制器的选举。所以我们先简单了解下 kaf

[转帖]awk高级企业级使用案例

https://www.jianshu.com/p/e36176ad3c06 一、背景: 以某物联网企业,传感器设备实时数据消费服务(Kafka-consumer)为例,调试筛选处理耗时的主题。 1. 原始日志格式(logback输出的): 2018-07-11 11:49:22.413 INFO

[转帖]消息队列与快递柜之间妙不可言的关系

https://xie.infoq.cn/article/8085241414e8959323ecd7811 一、消息队列是一个快递柜 我们来将快递柜与消息队列做一个对比 消息队列比作快递柜:有很多厂家生产快递柜,如:丰巢(apache kafka),速递易(alibaba RocketMQ),近邻