MQ系列14:MQ如何做到消息延时处理

mq,系列,如何,做到,消息,延时,处理 · 浏览次数 : 389

小编点评

**MQ系列1:消息中间件执行原理** MQ是一个消息中间件,它可以用于将消息从一个消息队列传递到另一个消息队列中。MQ可以从多个消息队列中从一个消息队列中获取消息,并将这些消息发送到另一个消息队列中。 **MQ系列2:消息中间件的技术选型** RocketMQ 是一个用于消息中间件的技术选型,它提供以下功能: * 支持多种消息类型的支持,包括文本、字节、对象和队列。 * 可配置的延时处理机制,可以用来处理消息的延时处理。 * 支持多个消费者从一个主题上消费消息。 * 提供配置参数的选项,可以用来调整MQ的性能和可靠性。 **MQ系列3:RocketMQ 架构分析** RocketMQ 是一个基于 Java 的消息中间件框架,它使用 Java 编程语言开发。RocketMQ 的架构可以分为以下几个组件: * **消息存储器**:用于存储和发送消息。 * **消息路由器**:用于路由消息到不同的消费者。 * **消费者**:用于从消息队列中获取消息并处理它们。 **MQ系列4:NameServer 原理解析** NameServer 是一个消息中间件的管理组件,它负责配置和管理消息存储器,以及提供消息路由器的地址。NameServer 也是消息存储器和消费者之间通信的中介。 **MQ系列5:RocketMQ 消息的发送模式** 在发送消息之前,生产者可以使用以下方法设置消息的延时级别: * **`delayTimeLevel` 属性**:指定消息在发送之前等待的延迟时间。 * **`sendTimeout` 属性**:指定发送消息的最大等待时间。 **MQ系列6:消息的消费** 在消费者获取消息之前,它会检查消息的延迟级别。如果消息的延迟级别小于 `sendTimeout`,消息会被立即消费。如果消息的延迟级别大于 `sendTimeout`,消息会被延迟消费。 **MQ系列7:消息通信,追求极致性能** MQ 提供以下方法以提高消息的处理性能: * **使用多个消费者**:多个消费者可以从一个主题上并发消费消息。 * **调整消息存储器的缓存策略**:缓存可以减少消息的处理时间。 * **优化消息路由**:使用正确的路由规则可以减少消息的延迟。 **MQ系列8:数据存储,消息队列的高可用保障** MQ 支持以下数据存储方式: * **内存队列**:内存队列是一种仅存在内存中的队列。 * **文件队列**:文件队列是一种将消息写入文件中并读取文件的队列。 * **消息存储器**:消息存储器是一种提供持久化的消息存储。 **MQ系列9:高可用架构分析** MQ 使用以下技术实现高可用架构: * **容错机制**:MQ 可以容错处理消息的丢失,并重发这些消息。 * **负载均衡**:MQ 可以均衡处理消息,以减少服务器负载。 * **消息持久化**:MQ 可以持久化消息,以便在服务器故障期间仍然可用。 **MQ系列10:总结** MQ是一个功能强大的消息中间件,它可以用于各种应用程序的消息传递。RocketMQ 是一个优秀的消息中间件,它提供了多种功能和选项,可以满足许多实际应用场景的需求。

正文

MQ系列1:消息中间件执行原理
MQ系列2:消息中间件的技术选型
MQ系列3:RocketMQ 架构分析
MQ系列4:NameServer 原理解析
MQ系列5:RocketMQ消息的发送模式
MQ系列6:消息的消费
MQ系列7:消息通信,追求极致性能
MQ系列8:数据存储,消息队列的高可用保障
MQ系列9:高可用架构分析
MQ系列10:如何保证消息幂等性消费
MQ系列11:如何保证消息可靠性传输
MQ系列12:如何保证消息顺序性
MQ系列13:消息大量堆积如何解决

1 背景

在互联网业务的实际应用场景中,消息的延时处理是非常必要的。例如,在金融交易系统中,某些交易的确认可能需要一段时间才能完成。又如,在物流跟踪系统中,货物的运输状态需要一段时间才能更新。而MQ作为中间件的角色专门来处理消息媒介,实际也具备了使用消息的延时处理来保证信息的及时性的能力。
这边举两个具体的例子:

  • 火车票订购,提交了订单就把车票给占位了,这时候可以发送一个延时确认的消息,15m 未付款,就要把该车票释放,这样其他人就可以购买了。
  • 购买电影票,可以发送一个核销检查消息,在电影开场前15分钟就无法退票了。

