一种异步延迟队列的实现方式

一种,异步,延迟,队列,实现,方式 · 浏览次数 : 312

小编点评

**数据流转及流程图** * 使用 Redis 作为消息队列。 * 设置消息队列延迟。 * 通过事件消费线程进行异步处理。 * 使用异步处理来扩展消费能力。 * 通过配置文件判断是否开启消息队列。 **示例** ```java @Resource(name = "testQueue") private IEventQueue eventQueue; @ResponseBody @GetMapping(\"/api/v1/demo\") public String demo() { log.info("发送无延迟消息"); eventQueue.push("no delay 5000 millseconds message 3"); return "ok"; } @ResponseBody @GetMapping(\"/api/v1/demo1\") public String demo1() { log.info("发送延迟5秒消息"); eventQueue.push(" delay 5000 millseconds message,name", 1000 * 5L); return "ok"; } @ResponseBody @GetMapping(\"/api/v1/demo2\") public String demo2() { log.info("发送延迟到2022-04-02 00:00:00 执行的消息"); eventQueue.push(" delay message,name to 2022-04-02 00:00:00", new Date(1648828800000)); return "ok"; } ``` **其他** * 使用配置读取消息队列延迟。 * 通过异步处理来扩展消费能力。 * 使用 EventBus 等消息队列中间件进行消息处理。

正文

作者:京东零售 张路瑶

1.应用场景

目前系统中有很多需要用到延时处理的功能:支付超时取消、排队超时、短信、微信等提醒延迟发送、token刷新、会员卡过期等等。通过延时处理,极大的节省系统的资源,不必轮询数据库处理任务。

目前大部分功能通过定时任务完成,定时任务还分使用quartz及xxljob两种类型轮询时间短,每秒执行一次,对数据库造成一定的压力,并且会有1秒的误差。轮询时间久,如30分钟一次,03:01插入一条数据,正常3:31执行过期,但是3:30执行轮询时,扫描3:00-3:30的数据,是扫描不到3:31的数据的,需要4:00的时候才能扫描到,相当于多延迟了29分钟!

2.延时处理方式调研

1.DelayQueue

1.实现方式:

jvm提供的延迟阻塞队列,通过优先级队列对不同延迟时间任务进行排序,通过condition进行阻塞、睡眠dealy时间 获取延迟任务。

当有新任务加入时,会判断新任务是否是第一个待执行的任务,若是,会解除队列睡眠,防止新加入的元素时需要执行的元素而不能正常被执行线程获取到。

2.存在的问题:

1.单机运行,系统宕机后,无法进行有效的重试

2.没有执行记录和备份

3.没有重试机制

4.系统重启时,会将任务清空!

5.不能分片消费

3.优势:实现简单,无任务时阻塞,节省资源,执行时间准确

2.延迟队列mq

实现方式:依赖mq,通过设置延迟消费时间,达到延迟消费功能。像rabbitMq、jmq都可以设置延迟消费时间。RabbitMq通过将消息设置过期时间,放入死信队列进行消费实现。

存在的问题:

1.时间设置不灵活,每个queue是固定的到期时间,每次新创建延时队列,需要创建新的消息队列

优点:依靠jmq,可以有效的监控、消费记录、重试,具备多机同时消费能力,不惧怕宕机

3.定时任务

通过定时任务轮询符合条件的数据

缺点:

1.必须要读业务数据库,对数据库造成一定的压力,

2.存在延时

3.一次扫描数据量过大时,占用过多的系统资源。

4. 无法分片消费

优点:

1.消费失败后,下次还能继续消费,具备重试能力,

2.消费能力稳定

4.redis

任务存储在redis中,使用redis的 zset队列根据score进行排序,程序通过线程不断获取队列数据消费,实现延时队列

优点:

1、查询redis相比较数据库快,set队列长度过大,会根据跳表结构进行查询,效率高

2、redis可根据时间戳进行排序,只需要查询当前时间戳内的分数的任务即可

3、无惧机器重启

4、分布式消费

缺点:

1.受限于redis性能,并发10W

2.多个命令无法保证原子性,使用lua脚本会要求所有数据都在一个redis分片上。

5. 时间轮

通过时间轮实现的延迟任务执行,也是基于jvm单机运行,如kafka、netty都有实现时间轮,redisson的看门狗也是通过netty的时间轮实现的。

缺点:不适合分布式服务的使用,宕机后,会丢失任务。

3.实现目标

兼容目前在使用的异步事件组件,并提供更可靠,可重试、有记录、可监控报警、高性能的延迟组件。

•消息传输可靠性:消息进入到延迟队列后,保证至少被消费一次。

•Client支持丰富:支持多重语言。

•高可用性:支持多实例部署。挂掉一个实例后,还有后备实例继续提供服务。

•实时性:允许存在一定的时间误差。

•支持消息删除:业务使用方,可以随时删除指定消息。

•支持消费查询

•支持手动重试

•对当前异步事件的执行增加监控

4.架构设计

