使用shuffle sharding增加容错性

容错性,sharding · 浏览次数 : 0

小编点评

```go func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interface{}) int { // ... ... // 获取本条流的队列列表 hand := qs.dealer.DealIntoHand(hashValue, backHand[:]) // ... ... // 返回最佳队列索引 return bestQueueIdx } //归纳总结以上内容,生成内容时需要带html排版标签 func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interface{}) int { // ... ... // 获取本条流的队列列表 hand := qs.dealer.DealIntoHand(hashValue, backHand[:]) // ... ... // 返回最佳队列索引 return bestQueueIdx } ``` ```go // ... ... func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interface{}) int { // ... ... // 获取本条流的队列列表 hand := qs.dealer.DealIntoHand(hashValue, backHand[:]) // ... ... // 返回最佳队列索引 return bestQueueIdx } //归纳总结以上内容,生成内容时需要带html排版标签 func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interface{}) int { // ... ... // 获取本条流的队列列表 hand := qs.dealer.DealIntoHand(hashValue, backHand[:]) // ... ... // 返回最佳队列索引 return bestQueueIdx } ```

正文

使用shuffle sharding增加容错性

最近在看kubernetes的API Priority and Fairness,它使用shuffle sharding来为请求选择处理队列,以此防止高吞吐量流挤占低吞吐量流,进而造成请求延迟的问题。

介绍

首先看下什么是shuffle sharding,下面内容来自aws的Workload isolation using shuffle-sharding

首先来看如何使用一般分片方式来让系统具备可扩展性和弹性。

假设有一个8 workers节点的水平可扩展的系统或服务,下图红线表示达到这些节点的请求,worker可以是服务,队列或数据库等。

image

如果没有任何分片,则要求每个worker能够处理所有请求。这种方式高效且具备一定的冗余性。如果一个worker出现故障,则可以将它的任务分配到剩余的7个worker上。此时可能需要增加一定的系统容量。但如果突然出现大量请求,如DDoS攻击,可能会导致级联故障。下面两张图展示了故障是如何升级的。

image

首先会影响第一台worker,随后会级联到其他workers上,最终导致整个服务不可用。

image

为了防止故障转移,通常可以使用分片方式,如将workers分为4个分片,以效率换取影响度。下面两张图展示了如何使用分片来限制DDoS攻击。

image

本例中,每个分片包含2个workers,并按照资源(如域名)进行切片。此时的系统仍然具有冗余性,但由于每个分片只有2个workers,因此可能需要增加容量来避免故障。

image

通过这种方式降低了故障影响范围。这里有4个分片,如果一个分片故障,则只会影响该分片上的服务,其他分片则不受影响。影响范围为25%。使用shuffle sharding可以达到更好的效果。

shuffle sharding用到了虚拟分片(shuffle shard)的概念,这里将不会直接对workers进行分片,而是按照"用户"进行分片,目的是尽量将用户打散分布到不同的worker上。

下图展示的shuffle sharding布局中包含8个workers和8个客户,并给每个客户分配了2个workers。以彩虹和玫瑰表示的客户为例。

这里,我们给彩虹客户分配了第1个和第4个worker,这两个workers构成了该客户的shuffle shard,其他客户将使用不同的虚拟分片(含2个workers),如玫瑰客户分配了第1个和最后一个worker。

image

如果彩虹用户分配的worker 1和worker 4出现了问题(如恶意请求或请求泛红等),则此问题只会影响本虚拟分片,但不会影响到其他shuffle shard。事实上,最多只会有另外一个shuffle shard会受到影响(即另外一个服务都部署到了worker 1和worker 4)。如果请求方具有容错性,则可以继续使用剩余分片继续提供服务。

image

换句话说,当彩虹客户所在的节点因为出现问题或受到攻击而无法提供服务时,不会影响到其他节点。对于客户而言,虽然玫瑰客户和向日葵客户都和彩虹客户共享了worker,但并没有导致其服务中断,玫瑰客户仍然可以继续使用workers 8提供服务,而向日葵客户可以继续使用worker 6提供服务。

image

当出现上述问题时,虽然失去了四分之一的worker节点,但使用shuffle sharding可以大大降低影响范围。上述场景下,一共有28种两两worker的组合方式,即28种shuffle shards。当有上百甚至更多的客户时,我们可以给每个客户分配一个shuffle shards,以此可以将影响范围缩小到1/28,效果是一般分片方式的7倍。

