设计模式之订阅发布模式

设计模式,订阅,发布,模式 · 浏览次数 : 280

小编点评

**简介** 发布者(发布者或主题)并不直接发送消息给订阅者,而是通过调度中心(或叫消息代理)来传递消息。 当一个发布者有新消息时,就将这个消息发布到调度中心。调度中心就会将这个消息通知给所有订阅者。 这是一种松耦合的设计模式,使得系统更容易扩展和维护。 **使用异步的事件监听发布类** 为了解决事件监听器同步处理的问题,可以使用异步的事件监听发布类。这种类可以通过设置默认事件监听发布类的taskExecutor属性来设置异步执行。 **代码示例** ```java // 使用 SimpleApplicationEventMulticaster 设置异步事件监听 SimpleApplicationEventMulticaster multicaster = SpringContextUtil.getBean(SimpleApplicationEventMulticaster.class); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(500), new CustomizableThreadFactory("newbee—event-task"), new ThreadPoolExecutor.CallerRunsPolicy()); multicaster.setTaskExecutor(threadPoolExecutor); ``` **其他注意事项** * 异步通知可能会阻塞线程,因此需要确保线程安全。 * 您可以使用线程池来创建多个线程来处理事件监听任务。 * 您可以使用回调函数或事件通知来处理事件。

正文

一、简介

订阅发布模式(Publish-Subscribe Pattern)是一种行之有效的解耦框架与业务逻辑的方式,也是一种常见的观察者设计模式,它被广泛应用于事件驱动架构中。

在这个模式中,发布者(或者说是主题)并不直接发送消息给订阅者,而是通过调度中心(或者叫消息代理)来传递消息。 发布者(或者说是主题)并不知道订阅者的存在,而订阅者也不知道发布者的存在。他们彼此唯一的关系就是在调度中心注册成为订阅者或者发布者。

当一个发布者有新消息时,就将这个消息发布到调度中心。调度中心就会将这个消息通知给所有订阅者。这就实现了发布者和订阅者之间的解耦,发布者和订阅者不再直接依赖于彼此,他们可以独立地扩展自己。

在具体的实现中,可以通过消息队列、事件总线等机制来实现调度中心,不同语言和平台都有实现的库和框架,例如 Java 中的 ActiveMQ、RabbitMQ、Kafka等。

订阅发布模式有以下优点:

  1. 性能好,发布者发送消息后直接返回不需要等待消费者处理完毕。
  2. 解耦性较强,发布者和订阅者之间不存在直接依赖,满足高内聚低耦合的设计思想。
  3. 可以支持一对多、多对多的消息通信模型,提供了更加灵活的消息传递方式。
  4. 可以动态地增加或删除发布者和订阅者,扩展性较好。

二、Java实现发布订阅模式

  1. 创建订阅者接口,用于接受消息通知。
interface Subscriber {
    void update(String message);
}
  1. 创建发布者,用于发布消息。实现了增加、删除和发布的功能,并且维护了一个订阅列表,
class Publisher {
    private Map<String, List<Subscriber>> subscribers = new HashMap<>();

    public void subscribe(String topic, Subscriber subscriber) {
        List<Subscriber> subscriberList = subscribers.get(topic);
        if (subscriberList == null) {
            subscriberList = new ArrayList<>();
            subscribers.put(topic, subscriberList);
        }
        subscriberList.add(subscriber);
    }

    public void unsubscribe(String topic, Subscriber subscriber) {
        List<Subscriber> subscriberList = subscribers.get(topic);
        if (subscriberList != null) {
            subscriberList.remove(subscriber);
        }
    }

    public void publish(String topic, String message) {
        List<Subscriber> subscriberList = subscribers.get(topic);
        if (subscriberList != null) {
            for (Subscriber subscriber : subscriberList) {
                subscriber.update(message);
            }
        }
    }
}
  1. 我们还实现了两个不同的 Subscriber 实现,一个是 EmailSubscriber,另一个是 SMSSubscriber,用于接受发布者的消息并将其分别发送到邮箱和手机上。
class EmailSubscriber implements Subscriber {
    private String email;

    public EmailSubscriber(String email) {
        this.email = email;
    }

    public void update(String message) {
        System.out.println("Send email to " + email + ": " + message);
    }
}

class SMSSubscriber implements Subscriber {
    private String phoneNumber;

    public SMSSubscriber(String phoneNumber) {
        this.phoneNumber = phoneNumber;
    }

    public void update(String message) {
        System.out.println("Send SMS to " + phoneNumber + ": " + message);
    }
}

  1. 在 Main 类中,我们创建了一个 Publisher 对象,并添加了两个 EmailSubscriber 和两个 SMSSubscriber,分别订阅了 news 主题的更新。我们先给这个主题发送一条消息,然后取消 news 主题的其中一个订阅者,最后我们再次给 news 主题发送一条消息。
