聊聊MassTransit——Consumer Saga(译)

聊聊,masstransit,consumer,saga · 浏览次数 : 20

小编点评

**Consumer Saga 的定义** ```csharp public record ConsumerSaga : ISaga, InitiatedBy { // ... } ``` * **CorrelationId** 属性用于在不同的 Saga 中关联同一实例。 * **SubmitOrder** 接口用于定义由 SubmitOrder 消息发起的 Saga。 * **OrderSaga** 类实现 **InitiatedBy** 接口并执行 **Consume** 方法。 * **Consume** 方法使用 **CorrelationId** 属性来定位已存在的 Saga 实例,并更新状态。 * **Orchestrates** 接口用于定义需要在 Saga 接收消息时执行的行为。 **其他接口和方法** * **OrderAccepted** 和 **OrderInvoiced** 接口用于定义 Saga 处理订单状态的变更。 * **OrderShipped** 接口用于定义 Saga 处理订单发货状态的变更。 * **CorrelationExpression** 用于定义由未实现 CorrelatedBy 接口的 Saga 的观察条件。

正文

原文地址:Consumer Sagas

consumer saga是一个由CorrelationId标识的类,它定义了由saga repository持久化的状态。除了状态之外,还可以向saga类添加接口,定义由saga处理的事件。这种状态和行为在单个类中的组合就是一个consumer saga。在下面的示例中,定义了由SubmitOrder消息发起的order saga。

Interfaces

InitiatedBy

public record SubmitOrder :
    CorrelatedBy<Guid>
{
    public Guid CorrelationId { get; init; }
    public DateTime OrderDate { get; init; }
}

public class OrderSaga :
    ISaga,
    InitiatedBy<SubmitOrder>
{
    public Guid CorrelationId { get; set; }

    public DateTime? SubmitDate { get; set; }
    public DateTime? AcceptDate { get; set; }

    public async Task Consume(ConsumeContext<SubmitOrder> context)
    {
        SubmitDate = context.Message.OrderDate;
    }
}

当Saga的接收端点接收到SubmitOrder消息时,将使用CorrelationId属性来确定是否存在具有该CorrelationId的现有Saga实例。如果没有找到现有的实例,repository将创建一个新的saga实例,并在新实例上调用Consume方法。在Consume方法完成之后,repository保存新创建的实例。

Orchestrates

要定义由现有的saga实例(如OrderAccepted)编排的事件,需要指定一个额外的接口和方法。

public record OrderAccepted :
    CorrelatedBy<Guid>
{
    public Guid CorrelationId { get; init; }
    public DateTime Timestamp { get; init; }
}

public class OrderSaga :
    ISaga,
    InitiatedBy<SubmitOrder>,
    Orchestrates<OrderAccepted>,
{
    public Guid CorrelationId { get; set; }

    public DateTime? SubmitDate { get; set; }
    public DateTime? AcceptDate { get; set; }

    public async Task Consume(ConsumeContext<SubmitOrder> context) {...}

    public async Task Consume(ConsumeContext<OrderAccepted> context)
    {
        AcceptDate = context.Message.Timestamp;
    }
}

InitiatedByOrOrchestrates

要定义一个可以启动一个新的或编排一个现有的saga实例(如OrderInvoiced)的事件,需要指定一个额外的接口和方法。

public record OrderInvoiced :
    CorrelatedBy<Guid>
{
    public Guid CorrelationId { get; init; }
    public DateTime Timestamp { get; init; }
    public decimal Amount { get; init; }
}

public class OrderPaymentSaga :
    ISaga,
    InitiatedByOrOrchestrates<OrderInvoiced>
{
    public Guid CorrelationId { get; set; }

    public DateTime? InvoiceDate { get; set; }
    public decimal? Amount { get; set; }

    public async Task Consume(ConsumeContext<OrderInvoiced> context)
    {
        InvoiceDate = context.Message.Timestamp;
        Amount = context.Message.Amount;
    }
}

Observes

要定义由未实现CorrelatedBy接口(如OrderShipped)的现有saga实例观察到的事件,需要指定一个额外的接口和方法。

public record OrderShipped
{
    public Guid OrderId { get; init; }
    public DateTime ShipDate { get; init; }
}

