原文地址:Consumer Sagas
consumer saga是一个由CorrelationId标识的类,它定义了由saga repository持久化的状态。除了状态之外,还可以向saga类添加接口,定义由saga处理的事件。这种状态和行为在单个类中的组合就是一个consumer saga。在下面的示例中,定义了由SubmitOrder消息发起的order saga。
public record SubmitOrder :
CorrelatedBy<Guid>
{
public Guid CorrelationId { get; init; }
public DateTime OrderDate { get; init; }
}
public class OrderSaga :
ISaga,
InitiatedBy<SubmitOrder>
{
public Guid CorrelationId { get; set; }
public DateTime? SubmitDate { get; set; }
public DateTime? AcceptDate { get; set; }
public async Task Consume(ConsumeContext<SubmitOrder> context)
{
SubmitDate = context.Message.OrderDate;
}
}
当Saga的接收端点接收到SubmitOrder消息时,将使用CorrelationId属性来确定是否存在具有该CorrelationId的现有Saga实例。如果没有找到现有的实例,repository将创建一个新的saga实例,并在新实例上调用Consume方法。在Consume方法完成之后,repository保存新创建的实例。
要定义由现有的saga实例(如OrderAccepted)编排的事件,需要指定一个额外的接口和方法。
public record OrderAccepted :
CorrelatedBy<Guid>
{
public Guid CorrelationId { get; init; }
public DateTime Timestamp { get; init; }
}
public class OrderSaga :
ISaga,
InitiatedBy<SubmitOrder>,
Orchestrates<OrderAccepted>,
{
public Guid CorrelationId { get; set; }
public DateTime? SubmitDate { get; set; }
public DateTime? AcceptDate { get; set; }
public async Task Consume(ConsumeContext<SubmitOrder> context) {...}
public async Task Consume(ConsumeContext<OrderAccepted> context)
{
AcceptDate = context.Message.Timestamp;
}
}
要定义一个可以启动一个新的或编排一个现有的saga实例(如OrderInvoiced)的事件,需要指定一个额外的接口和方法。
public record OrderInvoiced :
CorrelatedBy<Guid>
{
public Guid CorrelationId { get; init; }
public DateTime Timestamp { get; init; }
public decimal Amount { get; init; }
}
public class OrderPaymentSaga :
ISaga,
InitiatedByOrOrchestrates<OrderInvoiced>
{
public Guid CorrelationId { get; set; }
public DateTime? InvoiceDate { get; set; }
public decimal? Amount { get; set; }
public async Task Consume(ConsumeContext<OrderInvoiced> context)
{
InvoiceDate = context.Message.Timestamp;
Amount = context.Message.Amount;
}
}
要定义由未实现CorrelatedBy接口(如OrderShipped)的现有saga实例观察到的事件,需要指定一个额外的接口和方法。
public record OrderShipped
{
public Guid OrderId { get; init; }
public DateTime ShipDate { get; init; }
}
public class OrderSaga :
ISaga,
InitiatedBy<SubmitOrder>,
Orchestrates<OrderAccepted>,
Observes<OrderShipped, OrderSaga>
{
public Guid CorrelationId { get; set; }
public DateTime? SubmitDate { get; set; }
public DateTime? AcceptDate { get; set; }
public DateTime? ShipDate { get; set; }
public async Task Consume(ConsumeContext<SubmitOrder> context) {...}
public async Task Consume(ConsumeContext<OrderAccepted> context) {...}
public async Task Consume(ConsumeContext<OrderShipped> context)
{
ShipDate = context.Message.ShipDate;
}
public Expression<Func<OrderSaga, OrderShipped, bool>> CorrelationExpression =>
(saga,message) => saga.CorrelationId == message.OrderId;
}
要在配置MassTransit时添加一个saga,请使用如下所示的AddSaga方法
services.AddMassTransit(x =>
{
x.AddSaga<OrderSaga>()
.InMemoryRepository();
});