在Biwen.QuickApi中整合一个极简的发布订阅(事件总线)

biwen,quickapi · 浏览次数 : 0

小编点评

## Biwen.QuickApi 中的事件总线示例 这篇代码展示了如何在 Biwen.QuickApi 中实现一个简单的事件总线。 **事件定义接口:** ```csharp public interface IEvent{} ``` **事件订阅者接口:** ```csharp public interface IEventSubscriber<T> where T : IEvent { Task HandleAsync(T @event, CancellationToken ct); int Order { get; } bool ThrowIfError { get; } } ``` **发布者:** ```csharp public class Publisher(IServiceProvider serviceProvider) { public async Task PublishAsync(T @event, CancellationToken cancellationToken) where T : IEvent { // 注册事件订阅者 var handlers = serviceProvider.GetServices>(); if (handlers is null) return; foreach (var handler in handlers.OrderBy(x => x.Order)) { try { await handler.HandleAsync(@event, ct); } catch (Exception e) { if (handler.ThrowIfError) throw; } } } } ``` **核心代码:** ```csharp public class MyEvent : BaseRequest & IEvent { [FromQuery] public string? Message { get; set; } } public class MyEventHandler : EventSubscriber { private readonly ILogger _logger; public MyEventHandler(ILogger logger) { _logger = logger; } public override async Task HandleAsync(MyEvent @event, CancellationToken ct) { _logger.LogInformation($"msg 2 : {event.Message}"); return Task.CompletedTask; } } ``` **测试代码:** ```csharp [QuickApi("event")] public class EventApi : BaseQuickApi { public override async ValueTask ExecuteAsync(MyEvent request) { // 发布事件 await PublishAsync(request); return IResultResponse.Content("send event"); } } ``` **运行结果:** 通过执行 `curl` 命令,我们可以发送一个事件并查看服务器日志,验证事件已成功发布并处理。 **注意:** * 代码中使用了 `ILogger` 进行日志记录。你可以根据实际需求选择不同的日志框架。 * 代码中使用了 `Order` 和 `ThrowIfError` 属性,您可以根据需要添加或移除这些属性。

正文

闲来无聊在我的Biwen.QuickApi中实现一下极简的事件总线,其实代码还是蛮简单的,对于初学者可能有些帮助 就贴出来,有什么不足的地方也欢迎板砖交流~

首先定义一个事件约定的空接口

    public interface IEvent{}

然后定义事件订阅者接口

public interface IEventSubscriber<T> where T : IEvent
    {
        Task HandleAsync(T @event, CancellationToken ct);
        /// <summary>
        /// 执行排序
        /// </summary>
        int Order { get; }

        /// <summary>
        /// 如果发生错误是否抛出异常,将阻塞后续Handler
        /// </summary>
        bool ThrowIfError { get; }
    }
    public abstract class EventSubscriber<T> : IEventSubscriber<T> where T : IEvent
    {
        public abstract Task HandleAsync(T @event, CancellationToken ct);
        public virtual int Order => 0;
        /// <summary>
        /// 默认不抛出异常
        /// </summary>
        public virtual bool ThrowIfError => false;
    }

接着就是发布者


internal class Publisher(IServiceProvider serviceProvider)
{
	public async Task PublishAsync<T>(T @event, CancellationToken ct) where T : IEvent
	{
		var handlers = serviceProvider.GetServices<IEventSubscriber<T>>();
		if (handlers is null) return;
		foreach (var handler in handlers.OrderBy(x => x.Order))
		{
			try
			{
				await handler.HandleAsync(@event, ct);
			}
			catch
			{
				if (handler.ThrowIfError)
				{
					throw;
				}
				//todo:
			}
		}
	}
}

到此发布订阅的基本代码也就写完了.接下来就是注册发布者和所有的订阅者了

