从ObjectPool到CAS指令

ObjectPool,CAS · 浏览次数 : 502

小编点评

**Interlocked.compareExchange 方法** `Interlocked.compareExchange()` 方法是一个CAS的实现,它允许在多个线程中原子性地对一个数据进行操作。 **方法参数** * `location1`:要比较的引用地址。 * `value`:要设置的值。 * `comparand`:要比较的引用地址。 **方法返回值** * 如果操作成功,返回 `true`。 * 如果操作失败,返回 `false`。 **方法工作原理** 1. 首先,检查 `location1` 是否已缓存。 2. 如果已缓存,检查其状态是否为 `modified`。 3. 如果 `modified`,则从缓存中获取 `value`。 4. 如果 `modified`不是 `cache`,则从 `comparand` 中获取 `location1` 的新引用地址。 5. 然后,检查 `location1` 是否已被其他线程缓存。 6. 如果已缓存,则检查其状态是否为 `shared`。 7. 如果 `shared`,则将 `value` 设置到 `location1`。 8. 如果 `shared`不是 `cache`,则从缓存中获取 `value`。 9. 如果 `shared`不是 `location1`,则将其状态从 `modified`变为 `invalid`。 10. 如果操作成功,返回 `true`。 11. 如果操作失败,返回 `false`。 **优点** * atomicity:保证操作的原子性。 * 无锁:无需使用锁,避免线程竞争。 * 无缓存:避免缓存问题。 **缺点** * 性能:由于使用了 CAS,可能比非 CAS 版本略慢。 * cache失效:如果缓存失效,操作可能会失败。

正文

相信最近看过我的文章的朋友对于Microsoft.Extensions.ObjectPool不陌生;复用、池化是在很多高性能场景的优化技巧,它能减少内存占用率、降低GC频率、提升系统TPS和降低请求时延。

那么池化和复用对象意味着同一时间会有多个线程访问池,去获取和归还对象,那么这肯定就有并发问题。那ObjectPool在涉及多线程访问资源应该怎么做到线程安全呢?

今天就带大家通过学习ObjectPool的源码聊一聊它是如何实现线程安全的。

源码解析

ObjectPool的关键就在于两个方法,一个是Get用于获取池中的对象,另外就是Return用于归还已经使用完的对象。我们先来简单的看看ObjectPool的默认实现DefaultObjectPool.cs类的内容。

私有字段

先从它的私有变量开始,下面代码中给出,并且注释了其作用:

// 用于存放池化对象的包装数组 长度为构造函数传入的max - 1
// 为什么 -1 是因为性能考虑把第一个元素放到 _firstItem中
private protected readonly ObjectWrapper[] _items;

// 池化策略 创建对象 和 回收对象的防范
private protected readonly IPooledObjectPolicy<T> _policy;

// 是否默认的策略 是一个IL优化 使编译器生成call 而不是 callvirt
private protected readonly bool _isDefaultPolicy;

// 因为池化大多数场景只会获取一个对象 为了性能考虑 单独整一个对象不放在数组中 
// 避免数组遍历
private protected T? _firstItem;

// 这个类是在2.1中引入的,以尽可能地避免接口调用 也就是去虚拟化 callvirt
private protected readonly PooledObjectPolicy<T>? _fastPolicy;

构造方法

另外就是它的构造方法,默认实现DefaultObjectPool有两个构造函数,代码如下所示:

/// <summary>
/// Creates an instance of <see cref="DefaultObjectPool{T}"/>.
/// </summary>
/// <param name="policy">The pooling policy to use.</param>
public DefaultObjectPool(IPooledObjectPolicy<T> policy)
    : this(policy, Environment.ProcessorCount * 2)
{
    // 从这个构造方法可以看出,如果我们不指定ObjectPool的池大小
    // 那么池大小会是当前可用的CPU核心数*2
}