public class OrderSaga :
    ISaga,
    InitiatedBy<SubmitOrder>,
    Orchestrates<OrderAccepted>,
    Observes<OrderShipped, OrderSaga>
{
    public Guid CorrelationId { get; set; }

    public DateTime? SubmitDate { get; set; }
    public DateTime? AcceptDate { get; set; }
    public DateTime? ShipDate { get; set; }

    public async Task Consume(ConsumeContext<SubmitOrder> context) {...}
    public async Task Consume(ConsumeContext<OrderAccepted> context) {...}

    public async Task Consume(ConsumeContext<OrderShipped> context)
    {
        ShipDate = context.Message.ShipDate;
    }

    public Expression<Func<OrderSaga, OrderShipped, bool>> CorrelationExpression =>
        (saga,message) => saga.CorrelationId == message.OrderId;
}

Configuration

要在配置MassTransit时添加一个saga,请使用如下所示的AddSaga方法

services.AddMassTransit(x =>
{
    x.AddSaga<OrderSaga>()
        .InMemoryRepository();
});

与聊聊MassTransit——Consumer Saga(译)相似的内容:

聊聊MassTransit——Consumer Saga(译)

原文地址:[Consumer Sagas](https://masstransit.io/documentation/patterns/saga/consumer-sagas "Consumer Sagas") consumer saga是一个由CorrelationId标识的类,它定义了由saga

聊聊MassTransit——状态机实现Saga模式(译)

翻译自 [Saga State Machines](https://masstransit.io/documentation/patterns/saga "Saga State Machines") ### Saga State Machines(状态机) > Saga State Machines

聊聊MassTransit——实现Saga模式概览(译)

原文地址:[Saga Overview](https://masstransit.io/documentation/patterns/saga "Saga Overview") ### 系列地址 - [聊聊MassTransit——状态机实现Saga模式(译)](https://www.cnblog

聊聊GLM-4-9B开源模型的微调loss计算

概述 Github官方地址:GLM-4 网上已经有很多关于微调的文章,介绍各种方式下的使用,这里不会赘述。我个人比较关心的是微调时的loss计算逻辑,这点在很多的文章都不会有相关的描述,因为大多数人都是关心如何使用之类的应用层,而不是其具体的底层逻辑,当然咱也说不清太底层的计算。 可了解其它loss

聊聊一个差点被放弃的项目以及近期的开源计划

前言 自从 StarBlog 和 SiteDirectory 之后,我还没写新的关于开源项目的系列,最近又积累了很多想法,正好写一篇博客来总结一下。 关于差点被放弃的项目,就是最近一直在做的单点认证(IdentityServerLite) IdentityServerLite 开发这个项目的起因,是

聊聊 JSON Web Token (JWT) 和 jwcrypto 的使用

哈喽大家好,我是咸鱼。 最近写的一个 Python 项目用到了 jwcrypto 这个库,这个库是专门用来处理 JWT 的,JWT 全称是 JSON Web Token ,JSON 格式的 Token。 今天就来简单入门一下 JWT。 官方介绍:https://jwt.io/introduction

聊聊MySQL是如何处理排序的

在MySQL的查询中常常会用到 order by 和 group by 这两个关键字,它们的相同点是都会对字段进行排序,那查询语句中的排序是如何实现的呢?

聊聊 Linux iowait

哈喽大家好,我是咸鱼。 我们在使用 top 命令来查看 Linux 系统整体 CPU 使用情况的时候,往往看的是下面这一列: %Cpu(s): 0.0 us, 0.0 sy, 0.0 ni,100.0 id, 68.0 wa, 0.0 hi, 0.0 si, 0.0 st 其中,man 手册解释 w

聊聊Mybatis框架原理

好久没有写博客了。最近工作中封装了一个类似ORM框架的东西。大概的原理就是将Excel数据初始化到本地sqlite数据库后,通过json配置文件,对数据库的数据做增删改查等操作。 其实大概的思考了下,就是半ORM框架mybatis的逻辑,只是我们自己封装的简陋蛮多。想想有现成的轮子没用,反而是自己写

聊聊Spring的工厂方法与FactoryBean

概述 工厂方法是比较常见,常用的一种设计模式。FactoryBean是Spring提供的一种Bean注入IOC容器的方式。 工厂方法 在做日常开发时,一般都会避免直接new对象,而且将new的操作丢给IOC容器,但对于第三方系统的集成,我们不太好直接丢给IOC容器,此时可以通过工厂模式, 提供一个工