etcd MVCC 存储结构及流程

etcd,mvcc · 浏览次数 : 3

小编点评

**Function:** `tr.trace.Step()` **Purpose:** Log a step message to the trace. **Parameters:** * `message`: The message to log. **Return Value:** * `nil` if the step was successful, otherwise a `RangeResult` object. **Steps:** 1. Log a step message with a unique ID. 2. Get the range result for the given key. 3. If there are no revisions, return an empty `RangeResult`. 4. Set the limit to the `Limit` parameter (default is `len(revpairs)`). 5. Create a slice of `KeyValue` objects from the revisions. 6. Return a `RangeResult` object containing the following fields: * `KVs`: The `KeyValue` slice. * `Count`: The total number of revisions. * `Rev`: The current revision. **Example Usage:** ```go // Log a step message tr.trace.Step("count revisions from in-memory index tree") // Get the range result result := tr.s.kvindex.Revisions(key, end, rev, int(ro.Limit)) // Print the result fmt.Printf("Range result: %v\n", result) ``` **Output:** ``` Range result: &RangeResult{KVs: []mvccpb.KeyValue{ ... }, Count: 10, Rev: 5} ```

正文

什么是 MVCC

MVCC 是 Multi-Version Concurrency Control 的缩写,即多版本并发控制。它是一种并发控制的方法,用于在数据库系统中实现事务的隔离性。MVCC 是一种乐观锁机制,它通过保存数据的多个版本来实现事务的隔禽性。在 etcd 中,MVCC 是用于实现数据的版本控制的。而且可以查看历史版本的数据。

测试

# 添加数据
etcdctl put /test t1
OK
etcdctl put /test t2
OK
# 查看数据
etcdctl get /test
/test
t2
# 查看 json 格式数据
etcdctl get /test --write-out=json
# {"header":{"cluster_id":8735285696067307020,"member_id":7131777314758672153,"revision":15,"raft_term":4},"kvs":[{"key":"L3Rlc3Q=","create_revision":14,"mod_revision":15,"version":2,"value":"dDI="}],"count":1}
# 查看历史版本
etcdctl get /test --rev=14
/test
t1

可以看到,通过 --rev 参数可以查看历史版本的数据。也就是我第一次添加的数据。那么 json 中 revision 是什么意思呢?

revision

reversion 中是 etcd 中的一个概念,它是一个递增的整数,用于标识 etcd 中的数据版本。他是一个 int64 类型。没操作一次 etcd 数据(增,删,改),reversion 就会递增。

# 删除数据
etcdctl del /test
1
# 查看 revision
etcdctl get / -wjson
# {"header":{"cluster_id":8735285696067307020,"member_id":7131777314758672153,"revision":16,"raft_term":4}}
# 刚才是 15 现在是 16
# 添加 /test2 数据
etcdctl put /test2 t3
OK
# 查看 revision
etcdctl get / -wjson
# {"header":{"cluster_id":8735285696067307020,"member_id":7131777314758672153,"revision":17,"raft_term":4}}

存储结构

etcd mvcc 中,维护了两个数据结构,分别是 treeindex 和 boltDB。treeindex 是一个 B 树,用于存储 key 和 revision 之间的映射关系,它主要维护在内存中。而 boltDB 是一个 key-value 数据库,用于存储 key 和 value 之间的映射关系, 它主要维护在磁盘中, 用于持久化数据,虽然 boltdb 使用了 mmap 机制,但是它还是一个磁盘数据库。

treeindex

为什么 etcd 的 treeindex 使用 B-tree 而不使用哈希表、平衡二叉树?

因为 etcd 需要范围查询,所以哈希表不适合。而且etcd 中的 key 过多,平衡二叉树的查询效率不高,所以使用 B tree。

b-tree:

在 treeindex 中,数据的每个 key 是一个 keyIndex 结构,它保存了 key 和 revision 之间的映射关系。keyIndex 结构如下:

type keyIndex struct {
	key         []byte // key 的值
	modified    Revision // 最后一次修改的 main revision
	generations []generation // 保存了 key 的历史版本 没删除一次然后添加一次就是一个 generation
}