/// <summary>
/// Creates an instance of <see cref="DefaultObjectPool{T}"/>.
/// </summary>
/// <param name="policy">The pooling policy to use.</param>
/// <param name="maximumRetained">The maximum number of objects to retain in the pool.</param>
public DefaultObjectPool(IPooledObjectPolicy<T> policy, int maximumRetained)
{
    _policy = policy ?? throw new ArgumentNullException(nameof(policy));

    // 是否为可以消除callvirt的策略
    _fastPolicy = policy as PooleObjectPolicy<T>;
    // 如上面备注所说 是否为默认策略 可以消除callvirt
    _isDefaultPolicy = IsDefaultPolicy();

    // 初始化_items数组 容量还剩一个在 _firstItem中
    _items = new ObjectWrapper[maximumRetained - 1];

    bool IsDefaultPolicy()
    {
        var type = policy.GetType();

        return type.IsGenericType && type.GetGenericTypeDefinition() == typeof(DefaultPooledObjectPolicy<>);
    }
}

Get 方法

如上文所说,Get()方法是ObjectPool中最重要的两个方法之一,它的作用就是从池中获取一个对象,它使用了CAS近似无锁的指令来解决多线程资源争用的问题,代码如下所示:

public override T Get()
{
    // 先看_firstItem是否有值
    // 这里使用了 Interlocked.CompareExchange这个方法
    // 原子性的判断 _firstItem是否等于item
    // 如果等于那把null赋值给_firstItem
    // 然后返回_firstItem对象原始的值  反之就是什么也不做
    var item = _firstItem;
    if (item == null || Interlocked.CompareExchange(ref _firstItem, null, item) != item)
    {

        var items = _items;
        // 遍历整个数组
        for (var i = 0; i < items.Length; i++)
        {
            item = items[i].Element;
            // 通过原子性的Interlocked.CompareExchange尝试读取一个元素
            // 读取成功则返回
            if (item != null && Interlocked.CompareExchange(ref items[i].Element, null, item) == item)
            {
                return item;
            }
        }

        // 如果遍历整个没有获取到元素
        // 那么走创建方法,创建一个
        item = Create();
    }

    return item;
}

上面代码中,有一个点解释一下Interlocked.CompareExchange(ref _firstItem, null, item) != item,其中!=item,如果其等于item就说明交换成功了,当前线程获取到_firstItem元素的期间没有其它线程修改_firstItem的值。

Return 方法

Retrun(T obj)方法是ObjectPool另外一个重要的方法,它的作用就是当程序代码把从池中获取的对象使用完以后,将其归还到池中。同样,它也使用CAS指令来解决多线程资源争用的问题,代码如下所示:

public override void Return(T obj)
{
    // 使用策略的Return方法对元素进行处理
    // 比如 List<T> 需要调用Claer方法清除集合内元素
    // StringBuilder之类的也需要调用Claer方法清除缓存的字符
    if (_isDefaultPolicy || (_fastPolicy?.Return(obj) ?? _policy.Return(obj)))
    {
        // 先尝试将归还的元素赋值到 _firstItem中
        if (_firstItem != null || Interlocked.CompareExchange(ref _firstItem, obj, null) != null)
        {
            var items = _items;
            // 如果 _firstItem已经存在元素
            // 那么遍历整个数组空间 找一个存储为null的空位将对象存储起来
            for (var i = 0; i < items.Length && Interlocked.CompareExchange(ref items[i].Element, obj, null) != null; ++i)
            {
            }
        }
    }
}

从核心的Get()Set()方法来看,其实整个代码是比较简单的,除了有一个_firstItem有一个简单的优化,其余没有什么特别的复杂的逻辑。

主要的关键就在Interlocked.CompareExchange方法上,我们在下文来仔细研究一下这个方法。

关于 Interlocked.CompareExchange

Interlocked.CompareExchange它实际上是一个CAS的实现,也就是Compare And Swap,从名字就可以看出来,它就是比较然后交换的意思。

从下面的代码段我们也可以看出来,它总共需要三个参数。其特性就是只有当localtion1 == comparand的时候才会将value赋值给localtion1,另外吧localtion1的原始值返回出来,这些操作都是原子性的。

// localtion1 需要比较的引用A
// value 计划给引用A 赋的值
// comparand 和引用A比较的引用
public static T CompareExchange<T> (ref T location1, T value, T comparand) 
where T : class;

一个简单的流程如下所示:

简单的使用代码如下所示:

