最近接手了一个项目,项目是使用Python开发的,其中使用到了Etcd,但是项目之前开发的方式,只能够支持单节点连接Etcd,不能够在Etcd节点发生故障时,自动转移。因此需要实现基于现有etcd sdk 开发一个能够实现故障转移的功能,或者更换etcd sdk来实现故障转移等功能。
先来看看项目之前使用到的 etcd 库,即 python-etcd3,通过给出的示例,没有看到可以连接多节点的方式,深入到源码后,也没有发现可以连接多节点的方式,基本上可以断定之前使用到的 etcd sdk 不支持集群方式了。因为项目中不仅仅是使用到了简单的 get、put、delete 等功能,还用到了 watch、lock等功能,所以最好是找到一个可以替换的 sdk,这样开发周期可以缩短,并且也可以减少工作量。
网上搜了下,发现用的比较多的几个库都不支持集群方式连接,而且也蛮久没有更新了。比如: etcd3-py、 python-etcd3。
那重新找一个 etcd 的sdk 吧,然后在 github 上面搜索,最开始按照默认推荐顺序看了好几个源代码,都是不支持集群方式连接的。
都有点心灰意冷了,突然想到可以换一下 github 的推荐顺序,换成最近有更新的,然后我换成了 Recently updated 搜索,然后从前往后看,在第二页看到了一个库,点击去看了下源代码,发现是通过 grpc 方式调用的 etcd server,点进去看 client.py 文件,看到有一个类是: MultiEndpointEtcd3Client
,突然眼前一亮,难道可以,然后更加文档安装了对于的 sdk ,测试发现可以集群连接。
发现可以集群连接后,接下来就是看看项目中用到的其他功能,可以正常使用不,比如: watch、lock 。测试发现都可以正常使用。
接下来就是集成到项目中了,这里就不仔细介绍,大家根据自己实际情况自行调整。
etcd-sdk-python 连接集群方式比较简单,需要先创建 Endpoint,然后作为参数,传给 MultiEndpointEtcd3Client。
from pyetcd import MultiEndpointEtcd3Client, Endpoint
from pyetcd.exceptions import ConnectionFailedError
# time_retry 的意思是,当这个节点连接失败后,多少秒后再次去尝试连接
e1 = Endpoint(host="192.168.91.66", port=12379, secure=False, time_retry=30)
e2 = Endpoint(host="192.168.91.66", port=22379, secure=False, time_retry=30)
e3 = Endpoint(host="192.168.91.66", port=32379, secure=False, time_retry=30)
# failover 的意思是,当节点发生故障时,是否进行故障转移,这个参数一定要设置为True,否则当一个节点发生故障时,会报错
c = MultiEndpointEtcd3Client([e1, e2, e3], failover=True)
l = c.lease(10)
data = {"data": 8000}
c.put("/test_ttl", json.dumps(data).encode("utf-8"), lease=l)
time.sleep(5)
b = c.get("/test_ttl")
print(dir(b))
print(dir(b[0]))
print(dir(b[1]))
print(b[1].lease_id)
import math
from threading import Thread
import time
from pyetcd import MultiEndpointEtcd3Client, Endpoint
from pyetcd.exceptions import ConnectionFailedError
e1 = Endpoint(host="192.168.91.66", port=12379, secure=False, time_retry=2)
e2 = Endpoint(host="192.168.91.66", port=22379, secure=False, time_retry=2)
e3 = Endpoint(host="192.168.91.66", port=32379, secure=False, time_retry=2)
c = MultiEndpointEtcd3Client([e1, e2, e3], failover=True)
class EtcdGlobalMutex(object):
def __init__(self, etcd_client, lock_key, ttl=5, acquire_timeout=2):
"""
:param etcd_client: 已连接的etcd客户端
:param lock_key: 分布式锁key
:param ttl: key的有效期
:param acquire_timeout: 尝试获取锁的最长等待时间
"""
self.etcd_client = etcd_client
self.lock_key = lock_key
self.ttl = ttl if ttl else 5
self.acquire_timeout = acquire_timeout if acquire_timeout else 2
self.locker = etcd_client.lock(lock_key, ttl)
def acquire(self):
self.locker.acquire(timeout=self.acquire_timeout)
def refresh_lock(self):
"""
刷新lock,本质上就是更新 key 的ttl
:return:
"""
# 向上取整
seconds = math.ceil(self.ttl / 2)
if seconds == 1 and self.ttl == 1:
seconds = 0.5
while True:
try:
self.locker.refresh()
except ConnectionFailedError as e:
# 测试发现,当etcd集群一个节点故障时,可能会出现这个错误
print(f"refresh_lock. lock_key:{self.lock_key}. ConnectionFailedError, err:{e}")
except Exception as e1:
# 非期望错误,退出,防止线程不能退出
print(f"refresh_lock. lock_key:{self.lock_key}. unexpected error. err:{e1}")
return
time.sleep(seconds)
def try_lock(self):
"""
尝试获取锁,当获取不到锁时,会监听对应的key,当key消失时,会再次尝试获取锁
:return:
"""
try:
self.acquire()
except ConnectionFailedError as e:
print(f"try_lock. lock_key:{self.lock_key}. ConnectionFailedError. err:{e}")
time.sleep(1)
self.try_lock()
if self.locker.is_acquired():
print(f"try_lock. lock_key:{self.lock_key}. Lock acquired successfully")
# 启动刷新锁的线程
t1 = Thread(target=self.refresh_lock)
t1.start()
else:
print(f"try_lock. lock_key:{self.lock_key}. Failed to acquire lock")
self._watch_key()
def _watch_key(self):
"""
监听 key
:return:
"""
# 写入etcd的key
real_key = f"/locks/{self.lock_key}"
cancel = None
try:
print(f"watch_key. lock_key:{self.lock_key}")
# watch 需要捕获异常,这样当一个etcd节点挂掉后,还能够正常 watch
events_iterator, cancel = self.etcd_client.watch(real_key)
for event in events_iterator:
print(f"watch_key. lock_key:{self.lock_key}. event: {event}")
cancel()
break
except ConnectionFailedError as e:
print(f"watch_key. lock_key:{self.lock_key}, ConnectionFailedError err:{e}")
if cancel:
cancel()
time.sleep(1)
self.etcd_client._clear_old_stubs()
self._watch_key()
self.try_lock()
def main():
name = 'lock_name'
e = EtcdGlobalMutex(c, name, ttl=10)
e.try_lock()
while True:
print("Main thread sleeping")
time.sleep(2)
if __name__ == "__main__":
main()
如果只是单纯的实现一个 watch key 功能,没啥好说的,看看官方给的 api 就可以,因为测试的时候,发现如果一个 etcd 节点挂掉,而这个节点有正好是连接的节点,会出现报错,这个时候需要做一些异常捕获处理。
import math
from threading import Thread
import time
from pyetcd import MultiEndpointEtcd3Client, Endpoint
from pyetcd.exceptions import ConnectionFailedError
from pyetcd.events import PutEvent
e1 = Endpoint(host="192.168.91.66", port=12379, secure=False, time_retry=2)
e2 = Endpoint(host="192.168.91.66", port=22379, secure=False, time_retry=2)
e3 = Endpoint(host="192.168.91.66", port=32379, secure=False, time_retry=2)
c = MultiEndpointEtcd3Client([e1, e2, e3], failover=True)
look_key = "look_key"
def watch(self):
print('MonitorEqp is watching')
cancel = None
try:
events_iterator, cancel = c.watch_prefix(look_key)
self.watch_key(events_iterator)
except ConnectionFailedError as e:
# 重点就是这里的异常处理
print(f"MonitorEqp. ConnectionFailedError, err:{e}")
if cancel:
cancel()
time.sleep(1)
c._clear_old_stubs()
watch()
except Exception as e1:
# 非期望错误,退出,防止线程不能退出
print(f"MonitorEqp. unexpected error. err:{e1}")
if cancel:
cancel()
return
def watch_key(self, events_iterator):
print("coming watch_key")
for watch_msg in events_iterator:
print(watch_msg)
if type(watch_msg) != PutEvent:
# 如果不是watch响应的Put信息, 忽略
continue
# xxx 处理监听到的信息
通过上面的学习,对 etcd-sdk-python 有一个基础的认识。
哈哈,再次感谢大佬贡献!