核心代码如下:

        static readonly Type InterfaceEventSubscriber = typeof(IEventSubscriber<>);
        static readonly object _lock = new();//锁
        static bool IsToGenericInterface(this Type type, Type baseInterface)
        {
            if (type == null) return false;
            if (baseInterface == null) return false;
            return type.GetInterfaces().Any(x => x.IsGenericType && x.GetGenericTypeDefinition() == baseInterface);
        }
        static IEnumerable<Type> _eventHanlers = null!;
        static IEnumerable<Type> EventHandlers
        {
            get
            {
                lock (_lock)
                    return _eventHanlers ??= ASS.InAllRequiredAssemblies.Where(x =>
                    !x.IsAbstract && x.IsPublic && x.IsClass && x.IsToGenericInterface(InterfaceEventSubscriber));
            }
        }
		    //注册EventSubscribers
            foreach (var handlerType in EventHandlers)
            {
                var baseType = handlerType.GetInterfaces().First(x => x.IsGenericType && x.GetGenericTypeDefinition() == InterfaceEventSubscriber);
                services.AddScoped(baseType, handlerType);
            }
            //注册Publisher
            services.AddScoped<Publisher>();
		

至此发布订阅的代码也就完成了!
现在我们将发布订阅封装到QuickApi中使用:


internal interface IPublisher
{
	/// <summary>
	/// Event Publish
	/// </summary>
	/// <typeparam name="T"></typeparam>
	/// <param name="event">Event</param>
	/// <returns></returns>
	Task PublishAsync<T>(T @event, CancellationToken cancellationToken) where T : IEvent;
}

然后BaseQuickApi实现IPublisher接口


internal interface IQuickApi<Req, Rsp> : IHandlerBuilder, IQuickApiMiddlewareHandler, IAntiforgeryApi, IPublisher
{
    ValueTask<Rsp> ExecuteAsync(Req request);
}

// BaseQuickApi.PublishAsync
public virtual async Task PublishAsync<T>(T @event, CancellationToken cancellationToken = default) where T : IEvent
{
    using var scope = ServiceRegistration.ServiceProvider.CreateScope();
    var publisher = scope.ServiceProvider.GetRequiredService<Publisher>();
    await publisher.PublishAsync(@event, cancellationToken);
}

至此功能完成,接下来我们测试一下:


using Biwen.QuickApi.Events;
using Microsoft.AspNetCore.Mvc;

namespace Biwen.QuickApi.DemoWeb.Apis
{
    public class MyEvent : BaseRequest<MyEvent>,IEvent
    {
        [FromQuery]
        public string? Message { get; set; }
    }

    public class MyEventHandler : EventSubscriber<MyEvent>
    {
        private readonly ILogger<MyEventHandler> _logger;
        public MyEventHandler(ILogger<MyEventHandler> logger)
        {
            _logger = logger;
        }

        public override Task HandleAsync(MyEvent @event, CancellationToken ct)
        {
            _logger.LogInformation($"msg 2 : {@event.Message}");
            return Task.CompletedTask;
        }
    }

    /// <summary>
    /// 更早执行的Handler
    /// </summary>
    public class MyEventHandler2 : EventSubscriber<MyEvent>
    {
        private readonly ILogger<MyEventHandler2> _logger;
        public MyEventHandler2(ILogger<MyEventHandler2> logger)
        {
            _logger = logger;
        }

        public override Task HandleAsync(MyEvent @event, CancellationToken ct)
        {
            _logger.LogInformation($"msg 1 : {@event.Message}");
            return Task.CompletedTask;
        }

        public override int Order => -1;

    }

    /// <summary>
    /// 抛出异常的Handler
    /// </summary>
    public class MyEventHandler3 : EventSubscriber<MyEvent>
    {
        private readonly ILogger<MyEventHandler3> _logger;
        public MyEventHandler3(ILogger<MyEventHandler3> logger)
        {
            _logger = logger;
        }

        public override Task HandleAsync(MyEvent @event, CancellationToken ct)
        {
            throw new Exception("error");
        }

        public override int Order => -2;

        public override bool ThrowIfError => false;

    }

    [QuickApi("event")]
    public class EventApi : BaseQuickApi<MyEvent>
    {
        public override async ValueTask<IResultResponse> ExecuteAsync(MyEvent request)
        {
            //publish
            await PublishAsync(request);
            return IResultResponse.Content("send event");
        }
    }
}

最后我们运行项目测试一下功能:

curl -X 'GET' \
  'http://localhost:5101/quick/event?Message=hello%20world' \
  -H 'accept: */*'

image

源代码我发布到了GitHub,欢迎star! https://github.com/vipwan/Biwen.QuickApi