5.延迟组件实现方式

1.实现原理

目前选择使用jimdb通过zset实现延时功能,将任务id和对应的执行时间作为score存在在zset队列中,默认会按照score排序,每次取0-当前时间内的score的任务id,

发送延迟任务时,会根据时间戳+机器ip+queueName+sequence 生成唯一的id,构造消息体,加密后放入zset队列中。

通过搬运线程,将达到执行时间的任务移动到发布队列中,等待消费者获取。

监控方通过集成ump

消费记录通过redis备份+数据库持久化完成。

通过缓存实现的方式,只是实现的一种,可以通过参数控制使用哪一种实现方式,并可通过spi自由扩展。

2.消息结构

每个Job必须包含一下几个属性:

•Topic:Job类型,即QueueName

•Id:Job的唯一标识。用来检索和删除指定的Job信息。

•Delay:Job需要延迟的时间。单位:秒。(服务端会将其转换为绝对时间)

•Body:Job的内容,供消费者做具体的业务处理,以json格式存储。

•traceId:发送线程的traceId,待后续pfinder支持设置traceId后,可与发送线程公用同一个traceiD,便于日志追踪

具体结构如下图表示:

TTR的设计目的是为了保证消息传输的可靠性。

3.数据流转及流程图

基于redis-disruptor方式进行发布、消费,可以作为消息来进行使用,消费者采用原有异步事件的disruptor无锁队列消费,不同应用、不同queue之间无锁

1.支持应用只发布,不消费,达到消息队列的功能。

2:支持分桶,针对大key问题,若事件多,可以设置延迟队列和任务队列桶的数量,减小因大key造成的redis阻塞问题。

3: 通过ducc配置,进行性能的扩展,目前只支持开启消费和关闭消费。

4: 支持设置超时时间配置,防止消费线程执行过久

瓶颈: 消费速度慢,生产速度过快,会导致ringbuffer队列占满,当前应用既是生产者也是消费者时,生产者会休眠,性能取决于消费速度,可通过水平扩展机器,直接提升性能。监控redis队列的长度,若不断增长,可考虑增加消费者,直接提高性能。

可能出现的情况: 因一个应用公用一个disruptor,拥有64个消费者线程,如果某一个事件消费过慢,导致64个线程都在消费这个事件,会导致其他事件无消费线程消费,生产者线程也被阻塞,导致所有事件的消费都被阻塞。

后期观察是否有这个性能瓶颈,可给每一个queue一个消费者线程池。

6.demo示例

增加配置文件

判断是否开启jd.event.enable:true

<dependency> <groupId>com.jd.car</groupId>
 <artifactId>senna-event</artifactId>
 <version>1.0-SNAPSHOT</version> </dependency>

配置

jd:
senna:
event:
enable: true
queue:
retryEventQueue:
bucketNum: 1
handleBean: retryHandle

消费代码:

package com.jd.car.senna.admin.event;

import com.jd.car.senna.event.EventHandler;
import com.jd.car.senna.event.annotation.SennaEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
* @author zhangluyao
* @description
* @create 2022-02-21-9:54 下午
*/
@Slf4j
@Component("retryHandle")
public class RetryQueueEvent extends EventHandler {

@Override
protected void onHandle(String key, String eventType) {
log.info("Handler开始消费:{}", key);
}

@Override
protected void onDelayHandle(String key, String eventType) {
log.info("delayHandler开始消费:{}", key);
}
}

注解形式:

package com.jd.car.senna.admin.event;

import com.jd.car.senna.event.EventHandler;
import com.jd.car.senna.event.annotation.SennaEvent;
import lombok.extern.slf4j.Slf4j;

/**
* @author zhangluyao
* @description
* @create 2022-02-21-9:54 下午
*/
@Slf4j
@SennaEvent(queueName = "testQueue", bucketNum = 5,delayBucketNum = 5,delayEnable = true)
public class TestQueueEvent extends EventHandler {

@Override
protected void onHandle(String key, String eventType) {
log.info("Handler开始消费:{}", key);
}

@Override
protected void onDelayHandle(String key, String eventType) {
log.info("delayHandler开始消费:{}", key);
}
}

发送代码


package com.jd.car.senna.admin.controller;

import com.jd.car.senna.event.queue.IEventQueue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Lazy;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.concurrent.CompletableFuture;


/**
* @author zly
*/
@RestController
@Slf4j
public class DemoController {

@Lazy
@Resource(name = "testQueue")
private IEventQueue eventQueue;

@ResponseBody
@GetMapping("/api/v1/demo")
public String demo() {
log.info("发送无延迟消息");
eventQueue.push("no delay 5000 millseconds message 3");
return "ok";
}

@ResponseBody
@GetMapping("/api/v1/demo1")
public String demo1() {
log.info("发送延迟5秒消息");
eventQueue.push(" delay 5000 millseconds message,name",1000*5L);
return "ok";
}

@ResponseBody
@GetMapping("/api/v1/demo2")
public String demo2() {
log.info("发送延迟到2022-04-02 00:00:00执行的消息");
eventQueue.push(" delay message,name to 2022-04-02 00:00:00", new Date(1648828800000));
return "ok";
} 

}

