今天我们很高兴宣布 CAP 发布 8.2 版本正式版,我们在这个版本中主要致力于对订阅着并行执行的特性提供支持,同时添加了对在订阅者中对消息头的控制行为。
下面,具体看一下我们新版本的功能吧。
可能有些人还不知道 CAP 是什么,老规矩来一个简介。
CAP 是一个用来解决微服务或者分布式系统中分布式事务问题的一个开源项目解决方案(https://github.com/dotnetcore/CAP)同样可以用来作为 EventBus 使用,该项目诞生于2016年,目前在 Github 已经有超过 6500+ Star 和 110+ 贡献者,以及在 NuGet超 800 万的下载量,并在越来越多公司的和项目中得到应用。
如果你想对 CAP 更多了解,请查看我们的 官方文档。
本次在 CAP 8.2 版本中我们主要带来了以下新特性:
在介绍这个新特性之前, 我们先来看看过去如果想让消费者独立执行应该怎么做。
熟悉 CAP 的同学可能了解到,CAP提供了一个 UseDispatchingPerGroup
配置项用来为消费者组启用独立的调度器,独立的调度器意味着其具有独立的消费管道,这样消费者组之间就不会互相影响(默认如果不开启此配置项会将不同组的消息放置到同一管道来调度并串行执行),从而可以实现如某些消费者执行非常长的时间也不会卡住另外一些执行时间比较短消费者,这样也就做到了独立执行。
当然,如果想开启多个线程并发执行,可以通过设置 SubscriberParallelExecuteThreadCount
配置项来启用多个执行线程执行,这会提高消息的消费速度。
可以看到,在过去是不支持为消费者设置独立的并行度的,SubscriberParallelExecuteThreadCount
其提供的是全局配置,不支持独立为订阅者进行设置。
以上都是过去的行为,下面我们看一下现在如何做。
在新版本中,我们为 [CapSubscribe]
这个 Attribute 添加了一个新的参数 GroupConcurrent
来为订阅着设置并行度,也就是说每一个组/订阅者都可以独立设置,设置之后他们之间互不影响,独立执行。
下面我们来看一个示例,定义消费者如下
[CapSubscribe("hello")]
public void Hello1(){
Console.WriteLine("hello 1");
}
[CapSubscribe("hello", Group = "foo", GroupConcurrent = 2)]
public void Hello2(){
Console.WriteLine("hello 2");
Thread.Sleep(1000);
}
[CapSubscribe("hello", GroupConcurrent = 2)]
public void Hello3(){
Console.WriteLine("hello 3");
Thread.Sleep(1000);
}
为了验证 Hello2
和 Hello3
的执行,我添加了 Thread.Sleep(1000);
这2行代码。
现在,发送一条名为hello
的消息,我们来看一下订阅者是如何执行的。
await cap.PublishAsync<string>("hello", null);
熟悉CAP的同学可能知道,在过去 Hello1
和 Hello2
是都会执行,如果启用了 UseDispatchingPerGroup
配置项,那么会同时执行,否则会串行执行。
在新版本,我们来看看是怎么执行的。
先看 Hello2
,新添加了 GroupConcurrent
参数,会是什么效果呢? 正如和前面说过的,这是一个消费并行度参数,在 Hello2
中设置为 2 那么就代表这个方法在有新消息到达时候,最多会有2个线程在同时执行它。所以此处的打印结果会是同时打印2个 hello 2
。
再来看 Hello1
和 Hello3
,这两个哪个会打印呢? 其实这两个也都会打印出来,那么他们两个有什么区别呢?
可以看到的是一个添加了 GroupConcurrent
,另外一个没添加。所以我来解释一下,在内部,我们会检测如果一个订阅者添加了 GroupConcurrent 而没有指定 Group 的话,那么我们会认为其需要独立执行,此时我们会自动采用订阅名称为其生成默认 Group 名称,在此示例中就相当于设置了 Group="hello"。
这就说完了,整体来说是不是很简单,需要就设置,不需要就不设置,没有复杂的配置。
所以新版本我们移除了 UseDispatchingPerGroup
配置项,如你所见这已经是默认行为。
PS: 如果将多个订阅者设置为同一组,并且还为这些订阅者设置了 GroupConcurrent 值,则只有第一个订阅者设置的值才会生效。
PS: GroupConcurrent 在 RabbitMQ 中会自动对应设置Qos,无需再手动干预。
在发布消息的时候,我们支持指定 callbackName 参数来进行补偿事务,消费者执行完成后 CAP 会自动调用 callbackName 来回调执行补偿方法。
接收到用户的反馈,在某些场景下,消费者可能执行成功就不需要再回调发布方,那么此时消费者就需要控制补偿行为来避免回调,再或者想回调另外一个方法来执行其他的补偿流程。
在过去可能需要通过消费过滤器来实现,现在我们为 CapHeader 提供了更多的行为来实现这一目的。下面来看一下,要说的话都在代码里。
[CapSubscribe("place.order.qty.deducted")]
public object DeductProductQty(JsonElement param, [FromCap] CapHeader header)
{
var orderId = param.GetProperty("OrderId").GetInt32();
var productId = param.GetProperty("ProductId").GetInt32();
var qty = param.GetProperty("Qty").GetInt32();
// 添加额外的头信息到响应消息中
header.AddResponseHeader("some-message-info", "this is the test");
// 或再次添加补偿的回调
header.AddResponseHeader(DotNetCore.CAP.Messages.Headers.CallbackName, "place.order.qty.deducted-callback");
// 如果你不再遵从发送着指定的回调,想修改回调,可通过 RewriteCallback 方法修改。
header.RewriteCallback("new-callback-name");
// 如果你想终止/停止,或不再给发送方响应,调用 RemoveCallback 来移除回调。
header.RemoveCallback();
return new { OrderId = orderId, IsSuccess = true };
}
在发布消息时,我们支持使用 EnablePublishParallelSend
来启用并行发送,因为发布消息属于 short-term 的task,所以非常放置到 .NET 线程池执行。
这本身没有问题,在过去我们进行过压力测试也没有发现问题,但是零星用户反馈会导致应用崩溃,询问也没有反馈更多信息(PS: 大家提交完 issue 不是就代表不用管了,还请关注后续),暂时还得不到更多消息是否是 Docker 限制了内存或CPU导致的,不得而知。
无论如何,我们在这个版本优化了这一行为,现在会分批次来将任务提交到线程池,在上一批次执行完成后才会放置下一批次的任务。如果发布速度很快,内存队列达到阈值也就是堆满后,会启用背压机制来延缓发布者的响应。
现在 NATS 也有了 CustomHeadersBuilder
配置项,用于和三方异构系统做对接。没有太多好说的,和其他 Transport 的用法都是一致。
升级 Npgsql 到最新版本
安全性升级,Npgsql 易受通过协议信息大小溢出进行 SQL 注入。
https://github.com/npgsql/npgsql/security/advisories/GHSA-x9vc-6hfv-hg8c
升级 Dashboard npm包到最新次要版本
修复 NATS重启后健康检查偶尔未正确恢复的问题
参考 Issue #1542
以上,就是本版本我们做出的一些新特性和改动,感谢大家的支持,我们很开心能够帮助到大家 。
大家在使用的过程中遇到问题希望也能够积极的反馈,帮助CAP变得越来越好。😃
如果你喜欢这个项目,可以通过下面的连接点击 Star 给我们支持。
如果你觉得本篇文章对您有帮助的话,感谢您的【推荐】。
本文地址:http://www.cnblogs.com/savorboard/p/cap-8-2.html
作者博客:Savorboard
本文原创授权为:署名 - 非商业性使用 - 禁止演绎,协议普通文本 | 协议法律文本