public class Main {
    public static void main(String[] args) {
        Publisher publisher = new Publisher();

        Subscriber emailSubscriber1 = new EmailSubscriber("foo@example.com");
        Subscriber smsSubscriber1 = new SMSSubscriber("1234567890");

        publisher.subscribe("news", emailSubscriber1);
        publisher.subscribe("news", smsSubscriber1);

        publisher.publish("news", "发布新消息1");
        publisher.unsubscribe("news", smsSubscriber1);
        publisher.publish("news", "发布新消息2");
    }
}

打印输出如下:

Send email to foo@example.com: 发布新消息1
Send SMS to 1234567890: 发布新消息1
Send email to foo@example.com: 发布新消息2

三、Spring中自带的订阅发布模式

Spring的订阅发布模式是通过发布事件、事件监听器和事件发布器3个部分来完成的

这里我们通过 newbee-mall-pro 项目中已经实现订阅发布模式的下单流程给大家讲解,项目地址:https://github.com/wayn111/newbee-mall-pro

  1. 自定义订单发布事件,继承 ApplicationEvent
public class OrderEvent extends ApplicationEvent {
  void onApplicationEvent(Object event) {
    ...
  }
}
  1. 定义订单监听器,实现 ApplicationListener
@Component
public class OrderListener implements ApplicationListener<OrderEvent> {
    @Override
    public void onApplicationEvent(OrderEvent event) {
    // 生成订单、删除购物车、扣减库存
    ...
    }
}
  1. 下单流程,通过事件发布器 applicationEventPublisher 发布订单事件,然后再订单监听器中处理订单保存逻辑。
@Resource
private ApplicationEventPublisher applicationEventPublisher;

private void saveOrder(MallUserVO mallUserVO, Long couponUserId, List<ShopCatVO> shopcatVOList, String orderNo) {
    // 订单检查
    ...
    // 生成订单号
    String orderNo = NumberUtil.genOrderNo();
    // 发布订单事件,在事件监听中处理下单逻辑
    applicationEventPublisher.publishEvent(new OrderEvent(orderNo, mallUserVO, couponUserId, shopcatVOList));
    // 所有操作成功后,将订单号返回
    return orderNo;
    ...
}

通过事件监听机制,我们将下单逻辑拆分成如下步骤:

  1. 订单检查
  2. 生成订单号
  3. 发布订单事件,在事件监听中处理订单保存逻辑
  4. 所有操作成功后,将订单号返回
    每个步骤都是各自独立不互相影响

如上的代码已经实现了订阅发布模式,成功解耦了下单逻辑。但是在性能上还没有得到优化,因为 Spring Boot 项目中,默认情况下事件监听器是同步处理的,也就是说这里下单流程会等待事件监听器处理完毕才返回,最终影响接口响应时长。

四、使用异步的事件监听发布类

Spring Boot 项目中事件监听发布类是由 SimpleApplicationEventMulticaster 这个类实现的,源码中通知订阅者代码如下:

可以看到,代码里是有判断 getTaskExecutor() 方法返回不为空的话,就交由 executor 执行,负责同步执行。这个时候大家就要问了,这里不是有线程池在异步通知订阅者吗?

不急,博主带大家继续查看源码。

可以看到 getTaskExecutor() 方法返回一个成员属性,这个成员属性在 SimpleApplicationEventMulticaster 类中是通过 setTaskExecutor(@Nullable Executor taskExecutor) 方法设置的。我们通过 ctrl + f7 查一下 setTaskExecutor(...) 方法在哪里被调用过,

Ok,到此水落石出,SimpleApplicationEventMulticaster 类的 taskExecutor 成员属性一直为 null,所以在通过订阅者的时候一直是同步处理,等待订阅者处理完毕。


对于异步处理,我们可以从2个方面入手:

  1. 事件监听器入手,将事件监听器的事件触发方法改为异步执行,例如将生成订单、删除购物车、扣减库存逻辑放入线程池异步执行,或者是在订阅者的通知方法 onApplicationEvent 上加上@Async注解,表示该方法异步执行。
  2. 事件监听发布类入手,设置默认事件监听发布类的taskExecutor属性,通过源码可知,也可以解决。

这里博主给大家介绍下怎么修改事件监听发布类来解决。

/**
 * 系统启动时執行
 */
@Component
public class SpringBeanStartupRunner implements ApplicationRunner {

    @Override
    public void run(ApplicationArguments args) throws Exception {
        // 设置spring默认的事件监听为异步执行
        SimpleApplicationEventMulticaster multicaster = SpringContextUtil.getBean(SimpleApplicationEventMulticaster.class);
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                5,
                10,
                60L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(500),
                new CustomizableThreadFactory("newbee—event-task"),
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
        multicaster.setTaskExecutor(threadPoolExecutor);
    }
}