参考有赞设计:https://tech.youzan.com/queuing_delay/

7.目前应用:

1.云修到店排队24小时后自动取消

2..美团请求token定时刷新。

3.质保卡延期24小时生成

5. 结算单延期生成

6.短信延迟发送

与一种异步延迟队列的实现方式相似的内容:

一种异步延迟队列的实现方式

目前系统中有很多需要用到延时处理的功能:支付超时取消、排队超时、短信、微信等提醒延迟发送、token刷新、会员卡过期等等。通过延时处理,极大的节省系统的资源,不必轮询数据库处理任务。 目前大部分功能通过定时任务完成,定时任务还分使用quartz及xxljob两种类型轮询时间短,每秒执行一次,对数据库造成一定的压力,并且会有1秒的误差。轮询时间久,如30分钟一次,03:01插入一条数据,正常3:3

监控Kubernetes集群证书过期时间的三种方案

前言 Kubernetes 中大量用到了证书, 比如 ca证书、以及 kubelet、apiserver、proxy、etcd等组件,还有 kubeconfig 文件。 如果证书过期,轻则无法登录 Kubernetes 集群,重则整个集群异常。 为了解决证书过期的问题,一般有以下几种方式: 大幅延长

使用 Promise.withResolvers() 来简化你将函数 Promise 化的实现~~

引言 在JavaScript编程中,Promise 是一种处理异步操作的常用机制。Promise 对象代表了一个尚未完成但预期将来会完成的操作的结果。在本文中,我们将探讨如何通过使用 ES2024 的 Promise.withResolvers API 来优化我们的 Promise 实现。 现有实现

MQ消息队列篇:三大MQ产品的必备面试种子题

MQ(Message Queue)作为一种用于实现异步通信的技术,具有重要的作用和应用场景。在面试过程中,MQ相关的问题经常被问到,因此了解MQ的用途和设计原则是必不可少的。本文总结了MQ的常见面试题,包括MQ的作用、产品选型、消息不丢失的保证、消息消费的幂等性、消息顺序的保证、消息的高效读写、分布式事务的最终一致性等方面。通过深入理解这些问题,可以更好地理解MQ的应用和设计,为面试和实际应用提供参考。

如何使用io_uring构建快速响应的I/O密集型应用?

通过异步I/O操作和高效事件处理,io_uring为开发人员提供了一种强大工具,能够显著减少I/O等待时间并实现更高的吞吐量。

Generator(生成器),入门初基,Coroutine(原生协程),登峰造极,Python3.10并发异步编程async底层实现

普遍意义上讲,生成器是一种特殊的迭代器,它可以在执行过程中暂停并在恢复执行时保留它的状态。而协程,则可以让一个函数在执行过程中暂停并在恢复执行时保留它的状态,在Python3.10中,原生协程的实现手段,就是生成器,或者说的更具体一些:协程就是一种特殊的生成器,而生成器,就是协程的入门心法。 协程底

3.2 DLL注入:远程APC异步注入

APC(Asynchronous Procedure Call)异步过程调用是一种`Windows`操作系统的核心机制,它允许在进程上下文中执行用户定义的函数,而无需创建线程或等待OS执行完成。该机制适用于一些频繁的、短暂的或非常细微的操作,例如改变线程优先级或通知线程处理任务。在`APC机制`中,当某些事件发生时(例如文件IO,网络IO或定时器触发),这些事件将被操作系统添加到一个`APC队列`

C++ ASIO 实现异步套接字管理

Boost ASIO(Asynchronous I/O)是一个用于异步I/O操作的C++库,该框架提供了一种方便的方式来处理网络通信、多线程编程和异步操作。特别适用于网络应用程序的开发,从基本的网络通信到复杂的异步操作,如远程控制程序、高并发服务器等都可以使用该框架。该框架的优势在于其允许处理多个并发连接,而不必创建一个线程来管理每个连接。最重要的是ASIO是一个跨平台库,可以运行在任何支持C++

ajax面试题总结

转载请注明出处: 1.ajax异步和同步的区别 Ajax是一种基于JavaScript语言和XMLHttpRequest对象的异步数据传输技术,通过它可以使不用刷新整个页面的情况下,对页面进行部分更新。 同步和异步是指客户端发送请求时,主线程是否会阻塞等待服务器的响应返回。 同步请求在发送请求后,客

利用SpringBoot+rabbitmq 实现邮件异步发送,保证100%投递成功

在之前的文章中,我们详细介绍了 SpringBoot 整合 mail 实现各类邮件的自动推送服务。 但是这类服务通常不稳定,当出现网络异常的时候,会导致邮件推送失败。 本篇文章将介绍另一种高可靠的服务架构,实现邮件 100% 被投递成功。类似的短信自动发送等服务也大体相同。 一、先来一张流程图 本文