使用Python SDK(Confluent)相关方法获取offset或lag时, 提示SSL相关错误, 是否有更清晰的实例以便参考呢?
执行代码,因为一直连接不成功,所以检查 confluent_kafka 的连接配置,最后定位是 sasl.password 值设置有误。此处,需要使用Event Hub Namespace级别的连接字符串(Connection String).
在Event Hub中,获取方式为: (1: Shared access policies ---> 2: RootManageSharedAccessKey or ..----> 3: Connection String )
完整的示例代码:
import confluent_kafka topics = ["<Your_topic_name>"] broker = "<Eventhub-namespace-name>.servicebus.chinacloudapi.cn:9093" group_name = "<Consumer-group-name>" sasl_password = "<Connection-string>" # Create consumer. # This consumer will not join the group, but the group.id is required by # committed() to know which group to get offsets for. consumer = confluent_kafka.Consumer({'bootstrap.servers': broker, 'security.protocol': 'SASL_SSL', 'sasl.mechanism': 'PLAIN', 'sasl.username': '$ConnectionString', 'sasl.password': sasl_password, 'group.id': group_name}) print("%-50s %9s %9s" % ("Topic [Partition]", "Committed", "Lag")) print("=" * 72) for topic in topics: # Get the topic's partitions metadata = consumer.list_topics(topic, timeout=10) if metadata.topics[topic].error is not None: raise confluent_kafka.KafkaException(metadata.topics[topic].error) # Construct TopicPartition list of partitions to query partitions = [confluent_kafka.TopicPartition(topic, p) for p in metadata.topics[topic].partitions] # Query committed offsets for this group and the given partitions committed = consumer.committed(partitions, timeout=10) for partition in committed: # Get the partitions low and high watermark offsets. (lo, hi) = consumer.get_watermark_offsets(partition, timeout=10, cached=False) if partition.offset == confluent_kafka.OFFSET_INVALID: offset = "-" else: offset = "%d" % (partition.offset) if hi < 0: lag = "no hwmark" # Unlikely elif partition.offset < 0: # No committed offset, show total message count as lag. # The actual message count may be lower due to compaction # and record deletions. lag = "%d" % (hi - lo) else: lag = "%d" % (hi - partition.offset) print("%-50s %9s %9s" % ( "{} [{}]".format(partition.topic, partition.partition), offset, lag)) consumer.close()