kubernetes中的shuffle sharding

使用shuffle sharding为流分片队列

kubernetes的流控功能中使用了shuffle sharding,其代码实现如下:

func NewDealer(deckSize, handSize int) (*Dealer, error) {
	if deckSize <= 0 || handSize <= 0 {
		return nil, fmt.Errorf("deckSize %d or handSize %d is not positive", deckSize, handSize)
	}
	if handSize > deckSize {
		return nil, fmt.Errorf("handSize %d is greater than deckSize %d", handSize, deckSize)
	}
	if deckSize > 1<<26 {
		return nil, fmt.Errorf("deckSize %d is impractically large", deckSize)
	}
	if RequiredEntropyBits(deckSize, handSize) > MaxHashBits {
		return nil, fmt.Errorf("required entropy bits of deckSize %d and handSize %d is greater than %d", deckSize, handSize, MaxHashBits)
	}

	return &Dealer{
		deckSize: deckSize,
		handSize: handSize,
	}, nil
}

func (d *Dealer) Deal(hashValue uint64, pick func(int)) {
	// 15 is the largest possible value of handSize
	var remainders [15]int

  //这个for循环用于生成[0,deckSize)范围内的随机数。
	for i := 0; i < d.handSize; i++ {
		hashValueNext := hashValue / uint64(d.deckSize-i)
		remainders[i] = int(hashValue - uint64(d.deckSize-i)*hashValueNext)
		hashValue = hashValueNext
	}

	for i := 0; i < d.handSize; i++ {
		card := remainders[i]
		for j := i; j > 0; j-- {
			if card >= remainders[j-1] {
				card++
			}
		}
		pick(card)
	}
}

func (d *Dealer) DealIntoHand(hashValue uint64, hand []int) []int {
	h := hand[:0]
	d.Deal(hashValue, func(card int) { h = append(h, card) })
	return h
}
  1. 首先使用func NewDealer(deckSize, handSize int)初始化一个实例,以kubernetes的APF功能为例,deckSize为队列数,handSize表示为一条流分配的队列数量

  2. 使用func (d *Dealer) DealIntoHand(hashValue uint64, hand []int)可以返回为流选择的队列ID,hashValue可以看做是流的唯一标识,hand为存放结果的数组。

    hashValue的计算方式如下,fsName为flowschemas的名称,fDistinguisher可以是用户名或namespace名称:

    func hashFlowID(fsName, fDistinguisher string) uint64 {
    	hash := sha256.New()
    	var sep = [1]byte{0}
    	hash.Write([]byte(fsName))
    	hash.Write(sep[:])
    	hash.Write([]byte(fDistinguisher))
    	var sum [32]byte
    	hash.Sum(sum[:0])
    	return binary.LittleEndian.Uint64(sum[:8])
    }
    

用法如下:

	var backHand [8]int
	deal, _ := NewDealer(128, 9)
	fmt.Println(deal.DealIntoHand(8238791057607451177, backHand[:]))
//输出:[41 119 0 49 67]

为请求分片队列

上面为流分配了队列,实现了流之间的队列均衡。此时可能为单条流分配了多个队列,下一步就是将单条流的请求均衡到分配到的各个队列中。核心代码如下:

func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interface{}) int {
	var backHand [8]int
	// Deal into a data structure, so that the order of visit below is not necessarily the order of the deal.
	// This removes bias in the case of flows with overlapping hands.
  //获取本条流的队列列表
	hand := qs.dealer.DealIntoHand(hashValue, backHand[:])
	handSize := len(hand)
  //qs.enqueues表示队列中的请求总数,这里第一次哈希取模算出队列的起始偏移量
	offset := qs.enqueues % handSize
	qs.enqueues++
	bestQueueIdx := -1
	minQueueSeatSeconds := fqrequest.MaxSeatSeconds
  //这里用到了上面的偏移量,并考虑到了队列处理延迟,找到延迟最小的那个队列作为目标队列
	for i := 0; i < handSize; i++ {
		queueIdx := hand[(offset+i)%handSize]
		queue := qs.queues[queueIdx]
		queueSum := queue.requests.QueueSum()

		// this is the total amount of work in seat-seconds for requests
		// waiting in this queue, we will select the queue with the minimum.
		thisQueueSeatSeconds := queueSum.TotalWorkSum
		klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d with sum: %#v and %d seats in use, nextDispatchR=%v", qs.qCfg.Name, descr1, descr2, queueIdx, queueSum, queue.seatsInUse, queue.nextDispatchR)
		if thisQueueSeatSeconds < minQueueSeatSeconds {
			minQueueSeatSeconds = thisQueueSeatSeconds
			bestQueueIdx = queueIdx
		}
	}
	...
	return bestQueueIdx
}