type Revision struct {
	// 就是 revision 的值,比如上边的 15 等
	Main int64
	// 子 revision 的值 主要是在事务中使用 比如事务中多个操作 那么就是 0 1 2 3 等
	Sub int64
}


// generation 保存了 key 的历史版本
type generation struct {
	ver     int64 // 版本号
	created Revision // 最后一次被创建的 revision
	revs    []Revision // 保存了 key 的历史 revision
}

在 treeindex 中,每个 keyIndex 保存了 key 的历史版本,而且每个 keyIndex 中的 generations 保存了 key 的历史版本。而且每个 generation 中的 revs 保存了 key 的历史 revision。这样就可以实现历史版本的查询。

获取 resersion 的值

func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created Revision, ver int64, err error) {
	ti.RLock()
	defer ti.RUnlock()
	return ti.unsafeGet(key, atRev)
}

func (ti *treeIndex) unsafeGet(key []byte, atRev int64) (modified, created Revision, ver int64, err error) {
	keyi := &keyIndex{key: key}
    // 从 B 树中获取 keyIndex
	if keyi = ti.keyIndex(keyi); keyi == nil {
		return Revision{}, Revision{}, 0, ErrRevisionNotFound
	}
    // 从 keyIndex 中获取 revision
	return keyi.get(ti.lg, atRev)
}

func (ti *treeIndex) keyIndex(keyi *keyIndex) *keyIndex {
	if ki, ok := ti.tree.Get(keyi); ok {
		return ki
	}
	return nil
}
func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created Revision, ver int64, err error) {
	if ki.isEmpty() {
		lg.Panic(
			"'get' got an unexpected empty keyIndex",
			zap.String("key", string(ki.key)),
		)
	}
    // 找到 key 的 generation
	g := ki.findGeneration(atRev)
	if g.isEmpty() {
		return Revision{}, Revision{}, 0, ErrRevisionNotFound
	}

    // 从 generation 中获取 revision 找到第一次小于 atRev 的 revision
	n := g.walk(func(rev Revision) bool { return rev.Main > atRev })
	if n != -1 {
		return g.revs[n], g.created, g.ver - int64(len(g.revs)-n-1), nil
	}

	return Revision{}, Revision{}, 0, ErrRevisionNotFound
}

// 基本的意思就是从后往前找到第一个 revision 小于 atRev 的 generation
func (ki *keyIndex) findGeneration(rev int64) *generation {
	lastg := len(ki.generations) - 1
	cg := lastg

	for cg >= 0 {
		if len(ki.generations[cg].revs) == 0 {
			cg--
			continue
		}
		g := ki.generations[cg]
		if cg != lastg {
            // 如果当前 generation 的最后一个 revision 小于等于 rev 那么就返回 nil
			if tomb := g.revs[len(g.revs)-1].Main; tomb <= rev {
				return nil
			}
		}
		if g.revs[0].Main <= rev {
			return &ki.generations[cg]
		}
		cg--
	}
	return nil
}

// walk 从后往前遍历 generation
func (g *generation) walk(f func(rev Revision) bool) int {
	l := len(g.revs)
	for i := range g.revs {
		ok := f(g.revs[l-i-1])
		if !ok {
			return l - i - 1
		}
	}
	return -1
}

boltdb

上边的 treeindex 拿到 revision 之后,并没有拿到 value,那么如何拿到 value 呢?这就需要用到 boltdb 了。boltdb 是一个 key-value 数据库,用于存储 key 和 value 之间的映射关系。在 etcd 中,boltdb 主要用于持久化数据。
在 etcd 中,boltdb 报错的不是 etcd key-value 数据,而他的 ket 是 revision,value 是元数据。