var a = 1;
// a == 1的话就将其置为0
// 判断是否成功就看返回的值是否为a的原始值
if(Interlocked.CompareExchange(ref a, 0, 1) == 1)
	Console.WriteLine("1.成功");
	
// 现在a已经变为0 这个交换不会成功
if(Interlocked.CompareExchange(ref a, 0, 1) == 1)
	Console.WriteLine("2.成功");

结果如下所示,只有当a的原始值为1的时候,才会交换成功:

那么Interlocked.CompareExchange是如何做到原子性的?在多核CPU中,数据可能在内存或者L1、L2、L3中(如下图所示),我们如何保证能原子性的对某个数据进行操作?

实际上这是CPU提供的功能,如果查看过JIT编译的结果,可以看到CompareExchange是由一条叫lock cmpxchgl的汇编指令支撑的。

其中lock是一个指令前缀,汇编指令被lock修饰后会成为"原子的",lock指令有两种实现方法:

  • 早期 - Pentium时代(锁总线),在Pentium及之前的处理器中,带有lock前缀的指令在执行期间会锁住总线,使得其它处理器暂时无法通过总线访问内存,很显然,这个开销很大。
  • 现在 - P6以后时代(锁缓存),在新的处理器中,Intel使用缓存锁定来保证指令执行的原子性,缓存锁定将大大降低lock前缀指令的执行开销。

现在这里的锁缓存(Cache Locking)就是用了Ringbus + MESI协议。

MESI协议是 Cacheline 四种状态的首字母的缩写,分别是修改(Modified)态、独占(Exclusive)态、共享(Shared)态和失效(Invalid)态。 Cache 中缓存的每个 Cache Line 都必须是这四种状态中的一种。

修改态(Modified),如果该 Cache Line 在多个 Cache 中都有备份,那么只有一个备份能处于这种状态,并且“dirty”标志位被置上。拥有修改态 Cache Line 的 Cache 需要在某个合适的时候把该 Cache Line 写回到内存中。但是在写回之前,任何处理器对该 Cache Line在内存中相对应的内存块都不能进行读操作。 Cache Line 被写回到内存中之后,其状态就由修改态变为共享态。

独占态(Exclusive),和修改状态一样,如果该 Cache Line 在多个 Cache 中都有备份,那么只有一个备份能处于这种状态,但是“dirty”标志位没有置上,因为它是和主内存内容保持一致的一份拷贝。如果产生一个读请求,它就可以在任何时候变成共享态。相应地,如果产生了一个写请求,它就可以在任何时候变成修改态。

共享态(Shared),意味着该 Cache Line 可能在多个 Cache 中都有备份,并且是相同的状态,它是和内存内容保持一致的一份拷贝,而且可以在任何时候都变成其他三种状态。

失效态(Invalid),该 Cache Line 要么已经不在 Cache 中,要么它的内容已经过时。一旦某个Cache Line 被标记为失效,那它就被当作从来没被加载到 Cache 中。

总得来说,若干个CPU核心通过Ringbus连到一起。每个核心都维护自己的Cache的状态。如果对于同一份内存数据在多个核里都有Cache,则状态都为S(Shared)。

一旦有一核心改了这个数据(状态变成了M),其他核心就能瞬间通过Ringbus感知到这个修改,从而把自己的Cache状态变成I(Invalid),并且从标记为M的Cache中读过来。同时,这个数据会被原子的写回到主存。最终,Cache的状态又会变为S。

关于MESI协议更详细的信息就不在本文中介绍了,在计算机操作系统和体系结构相关书籍和资料中有更详细的介绍。

然后compxchg这个指令就很简单了,和我们之前提到的一样,比较两个地址中的值是否相等,如果相等的话那么就修改。

Interlocked类中的其它方法也是同样的原理,我们可以看看Add之类的方法,同样是在对应的操作指令前加了lock指令。

总结

本文主要是带大家看了下ObjectPool的源码,然后看了看ObjectPool能实现无锁线程安全的最大功臣Interlocked.CompareExchange方法;然后通过汇编代码了解了一下Interlocked类中的一些方法是如何做到原子性的。

