Python连接Etcd集群基础教程

python,etcd · 浏览次数 : 0

小编点评

本文主要介绍了在Python项目中使用etcd作为服务发现和配置存储的过程中,如何实现故障转移和多节点连接的问题。作者通过搜索和对比不同的etcd SDK,找到了一个支持集群连接的SDK,并将其集成到项目中。同时,作者还实现了一个简约的自动续约的分布式锁功能,以提高项目的可靠性和稳定性。 1. **项目背景**: - 项目需求:实现故障转移功能,以便在etcd节点故障时自动切换。 - 现有工具:使用的是python-etcd3库,但不支持集群连接。 - 问题分析:现有的etcd SDK不支持多节点连接,需要寻找替代方案。 2. **寻找可替换的SDK**: - 搜寻过程:在网上搜索可替换的etcd SDK,但没有找到直接支持集群的选项。 - 优化搜索:通过调整github的推荐顺序,找到了一个支持集群连接的库。 - 安装与测试:安装并测试新找到的SDK,确认可以正常连接etcd集群。 3. **etcd-sdk-python连接集群**: - 官方教程:提供了简单的教程,说明如何创建Endpoint并连接到etcd集群。 - 示例代码:提供了一个示例代码,展示了如何使用MultiEndpointEtcd3Client连接集群。 4. **实现一个简约的自动续约的分布式锁**: - 功能介绍:介绍了一个自动续约的分布式锁功能,用于提高项目的可靠性。 - 实现细节:详细描述了如何实现锁的自动续约和重试机制。 5. **watch key如何实现?**: - 异常处理:讨论了如何在etcd客户端出现连接失败时,正确处理异常。 - 事件监听:解释了如何监听etcd key的变化,并在节点故障时进行处理。 总的来说,本文通过寻找合适的etcd SDK和实现分布式锁功能,解决了项目中遇到的故障转移和多节点连接问题,提高了项目的稳定性和可靠性。

正文

1、背景介绍

最近接手了一个项目,项目是使用Python开发的,其中使用到了Etcd,但是项目之前开发的方式,只能够支持单节点连接Etcd,不能够在Etcd节点发生故障时,自动转移。因此需要实现基于现有etcd sdk 开发一个能够实现故障转移的功能,或者更换etcd sdk来实现故障转移等功能。

先来看看项目之前使用到的 etcd 库,即 python-etcd3,通过给出的示例,没有看到可以连接多节点的方式,深入到源码后,也没有发现可以连接多节点的方式,基本上可以断定之前使用到的 etcd sdk 不支持集群方式了。因为项目中不仅仅是使用到了简单的 get、put、delete 等功能,还用到了 watch、lock等功能,所以最好是找到一个可以替换的 sdk,这样开发周期可以缩短,并且也可以减少工作量。

2、寻找可替换的SDK

网上搜了下,发现用的比较多的几个库都不支持集群方式连接,而且也蛮久没有更新了。比如: etcd3-pypython-etcd3

那重新找一个 etcd 的sdk 吧,然后在 github 上面搜索,最开始按照默认推荐顺序看了好几个源代码,都是不支持集群方式连接的。

都有点心灰意冷了,突然想到可以换一下 github 的推荐顺序,换成最近有更新的,然后我换成了 Recently updated 搜索,然后从前往后看,在第二页看到了一个库,点击去看了下源代码,发现是通过 grpc 方式调用的 etcd server,点进去看 client.py 文件,看到有一个类是: MultiEndpointEtcd3Client,突然眼前一亮,难道可以,然后更加文档安装了对于的 sdk ,测试发现可以集群连接。

发现可以集群连接后,接下来就是看看项目中用到的其他功能,可以正常使用不,比如: watch、lock 。测试发现都可以正常使用。

接下来就是集成到项目中了,这里就不仔细介绍,大家根据自己实际情况自行调整。

3、etcd-sdk-python 连接集群

官方教程

etcd-sdk-python 连接集群方式比较简单,需要先创建 Endpoint,然后作为参数,传给 MultiEndpointEtcd3Client。

from pyetcd import MultiEndpointEtcd3Client, Endpoint
from pyetcd.exceptions import ConnectionFailedError

# time_retry 的意思是,当这个节点连接失败后,多少秒后再次去尝试连接
e1 = Endpoint(host="192.168.91.66", port=12379, secure=False, time_retry=30)
e2 = Endpoint(host="192.168.91.66", port=22379, secure=False, time_retry=30)
e3 = Endpoint(host="192.168.91.66", port=32379, secure=False, time_retry=30)

# failover 的意思是,当节点发生故障时,是否进行故障转移,这个参数一定要设置为True,否则当一个节点发生故障时,会报错
c = MultiEndpointEtcd3Client([e1, e2, e3], failover=True)

l = c.lease(10)

