Redis系列22:Redis 的Pub/Sub能力

redis,系列,pub,sub,能力 · 浏览次数 : 260

小编点评

**Redis 发布订阅功能** **使用频道(Channel)进行发布订阅** ```java RShardedTopic topic = redisson.getShardedTopic("myTopic"); long clientsReceivedMessage = topic.publish(new SomeObject()); ``` **使用模式(Pattern)进行发布订阅** ```java RShardedTopic topic = redisson.getShardedTopic("myTopic"); psubscribe("Message.Queue.*", SomeObject.class, new MessageListener<SomeObject>>() { @Override public void onMessage(String channel, SomeObject message) { // ... } }); ``` **注意事项** * 当使用Pattern进行发布订阅的时候,除了订阅该Channel的Client之外,所有订阅了与Channel匹配的模式的Client同样会收到消息。 * 另外,Redis发布订阅的消息不会被持久化,所以无历史消息,也不支持 ACK 机制,与之前介绍过的 List 与 Stream 消息队列能力是不同的。 **总结** * 使用频道进行发布订阅时,所有订阅了与Channel匹配的模式的Client都会收到消息。 * 使用模式进行发布订阅时,所有订阅了与Channel匹配的模式的Client会收到消息。 * 需要注意的是,当使用Pattern进行发布订阅的时候,除了订阅该Channel的Client之外,所有订阅了与Channel匹配的模式的Client同样会收到消息。

正文

Redis系列1:深刻理解高性能Redis的本质
Redis系列2:数据持久化提高可用性
Redis系列3:高可用之主从架构
Redis系列4:高可用之Sentinel(哨兵模式)
Redis系列5:深入分析Cluster 集群模式
追求性能极致:Redis6.0的多线程模型
追求性能极致:客户端缓存带来的革命
Redis系列8:Bitmap实现亿万级数据计算
Redis系列9:Geo 类型赋能亿级地图位置计算
Redis系列10:HyperLogLog实现海量数据基数统计
Redis系列11:内存淘汰策略
Redis系列12:Redis 的事务机制
Redis系列13:分布式锁实现
Redis系列14:使用List实现消息队列
Redis系列15:使用Stream实现消息队列
Redis系列16:聊聊布隆过滤器(原理篇)
Redis系列17:聊聊布隆过滤器(实践篇)
Redis系列18:过期数据的删除策略
Redis系列19:LRU内存淘汰算法分析
Redis系列20:LFU内存淘汰算法分析
Redis系列21:缓存与数据库的数据一致性讨论

1 关于Redis 的 Pub/Sub

Redis的发布订阅(Pub/Sub)模式是一种消息传递机制,它允许在发送者和接收者之间建立松耦合的通信关系。在这种模式中,发送者(发布者)将消息发布到一个指定的频道或模式,而接收者(订阅者)可以订阅一个或多个频道,以便接收发布的消息。
以下是Redis发布订阅模式的主要组件:
发布者(Publisher):发布者是产生并发布消息的实体。它可以将消息发送到指定的频道或模式。
订阅者(Subscriber):订阅者是接收并处理消息的实体。它可以订阅一个或多个频道或模式,以便接收相关的消息。
频道(Channel):频道是发布者和订阅者之间的通信渠道。发布者将消息发送到频道,而订阅者从频道接收消息。

可以看下图,Publisher 和 Subscriber、Channel的关系很清晰:
image
发布者往 "Channel A" 通道发布消息:Hello World!,消息的所有订阅者就会收到这个消息。

2 使用Pub/Sub实现发布订阅的过程

Redis实现 发布/订阅 一共有两种模式:

  • 使用频道(Channel)进行发布订阅
  • 使用模式(Pattern)进行发布订阅

我们知道,Redis 可以支持多个数据库,每个数据库都有自己的命名空间和数据。通过使用多个数据库,可以实现数据隔离、分区和组织。
但是值得注意的是,这种发布订阅机制与 数据分区空间无关,比如在 db 0 发布消息, 其他区的订阅者都会收到消息。