感谢阅读,如果您觉得本文还不错,欢迎点赞、转发+评论,您的支持是我更新的动力!

与从ObjectPool到CAS指令相似的内容:

从ObjectPool到CAS指令

相信最近看过我的文章的朋友对于Microsoft.Extensions.ObjectPool不陌生;复用、池化是在很多高性能场景的优化技巧,它能减少内存占用率、降低GC频率、提升系统TPS和降低请求时延。 那么池化和复用对象意味着同一时间会有多个线程访问池,去获取和归还对象,那么这肯定就有并发问题。

从安装到配置,教你用Argo CD对接CCE集群完成测试、生产部署

本文使用两个CCE集群模拟测试及生产环境,使用gitlab仓库作为应用部署yaml文件存储仓库,通过Argo CD对接不同CCE集群完成测试、生产环境业务部署。

从PDF到OFD,国产化浪潮下多种文档格式导出的完美解决方案

前言 近年来,中国在信息技术领域持续追求自主创新和供应链安全,伴随信创上升为国家战略,一些行业也开始明确要求文件导出的格式必须为 OFD 格式。OFD 格式目前在政府、金融、税务、教育、医疗等需要文件开放、共享和长期保存的行业中广泛应用。这种趋势在未来几年内将进一步增强。 相较于 PDF,OFD 在

.net入行三年的感想回顾

从21年毕业到现在,还差几天就三年了 工作后才知道,工作年限分为1年以下 、3~5年、5~10年、晋升老板,每段都有每段的故事和总结 回顾下我的前三年工作心路,思考下未来发展之路(emmm,我是觉得我是干不了一辈子程序员的 我的工作地点不在大城市,因为我爸不让我出去,家里也不是很缺钱,所以薪资不会辣

嵌入式行业入行6年的一点小感想

从18年毕业到现在已经工作6年了。 熟悉招聘的人都知道,对于工作年限来说,工作开始的前3年是一个分水岭,3~5年是一个分水岭,5~10年又是一个分水岭。10年以上又是一个分水岭...... 我曾经以工作第3年为一个节点,做过一些小小的总结;现在又是3年了,我想借此机会简单概括一下这些年(21年到~2

从JDK8升级到JDK17

一、概述 鉴于JDK8已经是老古董,还有性能问题,兼且各个公司已经不再维护1.8的JDK,所以升级公司的核心产品之一的后端到JDK到17是相对要紧的事情。 通过升级到jdk17,具有以下好处: 不要在头疼同时适应两个jdk,放下适应JDK8的负担 在生产环境基本上只需要部署一个jdk即可 具有更好的

我的日常AI使用

从去年年初开始,AI技术真正走入了我们的日常生活。从OpenAI到如今字节跳动的coze,我们通过AI大模型可以做很多事情,工具和平台众多,如何选择和使用有必要总结一下。 编程和debug方面 尽管gpt-4和gpt-4o确实很强,但对于持续代码改进和代码调试方面,依然不够好,并且它对于非Plus会

从 Docker Hub 拉取镜像受阻?这些解决方案帮你轻松应对

最近一段时间 Docker 镜像一直是 Pull 不下来的状态,感觉除了挂,想直连 Docker Hub 是几乎不可能的。更糟糕的是,很多原本可靠的国内镜像站,例如一些大厂和高校运营的,也陆续关停了,这对我们这些个人开发者和中小企业来说是挺难受的。之前,通过这些镜像站,我们可以快速、方便地获取所

从“专家”视角看:2024年软件测试行业的八大发展趋势!

随着技术的快速发展和数字化转型的深入推进,软件测试行业正面临着前所未有的变革。2024年,我们可以预见软件测试行业将呈现出几个重要的趋势将深刻影响软件测试的方式、工具和流程。它们将重塑软件测试的格局,提升软件质量,推动整个行业的进步,以下是具体的预判解读,供参考。 1. AI与机器学习的深度整合 在

从Purge机制说起,详解GaussDB(for MySQL)的优化策略

当前GaussDB(for MySQL)的Purge优化功能,通过任务流水线化、线程优先级调整、二次分发等手段,避免数据库undo log堆积,极大提升Purge的性能,大幅改善用户体验。