data = {"data": 8000}
c.put("/test_ttl", json.dumps(data).encode("utf-8"), lease=l)

time.sleep(5)
b = c.get("/test_ttl")
print(dir(b))

print(dir(b[0]))
print(dir(b[1]))
print(b[1].lease_id)

4、实现一个简约的自动续约的分布式锁

import math
from threading import Thread
import time

from pyetcd import MultiEndpointEtcd3Client, Endpoint
from pyetcd.exceptions import ConnectionFailedError

e1 = Endpoint(host="192.168.91.66", port=12379, secure=False, time_retry=2)
e2 = Endpoint(host="192.168.91.66", port=22379, secure=False, time_retry=2)
e3 = Endpoint(host="192.168.91.66", port=32379, secure=False, time_retry=2)

c = MultiEndpointEtcd3Client([e1, e2, e3], failover=True)


class EtcdGlobalMutex(object):

    def __init__(self, etcd_client, lock_key, ttl=5, acquire_timeout=2):
        """

        :param etcd_client: 已连接的etcd客户端
        :param lock_key: 分布式锁key
        :param ttl: key的有效期
        :param acquire_timeout: 尝试获取锁的最长等待时间
        """
        self.etcd_client = etcd_client
        self.lock_key = lock_key
        self.ttl = ttl if ttl else 5
        self.acquire_timeout = acquire_timeout if acquire_timeout else 2
        self.locker = etcd_client.lock(lock_key, ttl)

    def acquire(self):
        self.locker.acquire(timeout=self.acquire_timeout)

    def refresh_lock(self):
        """
        刷新lock,本质上就是更新 key 的ttl
        :return:
        """
        # 向上取整
        seconds = math.ceil(self.ttl / 2)
        if seconds == 1 and self.ttl == 1:
            seconds = 0.5

        while True:
            try:
                self.locker.refresh()
            except ConnectionFailedError as e:
                # 测试发现,当etcd集群一个节点故障时,可能会出现这个错误
                print(f"refresh_lock. lock_key:{self.lock_key}. ConnectionFailedError, err:{e}")

            except Exception as e1:
                # 非期望错误,退出,防止线程不能退出
                print(f"refresh_lock. lock_key:{self.lock_key}. unexpected error. err:{e1}")
                return

            time.sleep(seconds)

    def try_lock(self):
        """
        尝试获取锁,当获取不到锁时,会监听对应的key,当key消失时,会再次尝试获取锁
        :return:
        """
        try:
            self.acquire()
        except ConnectionFailedError as e:
            print(f"try_lock. lock_key:{self.lock_key}. ConnectionFailedError. err:{e}")
            time.sleep(1)

            self.try_lock()

        if self.locker.is_acquired():
            print(f"try_lock. lock_key:{self.lock_key}. Lock acquired successfully")
            # 启动刷新锁的线程
            t1 = Thread(target=self.refresh_lock)
            t1.start()
        else:
            print(f"try_lock. lock_key:{self.lock_key}. Failed to acquire lock")
            self._watch_key()

    def _watch_key(self):
        """
        监听 key
        :return: 
        """
        # 写入etcd的key
        real_key = f"/locks/{self.lock_key}"
        cancel = None
        try:
            print(f"watch_key. lock_key:{self.lock_key}")
            # watch 需要捕获异常,这样当一个etcd节点挂掉后,还能够正常 watch
            events_iterator, cancel = self.etcd_client.watch(real_key)
            for event in events_iterator:
                print(f"watch_key. lock_key:{self.lock_key}. event: {event}")
                cancel()
                break
        except ConnectionFailedError as e:
            print(f"watch_key. lock_key:{self.lock_key}, ConnectionFailedError err:{e}")
            if cancel:
                cancel()
            time.sleep(1)
            
            self.etcd_client._clear_old_stubs()

            self._watch_key()

        self.try_lock()


def main():
    name = 'lock_name'
    e = EtcdGlobalMutex(c, name, ttl=10)
    e.try_lock()

    while True:
        print("Main thread sleeping")
        time.sleep(2)


if __name__ == "__main__":
    main()


5、watch key 如何实现?

如果只是单纯的实现一个 watch key 功能,没啥好说的,看看官方给的 api 就可以,因为测试的时候,发现如果一个 etcd 节点挂掉,而这个节点有正好是连接的节点,会出现报错,这个时候需要做一些异常捕获处理。

import math
from threading import Thread
import time

from pyetcd import MultiEndpointEtcd3Client, Endpoint
from pyetcd.exceptions import ConnectionFailedError
from pyetcd.events import PutEvent

e1 = Endpoint(host="192.168.91.66", port=12379, secure=False, time_retry=2)
e2 = Endpoint(host="192.168.91.66", port=22379, secure=False, time_retry=2)
e3 = Endpoint(host="192.168.91.66", port=32379, secure=False, time_retry=2)

