【Azure事件中心】使用Python SDK(Confluent)相关方法获取offset或lag时提示SSL相关错误

相关,lag,Confluent,方法 · 浏览次数 : 29

小编点评

**SSL 错误可能导致 Confluent SDK 获取 offset 或 lag 时出现 SSL 错误,因为 SASL 连接字符串中的 sasl_password 值可能设置错误。** **解决方案:** 1. **检查 SASL_SSL 连接字符串的正确格式:** - SASL_SSL 连接字符串应该以 "plain" 或 "ssl" 开头,并包含有效的凭据。 2. **修改 connection_string 设置中的 sasl_password 值:** - 请确保 sasl_password 设置正确,并检查其格式。 **修改后的示例代码:** ```python import confluent_kafkatopics = [ # Your Kafka topics ] broker = "your-eventhub-namespace-name.servicebus.chinacloudapi.cn:9093" group_name = "your-consumer-group-name" sasl_password = "your-connection-string" # Create consumer consumer = confluent_kafka.Consumer( bootstrap_servers=broker, security_protocol="SASL_SSL", sasl_mechanism="PLAIN", sasl_username="$ConnectionString", sasl_password=sasl_password, group_id=group_name, ) print( "%-50s %9s %9s" % ( "Topic [Partition]\", "Committed", "Lag", ) ) # Get Kafka topics for topic in topics: # Get topic's partitions metadata = consumer.list_topics(topic, timeout=10) # Handle errors if metadata.topics[topic].error is not None: raise confluent_kafka.KafkaException(metadata.topics[topic].error) # Construct TopicPartition list partitions = [ confluent_kafka.TopicPartition(topic, p) for p in metadata.topics[topic].partitions ] # Query committed offsets for this group and the given partitions committed = consumer.committed(partitions, timeout=10) # For each partition, get its low and high watermark offsets for partition in committed: # ... ``` **注意:** - 请确保 `topics` 列表中包含正确的 Kafka 主题名称。 - 请确保 `sasl_password` 设置正确,并与您使用的 SASL 凭据匹配。 - 此代码示例仅供参考,请根据您的实际情况进行修改。

正文

问题描述

使用Python SDK(Confluent)相关方法获取offset或lag时, 提示SSL相关错误, 是否有更清晰的实例以便参考呢?

 

问题解决

执行代码,因为一直连接不成功,所以检查 confluent_kafka 的连接配置,最后定位是 sasl.password 值设置有误。此处,需要使用Event Hub Namespace级别的连接字符串(Connection String).

 

在Event Hub中,获取方式为: (1: Shared access policies ---> 2: RootManageSharedAccessKey or ..----> 3: Connection String )

 

 

 完整的示例代码:

import confluent_kafka

topics = ["<Your_topic_name>"]
broker = "<Eventhub-namespace-name>.servicebus.chinacloudapi.cn:9093"
group_name = "<Consumer-group-name>"
sasl_password = "<Connection-string>"

# Create consumer.
# This consumer will not join the group, but the group.id is required by
# committed() to know which group to get offsets for.
consumer = confluent_kafka.Consumer({'bootstrap.servers': broker,
                                     'security.protocol': 'SASL_SSL',
                                     'sasl.mechanism': 'PLAIN',
                                     'sasl.username': '$ConnectionString',
                                     'sasl.password': sasl_password,
                                     'group.id': group_name})

print("%-50s  %9s  %9s" % ("Topic [Partition]", "Committed", "Lag"))
print("=" * 72)

for topic in topics:
    # Get the topic's partitions
    metadata = consumer.list_topics(topic, timeout=10)
    if metadata.topics[topic].error is not None:
        raise confluent_kafka.KafkaException(metadata.topics[topic].error)

    # Construct TopicPartition list of partitions to query
    partitions = [confluent_kafka.TopicPartition(topic, p) for p in metadata.topics[topic].partitions]

    # Query committed offsets for this group and the given partitions
    committed = consumer.committed(partitions, timeout=10)

    for partition in committed:
        # Get the partitions low and high watermark offsets.
        (lo, hi) = consumer.get_watermark_offsets(partition, timeout=10, cached=False)

        if partition.offset == confluent_kafka.OFFSET_INVALID:
            offset = "-"
        else:
            offset = "%d" % (partition.offset)

        if hi < 0:
            lag = "no hwmark"  # Unlikely
        elif partition.offset < 0:
            # No committed offset, show total message count as lag.
            # The actual message count may be lower due to compaction
            # and record deletions.
            lag = "%d" % (hi - lo)
        else:
            lag = "%d" % (hi - partition.offset)

        print("%-50s  %9s  %9s" % (
            "{} [{}]".format(partition.topic, partition.partition), offset, lag))