与在Biwen.QuickApi中整合一个极简的发布订阅(事件总线)相似的内容:

在Biwen.QuickApi中整合一个极简的发布订阅(事件总线)

闲来无聊在我的Biwen.QuickApi中实现一下极简的事件总线,其实代码还是蛮简单的,对于初学者可能有些帮助 就贴出来,有什么不足的地方也欢迎板砖交流~ 首先定义一个事件约定的空接口 public interface IEvent{} 然后定义事件订阅者接口 public interface I

Biwen.Settings添加对IConfiguration&IOptions的集成支持

Biwen.Settings 是一个简易的配置项管理模块,主要的作用就是可以校验并持久化配置项,比如将自己的配置存储到数据库中,JSON文件中等 使用上也是很简单,只需要在服务中注入配置, 比如我们有一个GithubSetting的配置项,我们只需要定义好对象然后注入到Service中即可: [De

深入理解 Vue 3 组件通信

在 Vue 3 中,组件通信是一个关键的概念,它允许我们在组件之间传递数据和事件。本文将介绍几种常见的 Vue 3 组件通信方法,包括 props、emits、provide 和 inject、事件总线以及 Vuex 状态管理。 1. 使用 props 和 emits 进行父子组件通信 props

WPF使用AppBar实现窗口停靠,适配缩放、全屏响应和多窗口并列(附封装好即开即用的附加属性)

在吕毅大佬的文章中已经详细介绍了什么是AppBar: WPF 使用 AppBar 将窗口停靠在桌面上,让其他程序不占用此窗口的空间(附我封装的附加属性) - walterlv 即让窗口固定在屏幕某一边,并且保证其他窗口最大化后不会覆盖AppBar占据的区域(类似于Windows任务栏)。 但是在我的

强烈推荐:18.3k star,推荐一款简单易用的HTTP请求流量录制回放工具:Goreplay

在软件开发和测试过程中,我们经常需要对应用程序的网络请求进行录制和回放,以便进行性能分析、压力测试或者模拟复杂的网络环境。今天,我要向大家推荐一款简单易用的 HTTP 请求流量录制回放工具:Goreplay。 1、简介 Goreplay 是一款用 Go 语言编写的 HTTP 请求流量录制回放工具。它

吐血整理如何在Google Earth Engine上写循环 五个代码实例详细拆解

在这里同步一篇本人的原创文章。原文发布于2023年发布在知乎专栏,转移过来时略有修改。全文共计3万余字,希望帮助到GEE小白快速进阶。 引言 这篇文章主要解答GEE中.map()和.iterate()函数的用法。 首先解答一个疑问,为什么需要自己写循环?确实,GEE 为各种数据类型提供了无数常用的内

企业级环境部署:在 Linux 服务器上如何搭建和部署 Python 环境?

在大部分企业里,自动化测试框架落地都肯定会集成到Jenkins服务器上做持续集成测试,自动构建以及发送结果到邮箱,实现真正的无人值守测试。 不过Jenkins搭建一般都会部署在公司的服务器上,不会在私人电脑里,而服务器大部分都是Linux操作系统的。所以,我们如果要在Linux上的Jenkins服务

manim边学边做--Matrix

在代数问题中,矩阵是必不可少的工具,manim中提供了一套展示矩阵(Matrix)的模块,专门用于在动画中显示矩阵格式的数据。关于矩阵的类主要有4个: Matrix:通用的矩阵 IntegerMatrix:元素是整数的矩阵 DecimalMatrix:元素包含小数的矩阵 MobjectMatrix:

OceanBase 金融项目优化案例(union all 改写)

在工单系统上看到有一条sql问题还没解决,工单描述看到压测场景被cpu资源被这条sql打爆,目前影响到项目进度,比较紧急。 直接联系这位同学看看是否需要帮忙。 慢SQL: SELECT task.*, sc01.aab300 bjsjjg, (SELECT sc05.bsc012 FROM sc05

iOS开发基础109-网络安全

在iOS开发中,保障应用的网络安全是一个非常重要的环节。以下是一些常见的网络安全措施及对应的示例代码: Swift版 1. 使用HTTPS 确保所有的网络请求使用HTTPS协议,以加密数据传输,防止中间人攻击。 示例代码: 在Info.plist中配置App Transport Security (