c = MultiEndpointEtcd3Client([e1, e2, e3], failover=True)

look_key = "look_key"

def watch(self):
    print('MonitorEqp is watching')
    cancel = None
    try:
        events_iterator, cancel = c.watch_prefix(look_key)

        self.watch_key(events_iterator)
    except  ConnectionFailedError as e:
        # 重点就是这里的异常处理
        print(f"MonitorEqp. ConnectionFailedError, err:{e}")
        if cancel:
            cancel()
        time.sleep(1)

        c._clear_old_stubs()
        watch()
    except Exception as e1:
        # 非期望错误,退出,防止线程不能退出
        print(f"MonitorEqp.  unexpected error. err:{e1}")
        if cancel:
            cancel()

        return


def watch_key(self, events_iterator):
    print("coming watch_key")
    for watch_msg in events_iterator:
        print(watch_msg)
        if type(watch_msg) != PutEvent:
            # 如果不是watch响应的Put信息, 忽略
            continue

        # xxx 处理监听到的信息

通过上面的学习,对 etcd-sdk-python 有一个基础的认识。

哈哈,再次感谢大佬贡献!

与Python连接Etcd集群基础教程相似的内容:

Python连接Etcd集群基础教程

1、背景介绍 最近接手了一个项目,项目是使用Python开发的,其中使用到了Etcd,但是项目之前开发的方式,只能够支持单节点连接Etcd,不能够在Etcd节点发生故障时,自动转移。因此需要实现基于现有etcd sdk 开发一个能够实现故障转移的功能,或者更换etcd sdk来实现故障转移等功能。

[转帖]Python连接Oracle数据库进行数据处理操作

https://www.dgrt.cn/a/2259443.html?action=onClick 解决以下问题: Python连接Oracle数据库,并查询、提取Oracle数据库中数据? 通过Python在Oracle数据库中创建表 Python数据插入到Oracle数据库中? Python删除

python | 连接数据库

介绍一些python中用于连接常用数据库的依赖库。 SQLite3 SQLite3是Python 中自带的数据库模块,适用于小型应用和快速原型开发。 SQLite是一个进程内的库,实现了自给自足的、无服务器的、是非常小的,是轻量级的、事务性的 SQL 数据库引擎。它是一个零配置的数据库,不需要在系统

Python连接Neo4j工具比较 Neo4j Driver、py2neo

py2neo 目前不支持 neo4j 5.X,Neo4j Driver for Python是官方提供的驱动程序,提供了与Neo4j数据库进行通信的基本功能,如果你更倾向于底层的控制,或者你的项目对性能要求较高。而py2neo则提供了更多的功能和便利性,以简化与Neo4j数据库的交互,更高级的抽象和便利性,以及一些附加的功能。选择哪个库取决于您的具体需求和偏好。

[转帖]如何用python连接Linux服务器

1.安装paramiko库 pip install paramiko 2.使用paramiko库连接linux #导入库 import paramiko #创建一个sshclient对象 ssh = paramiko.SSHClient() #允许连接不在know_host中的主机 ssh.set_

【转帖】【笔记】python连接神通数据库

https://www.cnblogs.com/wyongbo/p/17054924.html python连接国产神州通用数据库。 一、准备 下载whl及dll: 链接: https://pan.baidu.com/s/1lwE-FwIsf-aYjoqCPij2hA 提取码: 49qp 二、安装

【Azure Cache for Redis】Python Django-Redis连接Azure Redis服务遇上(104, 'Connection reset by peer')

问题描述 使用Python连接Azure Redis服务,因为在代码中使用的是Django-redis组件,所以通过如下的配置连接到Azure Redis服务: CACHES = { "default": { "BACKEND": "django_redis.cache.RedisCache", "

Python学习之十九_程序运行时间的验证

# Python学习之十九_程序运行时间的验证 ## 背景 ``` 最近一段时间比较忙. 而且还遇到了一个lua脚本优化redis访问的场景. 想着自己还在学习python(时断时续) 所以想借着这个场景,学习一下python连接redis,以及验证lua脚本和原生redis命令的效率问题. 虽然方

Python史上最全种类数据库操作方法,你能想到的数据库类型都在里面!甚至还有云数据库!

本文将详细探讨如何在Python中连接全种类数据库以及实现相应的CRUD(创建,读取,更新,删除)操作。我们将逐一解析连接MySQL,SQL Server,Oracle,PostgreSQL,MongoDB,SQLite,DB2,Redis,Cassandra,Microsoft Access,El

Python学习之十_paramiko的简单学习

Python学习之十_paramiko的简单学习 简介 pywinrm 是python用于连接访问windows的工具 paramiko 是python用于连接访问linux的工具 ansible等工具很多也是基于类似的组件进行的处理 连接不同的系统进行命令行的操作. paramiko的简介 par