consumer.close()

 

参考文档


confluent-kafka-python : https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/list_offsets.py
 

与【Azure事件中心】使用Python SDK(Confluent)相关方法获取offset或lag时提示SSL相关错误相似的内容:

【Azure事件中心】使用Python SDK(Confluent)相关方法获取offset或lag时提示SSL相关错误

问题描述 使用Python SDK(Confluent)相关方法获取offset或lag时, 提示SSL相关错误, 是否有更清晰的实例以便参考呢? 问题解决 执行代码,因为一直连接不成功,所以检查 confluent_kafka 的连接配置,最后定位是 sasl.password 值设置有误。此处,

【Azure 事件中心】使用Apache Flink 连接 Event Hubs 出错 Kafka error: No resolvable bootstrap urls

问题描述 参考Github上 Event Hub的示例代码(Using Apache Flink with Event Hubs for Apache Kafka Ecosystems : https://github.com/Azure/azure-event-hubs-for-kafka/tre

【Azure 事件中心】使用Kafka的性能测试工具(kafka-producer-perf-test)测试生产者发送消息到Azure Event Hub的性能

问题描述 Azure Event Hub支持 kafka,所以为了测试消息生产者所在环境与Azure Event Hub之间发送消息的性能如何,特别使用 kafka 官方测试生产者,消费者的性能工具 : kafka-producer-perf-test.bat kafka-consumer-perf

【Azure 事件中心】 org.slf4j.Logger 收集 Event Hub SDK(Java) 输出日志并以文件形式保存

问题描述 在使用Azure Event Hub的SDK时候,常规情况下,发现示例代码中并没有SDK内部的日志输出。因为在Java项目中,没有添加 SLF4J 依赖,已致于在启动时候有如下提示: SLF4J: Failed to load class "org.slf4j.impl.StaticLog

【Azure 事件中心】向Event Hub发送数据异常 : partitionId[null]: Sending messages timed out

问题描述 在使用Java 代码向 Azure Event Hub发送数据时,先后遇见了如下两种异常消息: 1)ERROR c.t.d.h.s.source.EventHubLogConsumer - Error occurred in partition processor for partitio

【Azure 事件中心】Event Hubs如何获取其中存放的历史消息

问题描述 使用Azure Event Hub服务,除了正常的生产,消费消息以外,如果想拿到Event Hub中存储的历史消息?有什么方法呢? 问题解答 获取 Event Hubs 存储的历史消息,首先需要确保消息进入Event Hub的时间处于保留期限(Retention Days)内,因为超过这个

Axure 变量、属性、函数

局部变量 使用场景非常多; 需要先创建; 只能作用于当前事件; 命名需要注意,只能英文+数字; 全局变量 需要先创建; 可以作用于整个文档,在任意页面调用或使用 中继器的 Item 属性 item:获取数据集一行数据的集合,即数据行的对象。 ltem:列名:获取数据行中指定列的值。 index:获取

【Azure 事件中心】Event Hub 无法连接,出现 Did not observe any item or terminal signal within 60000ms in 'flatMapMany' 的错误消息

2022-11-03 10:58:21.474 INFO --- [pool-7-thread-1] c.a.m.e.PartitionBasedLoadBalancer []: Load balancer already running 2022-11-03 10:58:51.014 WARN --- [ parallel-2] c.a.m.e.Partition

【Azure 事件中心】Azure Event Hub中的数据能不能存储大于7天呢?如果7天之后是不是会自动删除呢?

问题描述 Event Hub中有个retention的设置为7天,有没有办法增大这个Retention的时间? 如果没办法,是不是超过7天的数据就会被删除? 问题解答 因为Azure Event Hub(事件中心)是一个实时事件流引擎,其设计意图并不是用于代替数据库以及/或者用作无限期保存的事件流的

【Azure 事件中心】Event Hubs中存在非常多的错误数据,是否能提前删除这些数据呢?

问题描述 因为一些特殊原因,Event Hub 里面堆积了很多不需要的数据事件,正常要等事件中的过期时间到后才有Event Hub自动删除掉,但希望能够尽快马上删除,有没有什么手动的方法吗? 问题解答 Event Hub是一个数据事件处理服务,最主要的功能就是:接收和发送事件。它并不是一个数据存储服