2.1 通过频道(Channel)进行发布订阅

  • 首先,Subscriber订阅某个Channel,实现对Channel的监听
  • Publisher 对 Channel 这个服务中心媒介发布消息
  • 所有订阅 Channel 的Subscriber接收到消息

接下来我们通过这几个步骤看看具体是怎么实现发布与订阅的过程的。

2.1.1 订阅者订阅频道

SUBSCRIBE命令:订阅者使用SUBSCRIBE命令订阅一个或多个频道。语法为SUBSCRIBE channel [channel ...]。
时间复杂度为O(n) ,n 为订阅的 Channel 数量。

SUBSCRIBE mychannel
Reading messages... (press Ctrl-C to quit)
1) "subscribe" // Message Type
2) "mychannel" // channel
3) (integer) 1 

执行完该指令后,订阅者可以使用以下命令操作 Pub/Sub 工作:

  • subscribe:订阅
  • unsubscribe:取消订阅
  • psubscribe:订阅者使用PSUBSCRIBE命令订阅一个或多个模式
  • punsubscribe:订阅者使用PUNSUBSCRIBE命令取消订阅一个或多个模式

我们使用客户端 [subscriber A] 订阅Channel [mychannel] 来接收消息。从上面可以看出响应的信息:

  • "subscribe" :消息类型,枚举是 subscribe、message、unsubscribe
  • "mychannel" :频道的名称
  • 最后的消息内容:不同的消息类型代表不同含义。

进入订阅后的客户端可以收到 3 种枚举类型的消息:

  • subscribe:订阅成功的消息类型,第2个值是订阅成功的频道名称,第3个值是当前客户端订阅的频道数量。(参考我们上面的mychannel消息)
  • message:客户端接收消息的消息类型,第2个值表示产生消息的频道名称,第3个值是消息的内容。
  • unsubscribe:取消订阅的消息类型,第2个值是对应的频道名称,第3个值是当前客户端订阅的频道数量。值为 0 时说明客户端一个订阅的都没有了,退出订阅状态。

2.1.2 发布者发布消息

PUBLISH命令:发布者使用PUBLISH命令将消息发送到指定的频道。语法为 PUBLISH channel message

PUBLISH mychannel "Hello, World!"
(integer) 1

需要注意咱们发布的消息并不会持久化存储下来,所以消息发布之后被某个 Subcriber 订阅到的话,消息生命周期基本就完成了。

2.1.3 订阅者接收消息

如果想要收到上面 发布者发布的消息,我们的客户端首先需要关注了 [mychannel] 频道,才能收到 "Hello, World!" 这条消息。

// 第一步,订阅Channel频道
SUBSCRIBE mychannel
Reading messages... (press Ctrl-C to quit)
1) "subscribe" // 订阅成功
2) "mychannel" // 频道名称
3) (integer) 1  // 订阅频道数量

// 第二步,当Publisher发布消息,Subcriber订阅到的消息如下
1) "message" // 接受到消息
2) "mychannel" // 频道名称
3) "Hello, World!" // 消息内容

2.1.4 退订频道

如果你不想收到某个频道的消息了,你可以取消预订。类似取消朋友圈关注,之后就不会收到推送了。
UNSUBSCRIBE命令:订阅者使用UNSUBSCRIBE命令取消订阅一个或多个频道。语法为 UNSUBSCRIBE channel [channel ...]

UNSUBSCRIBE mychannel

2.2 使用模式(Pattern)匹配实现发布订阅

我们来看看另一种实现发布订阅的方案 ,就是模式匹配的方式,除了直接订阅的客户端之外,还会检查是否有与我们模式相匹配的Channel,如果有,
消息也会发布到对应匹配的频道上,订阅这个Channel的客户端也会收到消息。
我们来试试看效果,订阅匹配模式如下图:
image