既然消息延迟处理的使用场景这么常见,那我们就要详细来分析下怎么使用MQ来实现,这边以RocketMQ为技术选型。

2 消息延时处理原理

RocketMQ的消息延时处理是通过预定义的消息延时级别和延时队列来实现的。在发送消息时,生产者可以设置一个延时级别,该消息将会被延迟一段时间后才能被消费者消费。RocketMQ默认提供了18个延时级别,每个级别对应不同的延迟时间。

所以延时时间并不是随意指定的,Rocket源码中指定的18种等级如下:

// org/apache/rocketmq/store/config/MessageStoreConfig.java 的第198行
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
  • RocketMq不支持任意时间延时,需设置固定的延时等级,从1s到2h分别对应着等级1到18
  • 可以使用setDelayTimeLevel(int level) 方法设置延时等级,level 从 0 开始

在RocketMQ中,每个Broker都设置了一个延时队列,用于存储延时消息。当消息的延时时间到达时,该消息将会被自动转移到普通的消息队列中,等待消费者的消费。这种方式可以有效地避免因为网络延迟或者消费者处理速度慢而导致消息的延迟。

image

3 消息延时处理实战

使用RocketMQ的消息延时处理非常简单。在发送消息时,生产者只需要设置一个延时级别,然后将消息发送到RocketMQ即可。例如:

public class DelayProducerApplication {
    public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException , UnsupportedEncodingException {
        // 1、创建生产者producer,并指定生产者组名为 example_group_name
        DefaultMQProducer producer = new DefaultMQProducer("example_group_name");  
        // 2、指定NameServer的地址,以获取Broker路由地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 3、启动生产者producer
        producer.start();
        // 4、创建消息,并指定Topic,Tag和消息体
        Message msg = new Message("example_topic","example_key", "试一试延迟30s发送的消息".getBytes("UTF-8"));
        // 5、设置延时等级3,对应30s,所以这个消息在30秒之后发送
        msg.setDelayTimeLevel(3);
        // 6、发送消息到一个Broker
        SendResult sendResult = producer.send(msg);
        // 7、通过sendResult返回消息是否成功送达
        System.out.printf("%s%n", sendResult);
        // 8、如果不再发送消息,关闭生产者Producer
        producer.shutdown();
    }
}

image

在上述代码中,我们首先创建了一个生产者,然后指定了NameServer的地址,并启动了生产者。接着,我们创建了一个延时级别为3的消息,即该消息将会被延迟30秒后才能发送并被消费者消费。最后,我们发送了该消息,并关闭了生产者。

4 消息延时的优化

虽然RocketMQ的消息延时处理功能已经非常强大,但是在实际应用中,我们可能还需要根据自己的业务需求进行一些优化。以下是一些可能的优化方式:

  • 调整延时队列的大小。在RocketMQ中,每个Broker都只有一个延时队列,队列太小可能导致一些延时消息被miss。可以根据实际需求调整延时队列的大小。
  • 使用多个消费者来消费同一主题的消息。在RocketMQ中,可能有批量执行被设置了同样的延迟时间,这个就存在了一些风险,类似缓存的批量过期一样,稍有不慎,可能会击穿数据库。如果只有一个消费者来消费该主题的消息,可能会导致该消费者的处理速度不够快,从而影响到消息的及时性。我们可以根据实际需求增加消费者数量,以提高消息的处理速度。
  • 调整RocketMQ的配置参数。RocketMQ提供了一些配置参数,可以用来调整其性能和可靠性。我们可以根据实际需求调整这些参数,以优化消息的延时处理效果。

总之,RocketMQ的消息延时处理功能非常强大,可以满足许多实际应用场景的需求。在实际应用中,我们可以根据自己的业务需求进行一些优化,以进一步提高消息的及时性和可靠性。

5 总结

本文我们介绍了RocketMQ如何使用消息延时来处理特殊的业务场景。除了上述的方法之外,我们还有一些其他方法,比如:

  • 定时发送消息。在定时发送中,生产者可以指定一个未来的时间戳,在该时间戳到达时,该消息将会被发送到Broker。RocketMQ内部会维护一个定时任务,每隔一段时间检查一次待定时发送的消息,并判断是否到达了指定的时间戳。如果到达了指定的时间戳,该消息将会被发送到Broker。
  • 自建环形队列来实现“延时消息” ,参考这篇:1分钟实现“延迟消息”功能,写的不错