与使用shuffle sharding增加容错性相似的内容:

使用shuffle sharding增加容错性

使用shuffle sharding增加容错性 最近在看kubernetes的API Priority and Fairness,它使用shuffle sharding来为请求选择处理队列,以此防止高吞吐量流挤占低吞吐量流,进而造成请求延迟的问题。 介绍 首先看下什么是shuffle shardin

使用Cloudflare Worker加速docker镜像

前言 开发者越来越难了,现在国内的docker镜像也都️了,没有镜像要使用docker太难了,代理又很慢 现在就只剩下自建镜像的办法了 GitHub上有开源项目可以快速搭建自己的镜像库,不过还是有点麻烦,还好Cloudflare暂时还活着‍ 本文记录一下使用 Cloudf

使用C#/.NET解析Wiki百科数据实现获取历史上的今天

创建一个webapi项目做测试使用。 创建新控制器,搭建一个基础框架,包括获取当天日期、wiki的请求地址等 创建一个Http请求帮助类以及方法,用于获取指定URL的信息 使用http请求访问指定url,先运行一下,看看返回的内容。内容如图右边所示,实际上是一个Json数据。我们主要解析 大事记 部

Pybind11和CMake构建python扩展模块环境搭建

使用pybind11的CMake模板来创建拓展环境搭建 从Github上下载cmake_example的模板,切换分支,并升级pybind11子模块到最新版本 拉取pybind11使用cmake构建工具的模板仓库 git clone --recursive https://github.com/mr

说说RabbitMQ延迟队列实现原理?

使用 RabbitMQ 和 RocketMQ 的人是幸运的,因为这两个 MQ 自身提供了延迟队列的实现,不像用 Kafka 的同学那么苦逼,还要自己实现延迟队列。当然,这都是题外话,今天咱们重点来聊聊 RabbitMQ 延迟队列的实现原理,以及 RabbitMQ 实现延迟队列的优缺点有哪些? 很多人

使用FModel提取游戏资产

目录前言FModel简介FModel安装FModel使用初次使用资产预览资产导出附录dumperDumper-7生成usmap文件向游戏中注入dll 前言 这篇文章仅记录我作为初学者使用FModel工具提取某款游戏模型的过程。 FModel简介 FModel是一个开源软件,可以用于查看和提取UE4-

使用GSAP制作动画视频

GSAP 3Blue1Brown给我留下了深刻印象。利用动画制作视频,内容简洁,演示清晰。前两天刚好碰到一件事,我就顺便学习了一下怎么用代码做动画。 以javascrip为例,有两个动画引擎,GSAP和Animajs。由于网速的原因,询问了GPT后,我选择了GSAP来制作我的第一个动画视频。 制作动

使用ML.NET训练一个属于自己的图像分类模型,对图像进行分类就这么简单!

前言 今天大姚给大家分享一个.NET开源、免费、跨平台(支持Windows、Linux、macOS多个操作系统)的机器学习框架:ML.NET。并且本文将会带你快速使用ML.NET训练一个属于自己的图像分类模型,对图像进行分类。 ML.NET框架介绍 ML.NET 允许开发人员在其 .NET 应用程序

使用libzip压缩文件和文件夹

简单说说自己遇到的坑: 分清楚三个组件:zlib、minizip和libzip。zlib是底层和最基础的C库,用于使用Deflate算法压缩和解压缩文件流或者单个文件,但是如果要压缩文件夹就很麻烦,主要是不知道如何归档,在zip内部形成对应的目录。这时就需要用更高级别的库,也就是minizip或li

使用gzexe加密shell脚本

使用 gzexe 加密 shell 脚本是一个相对简单的过程。以下是具体的步骤: 编写你的 shell 脚本:首先,你需要有一个 shell 脚本文件,比如 myscript.sh。 确保脚本可执行:使用 chmod 命令确保你的脚本文件是可执行的: chmod +x myscript.sh 使用