当 Message.Queue.Area1 频道接收到消息之后,除了订阅自身频道的 Actor A 和 Actor B 能收到消息之外。因为频道与模式匹配成功,消息还会发送给订阅 Message.Queue.* 模式的所有人员。

image

如上面图中的那样,因为使用匹配模式,PUBLISH 消息发布到 Message.Queue.Area2 之外,还会将该 Channel 与匹配模式的Channel进行对比,如果 Channel 与某个模式匹配的话,也将这个消息发布到订阅这个模式的客户端。
所以中红色线条部分,包括Actor C、Actor D、Actor E 都接受到了消息。

2.2.1 订阅模式的相关语法

  • 订阅模式指令:PSUBSCRIBE
PSUBSCRIBE Message.Queue.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe" // 消息类型:使用模式(Pattern)进行发布订阅
2) "Message.Queue.*"// 匹配的模式
3) (integer) 1 //订阅数
  • 取消模式订阅的指令:PUNSUBSCRIBE
PUNSUBSCRIBE Message.Queue.*
  • 订阅 Message.Queue.Area1 和 Message.Queue.Area2 频道的
# Message.Queue.Area1
SUBSCRIBE Message.Queue.Area1
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "Message.Queue.Area1"
3) (integer) 1

# Message.Queue.Area2
SUBSCRIBE Message.Queue.Area2
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "Message.Queue.Area2"
3) (integer) 1
  • Publisher向Message.Queue.Area2发布消息
# 匹配模式的订阅者收到消息
PSUBSCRIBE Message.Queue.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "Message.Queue.*"
3) (integer) 1
# 进入订阅状态,接收到消息
1) "pmessage"  # 消息类型
2) "Message.Queue.*" # 模式匹配
3) "Message.Queue.Area2" # 匹配的Channel
4) "Hello World!" # 具体的消息消息内容


# 对应频道的订阅者收到消息
SUBSCRIBE Message.Queue.Area2
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "Message.Queue.Area2"
3) (integer) 1
# 准备接收消息
1) "message"
2) "Message.Queue.Area2"
3) "Hello World!"

因为没有筛重策略,所以如果你既订阅了匹配模式(如 Message.Queue.* ),又订阅了对应的频道(如 Message.Queue.Area2),那么你的客户端会收到两条同样的消息,一条消息类型是message,一条类型是pmessage。

2.3 程序实现

参考官方代码:

# 消息消费代码,监听频道
RShardedTopic topic = redisson.getShardedTopic("myTopic");
int listenerId = topic.addListener(SomeObject.class, new MessageListener<SomeObject>() {
    @Override
    public void onMessage(String channel, SomeObject message) {
        //...
    }
});

# 消息生产代码
// in other thread or JVM(发布消息与监听消息要运行在不同的 JVM,因为同一个redisonClinet,无法监听到自己的消息)
RShardedTopic topic = redisson.getShardedTopic("myTopic");
long clientsReceivedMessage = topic.publish(new SomeObject());

3 总结

Redis 实现发布订阅的功能,包括如下指令:

  • subscribe:订阅
  • unsubscribe:取消订阅
  • psubscribe:订阅者使用PSUBSCRIBE命令订阅一个或多个模式
  • punsubscribe:订阅者使用PUNSUBSCRIBE命令取消订阅一个或多个模式
  • publish channel message:向指定的频道channel发送消息message

Redis实现 发布/订阅 一共有两种模式:

  • 使用频道(Channel)进行发布订阅
  • 使用模式(Pattern)进行发布订阅

需要注意的是,当使用Pattern进行发布订阅的时候。如果有消息发布出来,除了订阅该Channel的Client之外,所有订阅了与Channel匹配的模式的Client同样会收到消息。
另外,Redis 发布订阅的消息不会被持久化,所以无历史消息,也不支持 ACK 机制,与之前介绍过的 List 与 Stream 消息队列能力是不同的,大家注意区分,在不同的场景下合理使用。

与Redis系列22:Redis 的Pub/Sub能力相似的内容:

Redis系列22:Redis 的Pub/Sub能力

[Redis系列1:深刻理解高性能Redis的本质](https://www.cnblogs.com/wzh2010/p/15886787.html "Redis系列1:深刻理解高性能Redis的本质") [Redis系列2:数据持久化提高可用性](https://www.cnblogs.com/w

Redis 常用的数据结构简介与实例测试【Redis 系列二】

〇、都有哪些数据结构? Redis 提供了较为丰富的数据类型,常见的有五种:String(字符串),Hash(哈希),List(列表),Set(集合)、Zset(有序集合)。 随着 Redis 版本的更新,后面又支持了四种数据类型: BitMap(2.2 版新增)、HyperLogLog(2.8 版

[转帖]【Redis系列】Redis发布版本历史及特性

目录 概述Redis2.6Redis2.8Redis3.0Redis3.2Redis4.0Redis5.0Redis6.0Redis7.0 概述 Redis 使用标准版本标记进行版本控制:major.minor.patchlevel。 偶数的版本号表示稳定的版本, 例如 1.2,2.0,2.2,2.

Django性能之道:缓存应用与优化实战

title: Django性能之道:缓存应用与优化实战 date: 2024/5/11 18:34:22 updated: 2024/5/11 18:34:22 categories: 后端开发 tags: 缓存系统 Redis优点 Memcached优缺点 Django缓存 数据库优化 性能监控

redis系列02---缓存过期、穿透、击穿、雪崩

一、缓存过期 问题产生的原由: 内存空间有限,给缓存设置过期时间,但有些键值运气比较好,每次都没有被我的随机算法选中,每次都能幸免于难,这可不行,这些长时间过期的数据一直霸占着不少的内存空间! 解决方案: redis提供8种策略供应用程序选择,用于我遇到内存不足时该如何决策: * noevictio

[转帖]Redis系列(十五)、Redis6新特性之集群代理(Cluster Proxy)

在之前的文章中介绍了Redis6的集群搭建和原理,我们可以使用dummy和smart客户端连接集群,本篇介绍Redis6新增的一个功能:集群代理。客户端不需要知道集群中的具体节点个数和主从身份,可以直接通过代理访问集群,对于客户端来说通过集群代理访问的集群就和单机的Redis一样,因此也能解决很多集

[转帖]Redis系列(十六)、Redis6新特性之IO多线程

https://blog.csdn.net/wsdc0521/article/details/106766587 终于,Redis的多线程版本横空出世,大大提高了并发,本篇就带大家来看看什么是IO多线程,和我们理解的多线程有什么区别,与Memcached的多线程又有什么区别。 目录 介绍 为什么Re

[转帖]Redis系列(十七)、Redis中的内存淘汰策略和过期删除策略

我们知道Redis是分布式内存数据库,基于内存运行,可是有没有想过比较好的服务器内存也不过几百G,能存多少数据呢,当内存占用满了之后该怎么办呢?Redis的内存是否可以设置限制? 过期的key是怎么从内存中删除的?不要怕,本篇我们一起来看一下Redis的内存淘汰策略是如何释放内存的,以及过期的key

[转帖]Redis系列(十五)、Redis6新特性之集群代理(Cluster Proxy)

在之前的文章中介绍了Redis6的集群搭建和原理,我们可以使用dummy和smart客户端连接集群,本篇介绍Redis6新增的一个功能:集群代理。客户端不需要知道集群中的具体节点个数和主从身份,可以直接通过代理访问集群,对于客户端来说通过集群代理访问的集群就和单机的Redis一样,因此也能解决很多集

Redis系列8:Bitmap实现亿万级数据计算

Redis系列1:深刻理解高性能Redis的本质 Redis系列2:数据持久化提高可用性 Redis系列3:高可用之主从架构 Redis系列4:高可用之Sentinel(哨兵模式) Redis系列5:深入分析Cluster 集群模式 追求性能极致:Redis6.0的多线程模型 追求性能极致:客户端缓