在系统启动时反射修改SimpleApplicationEventMulticaster类的taskExecutor属性,从而让SimpleApplicationEventMulticaster类支持异步事件通知。

总结

建议大家在日常开发中多加思考哪些业务流程可以适用,例如微服务项目中订单支付成功后需要通知用户、商品、活动等多个服务时,可以考虑使用订阅发布模式。解耦发布者和订阅者,发布者只管发布消息,不需要知道有哪些订阅者,也不需要知道订阅者的具体实现。订阅者只需要关注自己感兴趣的消息即可。这种松耦合的设计使得系统更容易扩展和维护。

关注公众号【waynblog】每周分享技术干货、开源项目、实战经验、高效开发工具等,您的关注将是我的更新动力!

与设计模式之订阅发布模式相似的内容:

设计模式之订阅发布模式

# 一、简介 订阅发布模式(Publish-Subscribe Pattern)是一种行之有效的解耦框架与业务逻辑的方式,也是一种常见的观察者设计模式,它被广泛应用于事件驱动架构中。 在这个模式中,发布者(或者说是主题)并不直接发送消息给订阅者,而是通过调度中心(或者叫消息代理)来传递消息。 发布者

软件设计模式系列之二十一——观察者模式

观察者模式(Observer Pattern)是一种行为型设计模式,它允许对象之间建立一对多的依赖关系,当一个对象的状态发生变化时,所有依赖于它的对象都会得到通知并自动更新。这个模式也被称为发布-订阅模式,因为它模拟了一个主题(发布者)与多个观察者(订阅者)之间的关系。观察者模式主要用于实现对象之间...

万字详解常用设计模式

本文是博主在工作中对常用设计模式的使用经验总结归纳而来分享给大家。 设计模式一共有23种,本文讲解涉及如下: 责任链模式 模板方法模式 发布订阅模式 策略模式 三大分类 业界一般将设计模式分为三大类: 创建型模式:对类的实例化过程进行了抽象,能够将软件模块中对象的创建和对象的使用分离。有五种创建型模

redux-saga原理

前言 工作中使用了redux-saga这个redux中间件,如果不明白内部原理使用起来会让人摸不着头脑,阅读源码后特意对其原理做下总结。 redux的特点 一个标准、管理应用副作用的redux中间件 实现切面编程方式 声明式的编写方式 订阅发布的设计模式 优点: 把异步操作转移到单独 saga文件中

设计模式:代理模式详解

需求场景 按着惯例,还是以一个应用场景作为代理模式的切入点。现在有一个订单系统,要求是:一旦订单被创建,只有订单的创建人才可以修改订单中的数据,其他人则不能修改。 基本实现思路 按着最直白的思路,就是查询数据库中订单的创建人和当前Session中的登录账号ID是否一致。 class Order {

设计模式之适配器模式(学习笔记)

定义 适配器模式是一种结构型设计模式,它允许将一个类的接口转换为客户端希望的另一个接口。适配器使得原本由于接口不兼容而不能一起工作的类可以协同工作。通过创建适配器类,可以将现有类的接口转换成目标接口,从而使这些类能够在一起工作。 为什么使用适配器模式 兼容性 适配器模式能够解决由于接口不兼容而无法直

设计模式之抽象工厂模式(学习笔记)

定义 抽象工厂模式是一种创建型设计模式,它提供一个接口,用于创建一系列相关或依赖的对象,而无需指定它们的具体类。抽象工厂模式将对象的创建过程抽象化,允许子类通过实现具体工厂类来定制对象的创建。 为什么使用抽象工厂模式 产品族的一致性 抽象工厂模式确保同一产品族中的对象之间的一致性。 部分遵循开闭原则

设计模式之工厂模式(学习笔记)

定义 工厂方法模式是一种创建型设计模式,它定义了一个用于创建对象的接口,但由子类来决定实例化哪一个类。工厂方法使得类的实例化延迟到子类,这样可以让客户端在不需要知道具体类的情况下创建对象。工厂方法模式通过使用继承和多态性,允许子类来控制对象的创建方式,能够更好地应对对象创建的复杂性和变化性。 为什么

设计模式之简单工厂模式(学习笔记)

定义 简单工厂模式(Simple Factory Pattern)是一种创建型设计模式,它定义一个用于创建对象的接口,但由一个单独的类来实现实际创建的工作。简单工厂模式通过在一个类中集中管理对象的创建过程,可以减少客户端与具体类之间的耦合,使得代码结构更加清晰和易于维护。通过专门定义一个类来负责创建

设计模式之装饰模式(学习笔记)

定义 装饰模式(Decorator Pattern),又称为包装模式,是一种结构型设计模式。它允许在不改变现有对象结构的情况下,动态地添加新的功能。通过将每个功能封装在单独的装饰器类中,并且这些装饰器类通过引用原始对象来实现功能的组合,从而提供了灵活性和可扩展性的优势。装饰模式避免了通过继承方式增加