func (tr *storeTxnCommon) rangeKeys(ctx context.Context, key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
	rev := ro.Rev
    // 如果 rev 大于当前的 revision 那么就返回 ErrFutureRev
	if rev > curRev {
		return &RangeResult{KVs: nil, Count: -1, Rev: curRev}, ErrFutureRev
	}
    // 如果 rev 小于等于 0 那么就是当前的 revision
	if rev <= 0 {
		rev = curRev
	}
    // 如果 rev 小于 compactMainRev 那么就返回 ErrCompacted
	if rev < tr.s.compactMainRev {
		return &RangeResult{KVs: nil, Count: -1, Rev: 0}, ErrCompacted
	}
    // 如果 re.Count 代表 count 操作 查出来直接返回数量就可以了 不需要在查 value
	if ro.Count {
		total := tr.s.kvindex.CountRevisions(key, end, rev)
		tr.trace.Step("count revisions from in-memory index tree")
		return &RangeResult{KVs: nil, Count: total, Rev: curRev}, nil
	}
    // 查好需要的 revision 之后,从 boltdb 中查出 value ,revpairs 是从 treeindex 中查出来的 revisions
	revpairs, total := tr.s.kvindex.Revisions(key, end, rev, int(ro.Limit))
	tr.trace.Step("range keys from in-memory index tree")
	if len(revpairs) == 0 {
		return &RangeResult{KVs: nil, Count: total, Rev: curRev}, nil
	}

	limit := int(ro.Limit)
	if limit <= 0 || limit > len(revpairs) {
		limit = len(revpairs)
	}

	kvs := make([]mvccpb.KeyValue, limit)
	revBytes := NewRevBytes()
    // 对于每个 revision 从 boltdb 中查出 value
	for i, revpair := range revpairs[:len(kvs)] {
		select {
		case <-ctx.Done():
			return nil, fmt.Errorf("rangeKeys: context cancelled: %w", ctx.Err())
		default:
		}
        // 把 revision 转换成 bytes
		revBytes = RevToBytes(revpair, revBytes)
        // 从 boltdb 中查出 value
		_, vs := tr.tx.UnsafeRange(schema.Key, revBytes, nil, 0)
		if len(vs) != 1 {
			tr.s.lg.Fatal(
				"range failed to find revision pair",
				zap.Int64("revision-main", revpair.Main),
				zap.Int64("revision-sub", revpair.Sub),
				zap.Int64("revision-current", curRev),
				zap.Int64("range-option-rev", ro.Rev),
				zap.Int64("range-option-limit", ro.Limit),
				zap.Binary("key", key),
				zap.Binary("end", end),
				zap.Int("len-revpairs", len(revpairs)),
				zap.Int("len-values", len(vs)),
			)
		}
        // 把 value 转换成 mvccpb.KeyValue
		if err := kvs[i].Unmarshal(vs[0]); err != nil {
			tr.s.lg.Fatal(
				"failed to unmarshal mvccpb.KeyValue",
				zap.Error(err),
			)
		}
	}
	tr.trace.Step("range keys from bolt db")
	return &RangeResult{KVs: kvs, Count: total, Rev: curRev}, nil
}

// boltdb 的 key 结构 
type BucketKey struct {
	Revision
    // 墓碑标志 当删除的时候 先标记一下
	tombstone bool
}
func (baseReadTx *baseReadTx) UnsafeRange(bucketType Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
	if endKey == nil {
		// forbid duplicates for single keys
		limit = 1
	}
	if limit <= 0 {
		limit = math.MaxInt64
	}
	if limit > 1 && !bucketType.IsSafeRangeBucket() {
		panic("do not use unsafeRange on non-keys bucket")
	}
    // 从缓存中拿出数据
	keys, vals := baseReadTx.buf.Range(bucketType, key, endKey, limit)
	if int64(len(keys)) == limit {
		return keys, vals
	}

	// find/cache bucket
	bn := bucketType.ID()
	baseReadTx.txMu.RLock()
	bucket, ok := baseReadTx.buckets[bn]
	baseReadTx.txMu.RUnlock()
	lockHeld := false
	if !ok {
		baseReadTx.txMu.Lock()
		lockHeld = true
		bucket = baseReadTx.tx.Bucket(bucketType.Name())
		baseReadTx.buckets[bn] = bucket
	}

	// ignore missing bucket since may have been created in this batch
	if bucket == nil {
		if lockHeld {
			baseReadTx.txMu.Unlock()
		}
		return keys, vals
	}
	if !lockHeld {
		baseReadTx.txMu.Lock()
	}
	c := bucket.Cursor()
	baseReadTx.txMu.Unlock()
    // 从 boltdb 中查出数据
	k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
    return append(k2, keys...), append(v2, vals...)
}