与MQ系列14:MQ如何做到消息延时处理相似的内容:

MQ系列14:MQ如何做到消息延时处理

[MQ系列1:消息中间件执行原理](https://www.cnblogs.com/wzh2010/p/15888498.html "MQ系列1:消息中间件执行原理") [MQ系列2:消息中间件的技术选型](https://www.cnblogs.com/wzh2010/p/15311174.htm

MQ系列7:消息通信,追求极致性能

MQ系列1:消息中间件执行原理 MQ系列2:消息中间件的技术选型 MQ系列3:RocketMQ 架构分析 MQ系列4:NameServer 原理解析 MQ系列5:RocketMQ消息的发送模式 MQ系列6:消息的消费 1 介绍 前面的章节我学习了 NameServer的原理,消息的生产发送,以及消息

MQ系列8:数据存储,消息队列的高可用保障

MQ系列1:消息中间件执行原理 MQ系列2:消息中间件的技术选型 MQ系列3:RocketMQ 架构分析 MQ系列4:NameServer 原理解析 MQ系列5:RocketMQ消息的发送模式 MQ系列6:消息的消费 MQ系列7:消息通信,追求极致性能 1 介绍 在之前的章节中,我们介绍了消息的发送

MQ系列9:高可用架构分析

MQ系列1:消息中间件执行原理 MQ系列2:消息中间件的技术选型 MQ系列3:RocketMQ 架构分析 MQ系列4:NameServer 原理解析 MQ系列5:RocketMQ消息的发送模式 MQ系列6:消息的消费 MQ系列7:消息通信,追求极致性能 MQ系列8:数据存储,消息队列的高可用保障 1

MQ系列10:如何保证消息幂等性消费

MQ系列1:消息中间件执行原理 MQ系列2:消息中间件的技术选型 MQ系列3:RocketMQ 架构分析 MQ系列4:NameServer 原理解析 MQ系列5:RocketMQ消息的发送模式 MQ系列6:消息的消费 MQ系列7:消息通信,追求极致性能 MQ系列8:数据存储,消息队列的高可用保障 M

MQ系列11:如何保证消息可靠性传输(除夕奉上)

MQ系列1:消息中间件执行原理 MQ系列2:消息中间件的技术选型 MQ系列3:RocketMQ 架构分析 MQ系列4:NameServer 原理解析 MQ系列5:RocketMQ消息的发送模式 MQ系列6:消息的消费 MQ系列7:消息通信,追求极致性能 MQ系列8:数据存储,消息队列的高可用保障 M

MQ系列12:如何保证消息顺序性

[MQ系列1:消息中间件执行原理](https://www.cnblogs.com/wzh2010/p/15888498.html "MQ系列1:消息中间件执行原理") [MQ系列2:消息中间件的技术选型](https://www.cnblogs.com/wzh2010/p/15311174.htm

MQ系列13:消息大量堆积如何为解决

[MQ系列1:消息中间件执行原理](https://www.cnblogs.com/wzh2010/p/15888498.html "MQ系列1:消息中间件执行原理") [MQ系列2:消息中间件的技术选型](https://www.cnblogs.com/wzh2010/p/15311174.htm

MQ 消息队列 比较

为什么需要消息队列 削峰 业务系统在超高并发场景中,由于后端服务来不及同步处理过多、过快的请求,可能导致请求堵塞,严重时可能由于高负荷拖垮Web服务器。 为了能支持最高峰流量,我们通常采取短平快的方式——直接扩容服务器,增加服务端的吞吐量。 优点是显而易见的,短时间内吞吐量增加了好几倍,甚至数十倍。

MQ消息积压,把我整吐血了

前言 我之前在一家餐饮公司待过两年,每天中午和晚上用餐高峰期,系统的并发量不容小觑。 为了保险起见,公司规定各部门都要在吃饭的时间轮流值班,防止出现线上问题时能够及时处理。 我当时在后厨显示系统团队,该系统属于订单的下游业务。 用户点完菜下单后,订单系统会通过发kafka消息给我们系统,系统读取消息