流程

  1. 用户通过 etcdctl get /b 命令获取数据
  2. etcd 通过 treeindex 获取 key 的 revision 信息 {man: 19, sub: 0}
  3. etcd 通过 key = {man: 19, sub: 0, tombstone: false} 从 boltdb 中获取 value 值 他是一个protobuf 序列化的数据
  4. etcd 将 value 值反序列化成 mvccpb.KeyValue
  5. etcd 将 mvccpb.KeyValue 返回给用户

与etcd MVCC 存储结构及流程相似的内容:

etcd MVCC 存储结构及流程

什么是 MVCC MVCC 是 Multi-Version Concurrency Control 的缩写,即多版本并发控制。它是一种并发控制的方法,用于在数据库系统中实现事务的隔离性。MVCC 是一种乐观锁机制,它通过保存数据的多个版本来实现事务的隔禽性。在 etcd 中,MVCC 是用于实现数据

[转帖]etcd的安装教程

Linux 系统中,下载最新版本的ETCD Releases · etcd-io/etcd · GitHub 一.下载方式 ETCD_VER=v3.5.3 # choose either URLGOOGLE_URL=https://storage.googleapis.com/etcdGITHUB_

[转帖]Etcd+Confd实现Nginx配置文件自动管理

https://www.cnblogs.com/zhengchunyuan/p/9681954.html 一、需求 我们使用Nginx做七层负载均衡,后端是Tomcat。项目采用灰度发布方式,每次项目升级,都要手动先从Nginx下摘掉一组,然后再升级这组,当项目快速迭代时,手动做这些操作显然会增加部

[转帖]etcd的备份与恢复

https://www.cnblogs.com/wyh-l6/p/16547040.html etcd是coreos团队在2013年6月发起的开源项目,现在在githab上托管 etcd目标构建一个高可用的分布式键值数据库 etcd具有以下属性: 完全复制:集群中的每个节点都可以使用完整的存档 高可

[转帖]etcd网络模块解析

https://www.cnblogs.com/luohaixian/p/17509742.html 1. RaftHttp模块介绍 在etcd里raft模块和网络模块是分开的,raft模块主要负责实现共识算法,保证集群节点的一致性,消息的发送和接收则交给raftHttp网络模块来处理,由上层应用模

[转帖]etcd raft模块解析

https://www.cnblogs.com/luohaixian/p/16641100.html 1. Raft简介 raft是一个管理复制式日志的共识算法,它是通过复制日志的方式来保持状态机里的数据是最终一致的。 整体的一个运行描述图: 从图中可以看到由几部分组成,共识模块、日志模块和状态机。

etcd:增加30%的写入性能

etcd:增加30%的写入性能 本文最终的解决方式很简单,就是将现有卷升级为支持更高IOPS的卷,但解决问题的过程值得推荐。 译自:etcd: getting 30% more write/s 我们的团队看管着大约30套自建的Kubernetes集群,最近需要针对etcd集群进行性能分析。 每个et

Python连接Etcd集群基础教程

1、背景介绍 最近接手了一个项目,项目是使用Python开发的,其中使用到了Etcd,但是项目之前开发的方式,只能够支持单节点连接Etcd,不能够在Etcd节点发生故障时,自动转移。因此需要实现基于现有etcd sdk 开发一个能够实现故障转移的功能,或者更换etcd sdk来实现故障转移等功能。

[转帖]突破 etcd 限制!字节自研 K8s 存储 KubeBrain

https://my.oschina.net/u/5632822/blog/5596911 KubeBrain 是字节跳动针对 Kubernetes 元信息存储的使用需求,基于分布式 KV 存储引擎设计并实现的、可以取代 etcd 的元信息存储系统,目前支撑着线上超过 20,000 节点的超大规模

5种GaussDB ETCD服务异常实例分析处理

摘要:一文带你细数几种ETCD服务异常实例状态。 本文分享自华为云社区《【实例状态】GaussDB ETCD服务异常》,作者:酷哥 。 首先确认是否是虚拟机、网络故障 虚拟机故障导致ETCD服务异常告警 问题现象 管控面上报etcd服务异常告警,虚拟机发生重启,热迁移、冷迁移,HA等动作。 问题分析