Redis 提供了较为丰富的数据类型,常见的有五种:String(字符串),Hash(哈希),List(列表),Set(集合)、Zset(有序集合)。
随着 Redis 版本的更新,后面又支持了四种数据类型: BitMap(2.2 版新增)、HyperLogLog(2.8 版新增)、GEO(3.2 版新增)、Stream(5.0 版新增)。
本文将对以上数据类型,通过示例进行详细介绍。
String 是最基本的 key-value 结构,key 是唯一标识,value 是具体的值。value 其实不仅是字符串, 也可以是数字(整数或浮点数),value 最多可以容纳的数据长度是 512M。
Redis 字符串是二进制安全的,这意味着他们有一个已知的长度,没有任何特殊字符终止,这个特点保证了其能够正确处理包括空字符在内的任何二进制数据。
关于二进制安全:一个二进制安全的函数或系统会将输入的数据视为原始的比特流,不会因为数据中包含某些特殊字符(如空字符'\0')而提前终止处理或改变其行为。在编程语言中,例如在 C 语言中,字符串通常以空字符'0'结尾,而非二进制安全的字符串操作可能会错误地将此字符后的字节忽略掉。这在处理包含二进制数据的文件、图片、音频等场景下会导致问题,因为这些类型的数据可能包含空字符,但它们并非数据结束的标志。
String 类型的底层的数据结构实现主要是 int 和 SDS(简单动态字符串)。SDS 的优势:不仅可以保存文本数据,还可以保存二进制数据;获取字符串长度的时间复杂度是 O(1);Redis 的 SDS API 是安全的,拼接字符串能自动扩容,不会造成缓冲区溢出。
基本操作:
# set 添加键值对语法
set key value [expiration EX seconds|PX milliseconds] [NX|XX]
# ex:秒级过期时间
# px:毫秒级过期时间
# nx:键不存在时才能设置成功
# xx:键存在时才能设置成功
[root@localhost bin]# ./redis-cli
# 【插入】key-value 值
127.0.0.1:6379> set first_key first_value
OK
# 【查询】根据 key 取值
127.0.0.1:6379> get first_key
"first_value"
# 【判断 key 是否存在】1存在 0不存在
127.0.0.1:6379> exists first_key
(integer) 1
127.0.0.1:6379> exists first_key_1
(integer) 0
# 【value 的长度】
127.0.0.1:6379> strlen first_key
(integer) 11
# 【删除】
127.0.0.1:6379> del first_key
(integer) 1
127.0.0.1:6379> get first_key
(nil)
127.0.0.1:6379>
批量操作:
# 批量操作,在原命令前加 m(multiple 多个的)
127.0.0.1:6379> mset key1 value1 key2 value2
OK
127.0.0.1:6379> mget key1 key2
1) "value1"
2) "value2"
127.0.0.1:6379>
计数器功能(字符串的内容为整数的时候才可以使用):
# 先新增一个【整型变量 number】,并赋值为 0
127.0.0.1:6379> set number 0
OK
# 计数【加 1】,命令 incr(increase:增长,增强)
127.0.0.1:6379> incr number
(integer) 1
# 【批量】计数加法,命令 incrby
127.0.0.1:6379> incrby number 5
(integer) 6
# 计数【减 1】,命令 decr(decrease:减少,降低)
127.0.0.1:6379> decr number
(integer) 5
# 【批量】计数减法,命令 decrby
127.0.0.1:6379> decrby number 2
(integer) 3
127.0.0.1:6379>
过期策略(默认永不过期):
# 添加名为 testkey 的键值对,备用
127.0.0.1:6379> set testkey testvalue
OK
# expire 关键字,【针对已存在的键值对】设置过期时间 60 秒
127.0.0.1:6379> expire testkey 60
(integer) 1
# ttl 关键字查询【过期时间值】,单位:秒
127.0.0.1:6379> ttl testkey
(integer) 51
127.0.0.1:6379> ttl testkey
(integer) 44
# 【已过期的键】,返回 -2
127.0.0.1:6379> ttl testkey
(integer) -2
# 添加键值对的同时,通过【 ex 关键字】,设置过期时间 60 秒
127.0.0.1:6379> set testkey2 testvalue2 ex 60
OK
# 在关键字【 set 后加上 ex 】创建时添加过期时间
127.0.0.1:6379> setex testkey3 60 testvalue3
OK
127.0.0.1:6379> ttl testkey3
(integer) 52
127.0.0.1:6379>
不存在就插入,存在就返回失败,并返回状态:
# setnx 关键字,不存在就插入,若【原本不存在】就返回 1,创建成功
127.0.0.1:6379> setnx testkey4 testvalue4
(integer) 1
# 若【原本已存在】,返回 0,创建失败,值也不更新
127.0.0.1:6379> setnx testkey4 testvalue4_change
(integer) 0
127.0.0.1:6379> get testkey4
"testvalue4"
# 使用 set 关键字新添加值,无论原本是否已存在,都返回同一成功状态 OK,且更新成功
127.0.0.1:6379> set testkey5 testvalue5
OK
127.0.0.1:6379> set testkey5 testvalue5_change
OK
127.0.0.1:6379> get testkey5
"testvalue5_change"
另外,也可以使用msetnx
进行批量新增操作,此操作具有原子性,要么全部成功,要么全部失败。
其他命令(追加、截取、替换、删除):
127.0.0.1:6379> get key_hw
"hello"
# append 在值的【末尾追加】值,若值中包含【空格需要用双引号】
127.0.0.1:6379> append key_hw " world"
(integer) 11
127.0.0.1:6379> get key_hw
"hello world"
# 获取【值的长度】
127.0.0.1:6379> strlen key_hw
(integer) 11
# 【 getrange 截取值】,数字标识位置,从 0 开始
127.0.0.1:6379> getrange key_hw 0 4
"hello"
# 从指定位置【 setrange 替换】
127.0.0.1:6379> setrange key_hw 5 ---
(integer) 11
127.0.0.1:6379> get key_hw
"hello---rld"
# 【 del 删除】指定键值对
127.0.0.1:6379> del key_hw
(integer) 1
127.0.0.1:6379> get key_hw
(nil)
添加 json 字符串值的两种方法:
# 给键【 user:1 】赋值一串 json 字符串
127.0.0.1:6379> set user:1 {name:zhangsan,age:1}
OK
127.0.0.1:6379> get user
(nil)
127.0.0.1:6379> get user:1
"{name:zhangsan,age:1}"
# 【user:{id}:{field}】json 传分开赋值
127.0.0.1:6379> mset user1:1:name wangwu user1:1:age 3
OK
127.0.0.1:6379> get user:1
"{name:zhangsan,age:1}"
getset 命令,先取值再直接赋值:
# getset 命令:先 get 然后再 set
# 如果设置的键不存在值,则设置值,并且返回 nil
127.0.0.1:6379> getset gstest gsvalue
(nil)
127.0.0.1:6379> get gstest
"gsvalue"
# 如果设置的键存在值,则返回该值,并设置新的值
127.0.0.1:6379> getset gstest gsvalue_new
"gsvalue"
127.0.0.1:6379> get gstest
"gsvalue_new"
计数器:每次加 1 或减 1,或者批量增加或减少。
缓存 json 对象:一字符串形式存储复杂对象。
分布式锁:针对有过期时间的键,通过 setnx 命令添加键值对,如果不成功说明锁生效中。
共享 Session 信息:将不同用户的 Session 统一存放到同一个 redis 中,避免分布式服务器 Session 隔离引起重复登录的情况。
Redis 的哈希是键值对的集合。Redis 的哈希值是字符串字段和字符串值之间的映射,因此它们被用来表示对象,还有用户信息之类的,经常变动的信息。
Hash 更适合用于对象的存储,String 更适合字符串存储,它们的区别如下图:
# 存储一个哈希集合的键值对
# 格式为:hset key field value
127.0.0.1:6379> hset key_useer name zhangsan
(integer) 1
# 获取哈希集合的一个键值
# 格式为:hget key field
127.0.0.1:6379> hget key_useer name
"zhangsan"
# 批量存储键值对
127.0.0.1:6379> hmset key_user name zhangsan age 1 address zhongguo
OK
# 批量查询
127.0.0.1:6379> hmget key_user name age address
1) "zhangsan"
2) "1"
3) "zhongguo"
# 判断哈希中是否存在指定的键
127.0.0.1:6379> hexists key_user name
(integer) 1
# 删除指定的键
127.0.0.1:6379> hdel key_user address
(integer) 1
127.0.0.1:6379> hget key_user address
(nil)
127.0.0.1:6379> hget key_user name
"zhangsan"
# 查询哈希的全部信息,以 key value 列式显示
127.0.0.1:6379> hgetall key_user
1) "name"
2) "zhangsan"
3) "age"
4) "1"
# 查看哈希的全部键
127.0.0.1:6379> hkeys key_user
1) "name"
2) "age"
# 查看哈希的全部值
127.0.0.1:6379> hvals key_user
1) "zhangsan"
2) "1"
# 查询哈希共有几个键值对
127.0.0.1:6379> hlen key_user
(integer) 2
仅支持批量增加数值:
# 哈希【仅支持批量增加】数值
# 不支持原子操作的增加,也无法减少
127.0.0.1:6379> hincrby key number 5
(integer) 15
# 其他操作均失败
127.0.0.1:6379> hincr key number
(error) ERR unknown command 'hincr', with args beginning with: 'key' 'number'
127.0.0.1:6379> hdecr key number
(error) ERR unknown command 'hdecr', with args beginning with: 'key' 'number'
127.0.0.1:6379> hdecrby key number 2
(error) ERR unknown command 'hdecrby', with args beginning with: 'key' 'number' '2'
判断不存在就插入并返回成功 1,存在就返回失败状态 0:
# 不存在就插入,返回 1,存在就返回失败 0
127.0.0.1:6379> hsetnx key number2 10
(integer) 1
127.0.0.1:6379> hsetnx key number2 5
(integer) 0
127.0.0.1:6379>
缓存对象:Hash 类型的 (key,field, value) 的结构与对象的(对象 id, 属性, 值)的结构相似,也可以用来存储对象。例如用户信息、商品信息等。
系统权限控制:在需要对用户权限进行细粒度控制的场景中,可以利用 Hash 来存储用户的权限信息,便于按需查询和更新。
配置项:对于需要频繁修改的配置信息,可以使用 Hash 来存储,这样可以单独修改某个配置项,而不需要像 String 类型那样每次都要读取和写入整个字符串。
Redis 的 List 是一个双向链表数据结构,用于存储多个有序的字符串元素。
以下是其特点:
如果链表 key 不存在,则自动创建新的链表;如果 key 存在就创建新的值。
如果移除了所有的值,空链表代表不存在。
在两边插入或者改动值,效率最高,中间元素相对来说效率会低一点。
在链表的左边、右边添加(lpush)和取出值(lpop),查询指定索引区间的值(lrange),返回链表长度(llen):
# lpush 从【左边】(left)往链表中添加值
# 以下往链表 key_list 中添加 4 个值,每次返回链表的长度
127.0.0.1:6379> lpush key_list a
(integer) 1
127.0.0.1:6379> lpush key_list b
(integer) 2
127.0.0.1:6379> lpush key_list c
(integer) 3
127.0.0.1:6379> lpush key_list d
(integer) 4
# 【lrange 查看链表中的值】,必须有序列数,位置从左边的第 0 位开始
# 语法:lrange key start end
127.0.0.1:6379> lrange key_list 0 2
1) "d"
2) "c"
3) "b"
# 【最后一位为 -1 】时,查询到链表尾
127.0.0.1:6379> lrange key_list 0 -1
1) "d"
2) "c"
3) "b"
4) "a"
# rpush 从【右边(right)】往链表中添加值
127.0.0.1:6379> rpush key_list 1
(integer) 5
127.0.0.1:6379> rpush key_list 2
(integer) 6
127.0.0.1:6379> lrange key_list 0 -1
1) "d"
2) "c"
3) "b"
4) "a"
5) "1"
6) "2"
127.0.0.1:6379> lrange key_list 1 2
1) "c"
2) "b"
# lpop 从【左边取出】指定个数的值,数量为空时,默认为取一个
# 语法:lpop key [count]
127.0.0.1:6379> lpop key_list 2
1) "d"
2) "c"
127.0.0.1:6379> lrange key_list 0 -1
1) "b"
2) "a"
3) "1"
4) "2"
# rpop 从【右边取出】指定个数的值,数量为空时,默认为取一个
# 语法:rpop key [count]
127.0.0.1:6379> rpop key_list 1
1) "2"
127.0.0.1:6379> lrange key_list 0 -1
1) "b"
2) "a"
3) "1"
# llen 返回链表的【长度】
127.0.0.1:6379> llen key_list
(integer) 3
根据索引查询值(lindex),替换指定索引下的值(lset):
# 查看当前链表中的值
127.0.0.1:6379> lrange key_list 0 -1
1) "b"
2) "a"
3) "1"
# 【lindex 返回指定索引的值】
# 若为【正数】,是从左边第 0 位开始计算
127.0.0.1:6379> lindex key_list 0
"b"
127.0.0.1:6379> lindex key_list 1
"a"
# 若为【负数】,则从右边开始,-1 为右边第一位
127.0.0.1:6379> lindex key_list -1
"1"
127.0.0.1:6379> lindex key_list -2
"a"
# 【lset 重新给指定索引位置赋值】,把上边的值 a,更新为 aa
# 语法:lset key indexnumber new_value
127.0.0.1:6379> lset key_list -2 aa
OK
127.0.0.1:6379> lrange key_list 0 -1
1) "b"
2) "aa"
3) "1"
# 若给定的【索引不存在】,则报错
# 因此,在更新值之前,最好是通过 llen 取链表的长度判断
127.0.0.1:6379> lset key_list 10 bb
(error) ERR index out of range
删除指定的值(lrem):
127.0.0.1:6379> lrange key_list 0 -1
1) "b"
2) "aa"
3) "1"
4) "aa"
5) "b"
# lrem 删除指定的值,索引从左边开始(无 rrem 命令)
# 语法:lrem key count value
# 若 value 有重复的多个,那么 count 就是允许删除的最大个数
127.0.0.1:6379> lrem key_list 1 aa
(integer) 1
127.0.0.1:6379> lrange key_list 0 -1
1) "b"
2) "1"
3) "aa"
4) "b"
# 允许删除 3 个 b,实际上只删除成功了 2 个,因为总共就两个
127.0.0.1:6379> lrem key_list 3 b
(integer) 2
127.0.0.1:6379> lrange key_list 0 -1
1) "1"
2) "aa"
截取链表的一段索引值,其他的值舍弃(ltrim):
127.0.0.1:6379> lrange key_list 0 -1
1) "2"
2) "1"
3) "aa"
4) "bb"
# ltrim 截取指定索引的链表,其他的值舍弃
# 语法:ltrim list startIndex endIndex
127.0.0.1:6379> ltrim key_list 1 2
OK
127.0.0.1:6379> lrange key_list 0 -1
1) "1"
2) "aa"
将指定链表的最后一个值,转移到新的链表(rpoplpush):
127.0.0.1:6379> lrange key_list 0 -1
1) "1"
2) "aa"
# rpoplpush 将第一个 key 的最右边值,移动到新的 key 中(注意:无 lpoplpush 命令)
127.0.0.1:6379> rpoplpush key_list key_list_2
"aa"
127.0.0.1:6379> lrange key_list 0 -1
1) "1"
127.0.0.1:6379> lrange key_list_2 0 -1
1) "aa"
在指定值的前边插入 value 值(linsert):
127.0.0.1:6379> lrange key_list 0 -1
1) "1"
2) "2"
3) "3"
# linsert 在指定位置插入值
# 语法:linsert key BEFORE|AFTER pivot value
# 在值 pivot 的前边或后边插入 value 值
# 若存在重复值,则只针对左边第一个匹配的位置
127.0.0.1:6379> linsert key_list after 2 2.5
(integer) 4
127.0.0.1:6379> linsert key_list before 2 1.5
(integer) 5
127.0.0.1:6379> lrange key_list 0 -1
1) "1"
2) "1.5"
3) "2"
4) "2.5"
5) "3"
在命令 lpop、rpop 前加上字母 b,若没取到值就等待一定时间(blpop、brpop):
# 从 key 列表最左边(开头)弹出一个元素
# 若没有值就阻塞 timeout 秒,若 timeout=0 则一直阻塞
BLPOP key [key ...] timeout
# 从 key 列表最右边(结尾)弹出一个元素
# 若没有就阻塞 timeout 秒,若 timeout=0 则一直阻塞
BRPOP key [key ...] timeout
消息队列:由于 list 链表是有序的,因此可以暂存一系列消息,存取效率也很高。次序可以是先进先出(FIFO)。
保证序列:lpush 左边存入,rpop 右边取。
通过序列号保证消息唯一:例如值为,"111000102:stock:99"。
保证消息可靠性:使用备用队列。命令 brpoplpush,将每次取出的值,放到另一个新的链表中。以免处理过程中宕机,使得未完成的操作无法再次取值。
栈(lpush、lpop)功能,后进先出(LIFO)。
Set 是一个无序且元素唯一的集合数据结构。具体特点如下:
无序性:Set 类型存储的元素是无序的,即元素的排列顺序不是按照插入的顺序来存储的。
唯一性:Set 中的元素是唯一的,不允许有重复的值存在。
高效操作:由于底层使用哈希表实现,常见的添加、删除、查找操作的时间复杂度都是 O(1),这使得这些操作非常快速。
支持集合运算:Set 支持常见的集合运算,如并集、交集和差集等。
存储容量:一个 Set 最多可以存储 2^32-1 个元素,这基本上能满足大部分应用场景的需求。
应用场景:Set 常用来存储不允许重复的数据,比如用户的好友列表、关注列表或者是某个群组的成员列表等。
命令丰富:提供了丰富的操作命令,例如 sadd 用于添加元素,smembers 用于获取所有值,sismember 用于判断元素是否存在于集合中等。
Redis 的 Set 数据结构适合用来存储那些需要去重并且能进行快速集合操作的数据。
新增(sadd)和查询(scard、smembers、sismember):
# sadd 新增值
# 返回 1 则添加成功
127.0.0.1:6379> sadd key_set AA
(integer) 1
127.0.0.1:6379> sadd key_set BB
(integer) 1
127.0.0.1:6379> sadd key_set cc
(integer) 1
127.0.0.1:6379> sadd key_set dd
(integer) 1
# 若添加重复的值,返回0 添加失败
127.0.0.1:6379> sadd key_set AA
(integer) 0
# scard 查询集合长度
127.0.0.1:6379> scard key_set
(integer) 4
# smembers 查询全部值
127.0.0.1:6379> smembers key_set
1) "AA"
2) "BB"
3) "cc"
4) "dd"
# sismember 判断值是否存在
127.0.0.1:6379> sismember key_set BB
(integer) 1
127.0.0.1:6379> sismember key_set BBB
(integer) 0
删除(srem)、随机抽取 n 个元素(srandmember)、随机删除一个元素(spop):
# 【srem】删除指定元素
# 删除存在的元素成功,返回 1
127.0.0.1:6379> srem key_set AA
(integer) 1
# 删除不存在的元素,返回 0 失败
127.0.0.1:6379> srem key_set AA
(integer) 0
# 【srandmember】从集合中随机抽取 nums 个元素,非移除
# 语法:srandmember key nums
127.0.0.1:6379> srandmember key_set 2
1) "BB"
2) "cc"
# 第二次抽取结果不同
127.0.0.1:6379> srandmember key_set 2
1) "cc"
2) "dd"
# 【spop】随机取出集合中的一个元素
# 若要一次取出多个,直接在 key 后加数字即可
127.0.0.1:6379> spop key_set
"BB"
# 最后再查询一下结合剩下的元素
127.0.0.1:6379> smembers key_set
1) "cc"
2) "dd"
127.0.0.1:6379>
指定指的移动(smove)和两个集合的差集(sdiff)、交集(sinter)、并集(sunion):
# 新增两个集合 key_set1、key_set2
127.0.0.1:6379> sadd key_set1 11
(integer) 1
127.0.0.1:6379> sadd key_set1 12
(integer) 1
127.0.0.1:6379> sadd key_set1 33
(integer) 1
127.0.0.1:6379> smembers key_set1
1) "11"
2) "12"
3) "33"
127.0.0.1:6379> sadd key_set2 21
(integer) 1
127.0.0.1:6379> sadd key_set2 22
(integer) 1
127.0.0.1:6379> sadd key_set2 33
(integer) 1
127.0.0.1:6379> smembers key_set2
1) "21"
2) "22"
3) "33"
# 【smove】将 key1 中的元素 member,移到 key2 集合中
# 语法:smove key1 key2 member
127.0.0.1:6379> smove key_set1 key_set2 11
(integer) 1
# 查看移动结果,原来 key_set1 中的元素‘11’没有了
127.0.0.1:6379> smembers key_set1
1) "12"
2) "33"
# 查看移动结果,‘11’已移动到 key_set2 中
127.0.0.1:6379> smembers key_set2
1) "11"
2) "21"
3) "22"
4) "33"
# 【sdiff】用于取出 key_set1 中的,与 key_set2 不相同元素,差集
127.0.0.1:6379> sdiff key_set1 key_set2
1) "12"
# 若要取出 key_set2 中,与 key_set1 不同的元素,需要 2 在前 1 在后
127.0.0.1:6379> sdiff key_set2 key_set1
1) "11"
2) "21"
3) "22"
# 【sinter】取两个集合中相同的元素,交集
# 两个集合中都有‘33’,如下输出:
127.0.0.1:6379> sinter key_set1 key_set2
1) "33"
# 【sunion】去两个集合中的全部元素并去重,并集
127.0.0.1:6379> sunion key_set1 key_set2
1) "11"
2) "12"
3) "21"
4) "22"
5) "33"
127.0.0.1:6379>
Set 的应用场景主要是结合其无序、不可重复、集合间可进行差交并集操作三个特点展开的。
有一点需要注意,Set 的差集、并集和交集的计算复杂度较高,在数据量较大的情况下,如果直接执行这些计算,会导致 Redis 实例阻塞。在主从集群中,为了避免主库因为 Set 做聚合计算(交集、差集、并集)时导致主库被阻塞,我们可以选择一个从库完成聚合统计,或者把数据返回给客户端,由客户端来完成聚合统计。
点赞功能:由于 Set 集合是不可重复的,因此集合中只能添加成功一次点赞用户的唯一 ID。
共同关注:可以通过sinter set1 set2
命令取两集合的交集,从两个关注列表中取出相同的值。
抽奖:就是从一个 Set 集合中取出指定个数的用户代码。若每次允许重复可以使用srandmember setkey number
来随机查询 number 个数的值。若不允许重复,则直接用spop setkey number
命令,从原始 Set 中取出指定个数的值。
Zset 类型(有序集合类型)相比于 Set 类型多了一个排序属性 score(分值),对于有序集合 ZSet 来说,每个存储元素相当于有两个值组成的,一个是有序结合的元素值,一个是排序值。
如果有序集合的元素个数小于 128 个,并且每个元素的值小于 64 字节时,Redis 会使用压缩列表(Ziplist)作为 Zset 类型的底层数据结构;当超过前述指标后,Redis 会使用跳表(Skiplist)作为 Zset 类型的底层数据结构。在 Redis 7.0 中,压缩列表数据结构已经废弃了,交由 listpack 数据结构来实现。这一变化标志着 Redis 底层存储机制的一次重要更新,旨在提升性能和效率。
添加元素(zadd)、查询指定区间值(zrange)、正序取范围值(zrangebyscore)、倒序取范围值(zrevrange、zrevrangebyscore):
# 【zadd】添加一个或多个元素
127.0.0.1:6379> zadd key_zset 0 zero
(integer) 1
127.0.0.1:6379> zadd key_zset 1 one 2 two 3 three 4 four
(integer) 3
# 【zrange】查询指定范围的值,-1 代表查询至集合末尾
127.0.0.1:6379> zrange key_zset 0 -1
1) "zero"
2) "one"
3) "two"
4) "three"
5) "four"
127.0.0.1:6379> zrange key_zset 1 3
1) "one"
2) "two"
3) "three"
# 通过关键字 withscores,同时输出分值 scores
127.0.0.1:6379> zrange key_zset 0 -1 withscores
1) "zero"
2) "0"
3) "one"
4) "1"
5) "two"
6) "2"
7) "three"
8) "3"
9) "four"
10) "4"
# 【zrangebyscore】按次序获取指定范围的值,-inf 是最小,+inf 是最大
# 语法:zrangebyscore key min max
# 最小至最大,全部输出
127.0.0.1:6379> zrangebyscore key_zset -inf +inf withscores
1) "zero"
2) "0"
3) "one"
4) "1"
5) "two"
6) "2"
7) "three"
8) "3"
9) "four"
10) "4"
# 输出从最小至 3 之间的全部值
127.0.0.1:6379> zrangebyscore key_zset -inf 3
1) "zero"
2) "one"
3) "two"
4) "three"
# 输出从 3 至最大之间的全部值
127.0.0.1:6379> zrangebyscore key_zset 3 +inf
1) "three"
2) "four"
# 【zrangebylex】输出全部值
127.0.0.1:6379> zrangebylex key_zset - +
1) "zero"
2) "one"
3) "two"
4) "three"
5) "four"
# 【zrevrangebyscore】按倒序取指定范围值
# 语法:zrevrangebyscore key max min
127.0.0.1:6379> zrevrangebyscore key_zset 4 3
1) "four"
2) "three"
# 【zrevrange】倒序取值,开始和结束节点均从 0 开始计数
# 语法:zrevrange key start end
# 0 -1 表示返回全部
127.0.0.1:6379> zrevrange key_zset 0 -1
1) "four"
2) "three"
3) "two"
4) "one"
5) "zero"
127.0.0.1:6379> zrevrange key_zset 3 -1
1) "one"
2) "zero"
127.0.0.1:6379> zrevrange key_zset 5 -1
(empty array)
127.0.0.1:6379> zrevrange key_zset 2 3
1) "two"
2) "one"
127.0.0.1:6379>
删除单个值(zrem)、删除指定区间值(zremrangebyscore)、查询值总数(zcard)、查询指定区间内值的个数(zcount):
# 【zrem】根据实际值删除单个,不能根据 score 进行删除
127.0.0.1:6379> zrem key_zset 3
(integer) 0
127.0.0.1:6379> zrem key_zset three
(integer) 1
# 【zremrangebyscore】根据区间删除
127.0.0.1:6379> zremrangebyscore key_zset 1 2
(integer) 2
127.0.0.1:6379> zrange key_zset 0 -1
1) "zero"
2) "four"
# 【zcard】查询集合的长度
127.0.0.1:6379> zcard key_zset
(integer) 2
# 【zcount】查询指定区间内值的个数
127.0.0.1:6379> zcount key_zset 2 6
(integer) 1
127.0.0.1:6379>
给分值 score 加 1 或指定数值(zincrby):
# 【increment】为有序集合 key 中元素 member 的分值加上 increment
# 语法:zincrby key increment member
Zset 可以根据元素的权重来排序,我们可以自己来决定每个元素的权重值。比如说,我们可以根据元素插入 Sorted Set 的时间确定权重值,先插入的元素权重小,后插入的元素权重大。在面对需要展示最新列表、排行榜等场景时,如果数据更新频繁或者需要分页显示,可以优先考虑使用 Sorted Set。
有序集合比较典型的使用场景就是排行榜。例如学生成绩的排名榜、游戏积分排行榜、视频播放排名、电商系统中商品的销量排名等。
如下示例,文章阅读量的新增和排序取值:
# 添加五篇文章和各篇阅读量
127.0.0.1:6379> zadd user:zhangsan:ranking 200 arcticle:1
(integer) 1
127.0.0.1:6379> zadd user:zhangsan:ranking 50 arcticle:2
(integer) 1
127.0.0.1:6379> zadd user:zhangsan:ranking 100 arcticle:3
(integer) 1
127.0.0.1:6379> zadd user:zhangsan:ranking 150 arcticle:4
(integer) 1
127.0.0.1:6379> zadd user:zhangsan:ranking 300 arcticle:5
(integer) 1
# 【zincrby】给指定的文章,新增阅读量
# 语法:zincrby key add_number value
# 添加 1 个阅读量
127.0.0.1:6379> zincrby user:zhangsan:ranking 1 arcticle:2
"51"
# 添加 4 个阅读量
127.0.0.1:6379> zincrby user:zhangsan:ranking 4 arcticle:2
"55"
# 【zscore】查看置顶文章的阅读量
127.0.0.1:6379> zscore user:zhangsan:ranking arcticle:2
"55"
# 【zrevrange】倒序查询前三个值
127.0.0.1:6379> zrevrange user:zhangsan:ranking 0 2 withscores
1) "arcticle:5"
2) "300"
3) "arcticle:1"
4) "200"
5) "arcticle:4"
6) "150"
# 【zrangebyscore】顺序查询 100~200 阅读量的文章
127.0.0.1:6379> zrangebyscore user:zhangsan:ranking 100 200 withscores
1) "arcticle:3"
2) "100"
3) "arcticle:4"
4) "150"
5) "arcticle:1"
6) "200"
127.0.0.1:6379>
注意:前提是电话号码数据的分值,必须相同。
# 添加示例数据
127.0.0.1:6379> zadd phone 0 13100111100 0 13110114300 0 13132110901
(integer) 3
127.0.0.1:6379> zadd phone 0 13200111100 0 13210414300 0 13252110901
(integer) 3
127.0.0.1:6379> zadd phone 0 13300111100 0 13310414300 0 13352110901
(integer) 3
127.0.0.1:6379> zadd phone 0 13400111100 0 13410414300 0 13452110901
(integer) 3
# 【zrangebulex】查询全部手机号
127.0.0.1:6379> zrangebylex phone - +
1) "13100111100"
2) "13110114300"
3) "13132110901"
4) "13200111100"
5) "13210414300"
6) "13252110901"
7) "13300111100"
8) "13310414300"
9) "13352110901"
10) "13400111100"
11) "13410414300"
12) "13452110901"
# 【zrangebylex】查询 <=132 同时 <133
127.0.0.1:6379> zrangebylex phone [132 (133
1) "13200111100"
2) "13210414300"
3) "13252110901"
# 【zrangebylex】查询 <=132 同时 <134
127.0.0.1:6379> zrangebylex phone [132 (134
1) "13200111100"
2) "13210414300"
3) "13252110901"
4) "13300111100"
5) "13310414300"
6) "13352110901"
127.0.0.1:6379> zadd names 0 aaa 0 bbb 0 ccc 0 ddd 0 eee 0 fff
(integer) 6
# 【zrangebylex】查询小写字母 b 和 c 开头的字符串
# [ 和 ( 效果相同
127.0.0.1:6379> zrangebylex names [b (d
1) "bbb"
2) "ccc"
127.0.0.1:6379> zrangebylex names [b [d
1) "bbb"
2) "ccc"
127.0.0.1:6379> zadd names 0 AAA 0 BBB 0 CCC 0 DDD 0 EEE
(integer) 5
# 不允许大小写混合
127.0.0.1:6379> zrangebylex names [b [D
(empty array)
127.0.0.1:6379> zrangebylex names [B (E
1) "BBB"
2) "CCC"
3) "DDD"
127.0.0.1:6379>
其他数据类型有四种:BitMap(2.2 版新增)、HyperLogLog(2.8 版新增)、GEO(3.2 版新增)、Stream(5.0 版新增)。下面将进行详细介绍。
Bitmap,即位图,是一串连续的二进制数组(0 和 1),可以通过偏移量(offset)定位元素。BitMap 通过最小的单位 bit 来进行 0|1 的设置,表示某个元素的值或者状态,时间复杂度为 O(1)。由于 bit 是计算机中最小的单位,使用它进行储存将非常节省空间,特别适合一些数据量大且使用二值统计的场景。
添加值(setbit)、查询值(getbit)、查询1出现的次数(bitcount):
# 【setbit】添加值,01011100 11011111 11
# 语法:setbit key offset value
# offset:要设置的二进制位的偏移量,从 0 开始计数
127.0.0.1:6379> setbit key_bitmap 0 0
(integer) 0
127.0.0.1:6379> setbit key_bitmap 1 1
(integer) 0
# 省略 。。。
127.0.0.1:6379> setbit key_bitmap 16 1
(integer) 0
127.0.0.1:6379> setbit key_bitmap 17 1
(integer) 0
# 【getbit】根据偏移量,查询指定位置的值
# 语法:getbit key offset
127.0.0.1:6379> getbit key_bitmap 1
(integer) 1
127.0.0.1:6379> getbit key_bitmap 6
(integer) 0
# 【bitcount】查询全部值为 1 的次数
127.0.0.1:6379> bitcount key_bitmap
(integer) 13
# 【bitcount】查询第 0 位值为 1 的次数
# 注意:每一位包含 8 个字符
# 0 0:指的是 第 0 至 7 位
127.0.0.1:6379> bitcount key_bitmap 0 0
(integer) 4
# 1 1:指的是 第 8 至 15 位
127.0.0.1:6379> bitcount key_bitmap 1 1
(integer) 7
127.0.0.1:6379> bitcount key_bitmap 2 2
(integer) 2
# 1 2:指的是第 8 至 23 位
127.0.0.1:6379> bitcount key_bitmap 1 2
(integer) 9
四种运算操作(bitop)、查询 1 或 0 首次出现(bitpos):
# 添加值:
# key01:110
# key02:011
127.0.0.1:6379> setbit key01 0 1
(integer) 0
127.0.0.1:6379> setbit key01 1 1
(integer) 0
127.0.0.1:6379> setbit key01 2 0
(integer) 0
127.0.0.1:6379> setbit key02 0 0
(integer) 0
127.0.0.1:6379> setbit key02 1 1
(integer) 0
127.0.0.1:6379> setbit key02 2 1
(integer) 0
# 【bitop】and 与运算结果:0100 0000
127.0.0.1:6379> bitop and key03 key01 key02
(integer) 1
127.0.0.1:6379> bitcount key03
(integer) 1
127.0.0.1:6379> bitcount key01
(integer) 2
127.0.0.1:6379> bitcount key02
(integer) 2
# 【bitop】or 或运算结果:1110 0000
127.0.0.1:6379> bitop or key04 key01 key02
(integer) 1
127.0.0.1:6379> bitcount key04
(integer) 3
# 【bitop】xor 异运算结果:1010 0000
127.0.0.1:6379> bitop xor key05 key01 key02
(integer) 1
127.0.0.1:6379> bitcount key05
(integer) 2
# 【bitop】not 取反运算结果:0011 1111
127.0.0.1:6379> bitop not key06 key01
(integer) 1
127.0.0.1:6379> bitcount key06
(integer) 6
# 【bitpos】查询首次出现 1 或 0 的索引位置,从 0 开始计数
# key02:011
127.0.0.1:6379> bitpos key02 1
(integer) 1
127.0.0.1:6379> bitpos key02 0
(integer) 0
127.0.0.1:6379>
Bitmap 类型非常适合二值状态统计的场景,这里的二值状态就是指集合元素的取值就只有 0 和 1 两种,在记录海量数据时,Bitmap 能够有效地节省内存空间。
在签到打卡的场景中,每天只需签一次,即签到(1)或未签到(0),这就是非常典型的二值状态。签到统计时,每个用户一天的签到用 1 个 bit 位就能表示,一个月的签到情况用约 30 个 bit 位就可以,而一年的签到也只需要用约 365 个 bit 位,根本无需用复杂的集合类型。
# 用户 ID 为 1001
# 202406 六月份
# 0:第一天打卡
# 打卡结果为 1
127.0.0.1:6379> setbit uid:sign:1001:202406 0 1
# 1:第二天,未打卡就不在添加值,默认值为 0
127.0.0.1:6379>
# 2:第三天次打开
# 打卡结果为 1
127.0.0.1:6379> setbit uid:sign:1001:202406 2 1
# 通过 getbit 查询六月第二天的打卡结果
127.0.0.1:6379> getbit uid:sign:1001:202406 1
(integer) 1
# 通过 bitcount 查询六月打卡次数
127.0.0.1:6379> bitcount uid:sign:1001:202406
注意 offset 是从 0 开始的,因此查询时需要实际日期数字减 1。
只需要一个 key = login_status 表示存储用户登陆状态集合数据, 将用户 ID 作为 offset,在线就设置为 1,下线设置 0。通过 GETBIT判断对应的用户是否在线。 5000 万用户只需要 6 MB 的空间。
# 新增三个工号 1001、1002、1003 在两个日期 0601、0602 的打卡结果
# 1001 两天均为 1
127.0.0.1:6379> setbit 0601 1001 1
(integer) 0
127.0.0.1:6379> setbit 0602 1001 1
(integer) 0
# 1002 成功一天、失败一天
127.0.0.1:6379> setbit 0601 1002 1
(integer) 0
127.0.0.1:6379> setbit 0602 1002 0
(integer) 0
# 1003 两天均为 1
127.0.0.1:6379> setbit 0601 1003 1
(integer) 0
127.0.0.1:6379> setbit 0602 1003 1
(integer) 0
# 对两天的打卡结果进行与运算
127.0.0.1:6379> bitop and 1001result 0601 0602
(integer) 126
# 取出两天连续打卡成功的人数
127.0.0.1:6379> bitcount 1001result
(integer) 2
127.0.0.1:6379>
HyperLogLog 是 Redis 2.8.9 版本新增的数据类型,是一种用于统计基数的数据集合类型。基数统计就是指,统计一个集合中不重复的元素个数。但要 HyperLogLog 是统计规则是基于概率完成的,不是非常准确,标准误算率是 0.81%。所以,HyperLogLog 提供的去重计数不十分精确。
优点:在输入元素的数量或者体积非常非常大时,计算基数所需的内存空间总是固定的、并且是很小的。每个 HyperLogLog 键只需要花费 12 KB 内存,就可以计算接近 2^64 个不同元素的基数,和元素越多就越耗费内存的 Set 和 Hash 类型相比,HyperLogLog 就非常节省空间。
所谓节省,下面是一个例子。对于 Java 语言来说,一般 long 类型占用 8 字节,而 1 字节有 8 位,即:1 byte = 8 bit,即 long 数据类型最大可以表示的数是:2^63-1。对应上面的2^64个数,假设此时有2^63-1这么多个数,从 0 ~ 2^63-1,按照 long 以及 1k = 1024 字节的规则来计算内存总数,就是:((2^63-1) * 8/1024)K,这是很庞大的一个数,存储空间远远超过 12K,而 HyperLogLog 却可以用 12K 就能统计完。
常用命令共有三个,如下示例:
# 【pfadd】添加元素到集合
127.0.0.1:6379> pfadd key_hyperloglog v1 v2 v1 v3
(integer) 1
# 【pfcount】查询集合中基数的个数
127.0.0.1:6379> pfcount key_hyperloglog
(integer) 3
127.0.0.1:6379> pfadd key_hyperloglog2 v1 v2 v4 v5
(integer) 1
# 【pfmerge】合并两个或多个集合
# 语法:pfmerge destkey sourcekey [sourcekey ...]
# destkey:目标集合;sourcekey:源集合,可以有多个
# 如下,将 2 集合,合并到 key_hyperloglog
127.0.0.1:6379> pfmerge key_hyperloglog key_hyperloglog2
OK
# 合并后的集合 v1 v2 v1 v3 v1 v2 v4 v5,共有 5 个不同的值
127.0.0.1:6379> pfcount key_hyperloglog
(integer) 5
# 若元素为整数,需要加上引号 "",表示字符串
127.0.0.1:6379> pfadd key_hyperloglog3 "1" "2" "1" "3"
(integer) 1
127.0.0.1:6379> pfcount key_hyperloglog3
(integer) 3
127.0.0.1:6379>
HyperLogLog 优势在于非常节省空间,所以,非常适合统计百万级以上的网页 UV 的场景。
在统计 UV 时,可以如下操作:
# 用 pfadd 命令,把访问页面的每个用户都添加到 HyperLogLog 集合中
127.0.0.1:6379> pfadd page1:uv user1 user2 user3
(integer) 1
# 用 pfcount 命令,返回统计结果
127.0.0.1:6379> pfcount page1:uv
(integer) 3
127.0.0.1:6379>
但需要注意的是,HyperLogLog 有个标准误算率 0.81%,因此,若统计的 UV 是 100 万,但实际的 UV 可能是 101 万。虽然误差率不算大,但是,如果在需要精确统计结果的环境中,最好还是继续用 Set 或 Hash 类型。
Redis GEO 是 Redis 3.2 版本新增的数据类型,主要用于存储地理位置信息,并对存储的信息进行操作。
在日常生活中,我们越来越依赖搜索“附近的餐馆”、在打车软件上叫车,这些都离不开基于位置信息服务(Location-Based Service,LBS)的应用。LBS 应用访问的数据是和人或物关联的一组经纬度信息,而且要能查询相邻的经纬度范围,GEO 就非常适合应用在 LBS 服务的场景中。
GEO 本身并没有设计新的底层数据结构,而是直接使用了 zset(Sorted Set)集合类型。
GEO 类型使用 GeoHash 编码方法将地球表面网格化,并把经纬度转换为简洁的字符串,例如“wm6nj2”,实现了经纬度到 Sorted Set 中元素权重分数的转换,这其中的两个关键机制就是对二维地图做区间划分和对区间进行编码。一组经纬度落在某个区间后,就用区间的编码值来表示,并把编码值作为 Sorted Set 元素的权重分数。有了权重后,就可以把经纬度保存到 zset 中,利用 zset 提供的“按权重进行有序范围查找”的特性,实现 LBS 服务中频繁使用的“搜索附近”的需求。
共有四个常用命令,示例如下:(geoadd:添加;geopos:查询;geodist:查询距离;georadius:查询范围内的点)
# 【geoadd】添加指定地理空间位置,可一次性添加多个,返回为添加成功的个数
# 语法:geoadd key longitude latitude member [longitude latitude member ...]
127.0.0.1:6379> geoadd key_geo 116 39 car001 126 41 car002
(integer) 2
# 【geopos】从 key 中查询指定名称的位置信息,一次可查询多个
# 语法:geopos key member [member ...]
127.0.0.1:6379> geopos key_geo car001
1) 1) "116.00000113248825073"
2) "38.99999918434559731"
127.0.0.1:6379> geopos key_geo car001 car002
1) 1) "116.00000113248825073"
2) "38.99999918434559731"
2) 1) "126.00000053644180298"
2) "41.00000063735271993"
# 【geodist】查询同一个 key 中两个位置间距,可以选择距离的单位,默认 m
# 语法:GEODIST key member1 member2 [m|km|ft|mi]
127.0.0.1:6379> geodist key_geo car001 car002
"880039.9467"
127.0.0.1:6379> geodist key_geo car001 car002 m
"880039.9467"
127.0.0.1:6379> geodist key_geo car001 car002 km
"880.0399"
# 【georadius】查询 key 中距离给定位置一定距离范围内的位置点
# 语法:GEORADIUS key longitude latitude radius m|km|ft|mi [WITHCOORD] [WITHDIST] [WITHHASH] [COUNT count] [ASC|DESC] [STORE key] [STOREDIST key]
# radius:距离范围值
127.0.0.1:6379> georadius key_geo 121 40 100 km asc count 2
(empty array)
127.0.0.1:6379> georadius key_geo 121 40 1000 km asc count 2
1) "car002"
2) "car001"
# withdist 配置,同时返回与目标点的距离,单位同为 km
127.0.0.1:6379> georadius key_geo 121 40 1000 km withdist asc count 2
1) 1) "car002"
2) "437.1971"
2) 1) "car001"
2) "443.2356"
127.0.0.1:6379>
同上一节中的常用命令示例,司机上线后,司机端通过 geoadd 添加车辆位置,用户端通过 georadius 进行查询指定范围内的可用车辆。
# 添加 carid:001 车辆,到位置组 cars:locations 中
geoadd cars:locations 116.034579 39.030452 carid:001
# 查询附近 5km 内,根据距离从 asc 近到远排序的 10 辆可用车
georadius cars:locations 116.054579 39.030452 5 km asc count 10
Stream 是 Redis 5.0 版本新增加的数据类型,Redis 专门为消息队列设计的数据类型。
在 Redis 5.0 Stream 没出来之前,消息队列的实现方式都有着各自的缺陷,例如:发布订阅模式,不能持久化,对于离线重连的客户端不能读取历史消息;通过 List 实现的消息队列,一个消息消费完就会被删除,无法重复取;生产者需要自行实现全局唯一 ID。
基于以上问题,Redis 5.0 便推出了 Stream 类型,也是此版本最重要的功能,用于完美地实现消息队列。它支持消息的持久化、支持自动生成全局唯一 ID、支持 ack 确认消息的模式、支持消费组模式等,让消息队列更加的稳定和可靠。
新增和查询:
# 【xadd】往队列里边添加值
# 语法:xadd key *|id field value [fielf value ...]
# 若 ID 指定为 * 号,就是取默认 ID,格式:<millisecondsTime>-<sequenceNumber>
127.0.0.1:6379> xadd key_stream * field1 value1
"1717558109464-0"
127.0.0.1:6379> xadd key_stream * field2 value2
"1717558124282-0"
127.0.0.1:6379> xadd key_stream * field3 value3
"1717558151487-0"
# 【xlen】查询队列中值的个数
127.0.0.1:6379> xlen key_stream
(integer) 3
# 【xrange】查询指定范围内的全部值,可指定返回数量
# 语法:xrange key start end [COUNT count]
127.0.0.1:6379> xrange key_stream - +
1) 1) "1717558109464-0"
2) 1) "field1"
2) "value1"
2) 1) "1717558124282-0"
2) 1) "field2"
2) "value2"
3) 1) "1717558151487-0"
2) 1) "field3"
2) "value3"
127.0.0.1:6379> xrange key_stream 1717558124282-0 1717558124282-0
1) 1) "1717558124282-0"
2) 1) "field2"
2) "value2"
127.0.0.1:6379> xrange key_stream 1717558124282-0 1717558151487-0
1) 1) "1717558124282-0"
2) 1) "field2"
2) "value2"
2) 1) "1717558151487-0"
2) 1) "field3"
2) "value3"
# 【xrevrange】相较于 xrange,此为倒序查询,开始和结束节点位置相反
# 语法:xrevrange key end start [COUNT count]
127.0.0.1:6379> xrevrange key_stream + -
1) 1) "1717558151487-0"
2) 1) "field3"
2) "value3"
2) 1) "1717558124282-0"
2) 1) "field2"
2) "value2"
3) 1) "1717558109464-0"
2) 1) "field1"
2) "value1"
# 【xread】自动读取最新的值
# 语法:xread [COUNT count] [block milliseconds] streams key [key ...] id [id ...]
# COUNT:个数;
# block:表示没有取到时要等待的时间,默认不等待,0 就是一直等待;
# streams:可以配置多个 stream,同时监听
# id:表示要取大于此 ID 的值,$ 代表 stream 中最新的 ID
127.0.0.1:6379> xread count 1 streams key_stream $
(nil)
# 在另一个窗口通过 xadd 添加 field4,此处就会自动获取到
127.0.0.1:6379> xread count 1 block 0 streams key_stream $
1) 1) "key_stream"
2) 1) 1) "1717566462395-0"
2) 1) "field4"
2) "value4"
(35.09s)
消费组管理:
# 【xgroup】管理消费组
# create 创建消费组,$ 表示最新的一个 ID
# 语法:xgroup create stream_key group_name id|$
127.0.0.1:6379> xgroup create key_stream group1 $
OK
# 【xinfo groups】查看消费组的详情
127.0.0.1:6379> xinfo groups key_stream
1) 1) "name"
2) "group1"
3) "consumers"
4) (integer) 0
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "1717566462395-0"
9) "entries-read"
10) (nil)
11) "lag"
12) (integer) 0
# 【setid】重新设置消费组的起始 ID
127.0.0.1:6379> xgroup setid key_stream group1 1717558124282-0
OK
127.0.0.1:6379> xinfo groups key_stream
1) 1) "name"
2) "group1"
3) "consumers"
4) (integer) 0
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "1717558124282-0"
9) "entries-read"
10) (nil)
11) "lag"
12) (nil)
# 【xreadgroup】读取消费组中的消息
# 语法:XREADGROUP GROUP <group> <consumer> [COUNT <n>] [BLOCK <ms>] STREAMS <stream_key_1> <stream_key_2> ... <id_1> <id_2> ...
127.0.0.1:6379> xreadgroup group group1 consumer1 count 2 streams key_stream >
1) 1) "key_stream"
2) 1) 1) "1717558151487-0"
2) 1) "field3"
2) "value3"
2) 1) "1717566462395-0"
2) 1) "field4"
2) "value4"
127.0.0.1:6379> xinfo groups key_stream
1) 1) "name"
2) "group1"
3) "consumers"
4) (integer) 1 # 有一个消费者
5) "pending"
6) (integer) 2 # 有两条正在等待处理的消息
7) "last-delivered-id"
8) "1717566462395-0"
9) "entries-read"
10) (integer) 4
11) "lag"
12) (integer) 0
# 【xpending】获取指定流的指定消费者组目前的待处理消息的相关信息
# 语法:XPENDING stream group [start stop count] [consumer]
127.0.0.1:6379> xpending key_stream group1
1) (integer) 2 # 待处理消息的数量
2) "1717558151487-0" # 收条消息 ID
3) "1717566462395-0" # 最后一条消息 ID
4) 1) 1) "consumer1" # 个消费者正在处理的消息数量
2) "2"
# 【xack】处理消费组中的消息
# 语法:XACK stream group id [id id ...]
127.0.0.1:6379> xack key_stream group1 1717558151487-0
(integer) 1
# 处理一个后,消费组中还剩一个消息
127.0.0.1:6379> xpending key_stream group1
1) (integer) 1
2) "1717566462395-0"
3) "1717566462395-0"
4) 1) 1) "consumer1"
2) "1"
# 【xclaim】消息转移
# 语法:XCLAIM stream group new_consumer max_pending_time id [id id id]
# 下边语句意义:如果消息 ID 1717566462395-0 处理时间超过 1000ms,那么消息转移给 consumer2
127.0.0.1:6379> xclaim key_stream group1 consumer2 1000 1717566462395-0
1) 1) "1717566462395-0"
2) 1) "field4"
2) "value4"
127.0.0.1:6379> xpending key_stream group1
1) (integer) 1
2) "1717566462395-0"
3) "1717566462395-0"
4) 1) 1) "consumer2" # 将 consumer1 的一条消息转移到 consumer2 后,1 中就没有消息了,2 中有一条
2) "1"
# 【xinfo stream】查看消息队列详细信息
127.0.0.1:6379> xinfo stream key_stream
1) "length"
2) (integer) 4
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "last-generated-id"
8) "1717566462395-0"
9) "max-deleted-entry-id"
10) "0-0"
11) "entries-added"
12) (integer) 4
13) "recorded-first-entry-id"
14) "1717558109464-0"
15) "groups"
16) (integer) 1
17) "first-entry"
18) 1) "1717558109464-0"
2) 1) "field1"
2) "value1"
19) "last-entry"
20) 1) "1717566462395-0"
2) 1) "field4"
2) "value4"
# 【xinfo consumers】查看消费组详情
127.0.0.1:6379> xinfo consumers key_stream group1
1) 1) "name"
2) "consumer1"
3) "pending"
4) (integer) 0
5) "idle"
6) (integer) 2046185
7) "inactive"
8) (integer) 2046185
2) 1) "name"
2) "consumer2"
3) "pending"
4) (integer) 1
5) "idle"
6) (integer) 350418
7) "inactive"
8) (integer) 350418
# 【xgroup delconsumer】删除消费组
127.0.0.1:6379> xgroup delconsumer key_stream group1 consumer2
(integer) 1
# 【xgroup destroy】删除消息组
127.0.0.1:6379> xgroup destroy key_stream group1
(integer) 1
127.0.0.1:6379>
参考:https://xie.infoq.cn/article/30451af074e57a8ef31bc299d
以下是简单的步骤,命令详情可参考上一章节。
生产者通过 xadd 命令插入一条消息,插入成功后会返回全局唯一的 ID,例如:"1654254953808-0"。
消费者通过 xread 命令从消息队列中读取消息时,可以指定一个消息 ID,并从这个消息 ID 的下一条消息开始进行读取(注意是输入消息 ID 的下一条信息开始读取,不是查询输入ID的消息)。如果想要实现阻塞读(当没有数据时,阻塞等待),可以使用 xread 时设定 block 配置项,具体值为等待时间,0 表示一直等待。
可以使用 xgroup 创建消费组,创建消费组之后,可以使用 xreadgroup 命令让消费组内的消费者读取消息。消息只能读取一次,读取后就移除了。同一条消息可以保存在多个消息组,此时就可以进行重复处理。如果消费者没有成功处理消息,它就不会给 Streams 发送 xack 命令,消息仍然会留存。此时,消费者可以在重启后,用 xpending 命令查看已读取、但尚未确认处理完成的消息。
Redis Stream 消息会丢失吗?
使用一个消息队列,其实就分为三大块:生产者、队列中间件、消费者,所以要保证消息就是保证三个环节都不能丢失数据。
生产者:会不会丢消息,取决于生产者对于异常情况的处理是否合理。从消息被生产出来,然后提交给 MQ 的过程中,只要能正常收到(MQ 中间件)的 ack 确认响应,就表示发送成功,所以只要处理好返回值和异常,如果返回异常则进行消息重发,那么这个阶段是不会出现消息丢失的。
消费者:不会丢,因为 Stream ( MQ 中间件)会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,但是未被确认的消息。消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息。等到消费者执行完业务逻辑后,再发送消费确认 XACK 命令,也能保证消息的不丢失。
Redis 消息中间件:会丢消息,Redis 在以下 2 个场景下,都会导致数据丢失:AOF 持久化配置为每秒写盘,但这个写盘过程是异步的,Redis 宕机时会存在数据丢失的可能;主从复制也是异步的,主从切换时,也存在丢失数据的可能。
综上,Redis 在队列中间件环节无法保证消息不丢。像 RabbitMQ 或 Kafka 这类专业的队列中间件,在使用时是部署一个集群,生产者在发布消息时,队列中间件通常会写多个节点,也就是有多个副本,这样一来,即便其中一个节点挂了,也能保证集群的数据不丢失。
Redis Stream 消息可堆积吗?
Redis 的数据都存储在内存中,这就意味着一旦发生消息积压,则会导致 Redis 的内存持续增长,如果超过机器内存上限,就会面临被 OOM 的风险。所以 Redis 的 Stream 提供了可以指定队列最大长度的功能,就是为了避免这种情况发生。当指定队列最大长度时,队列长度超过上限后,旧消息会被删除,只保留固定长度的新消息。这么来看,Stream 在消息积压时,如果指定了最大长度,还是有可能丢失消息的。
像 Kafka、RabbitMQ 等专业的消息队列它们的数据都是存储在磁盘上,当消息积压时,无非就是多占用一些磁盘空间。
因此,把 Redis 当作队列来使用时,会面临的 2 个问题:Redis 本身可能会丢数据;面对消息挤压,内存资源会紧张。
所以,能不能将 Redis 作为消息队列来使用,关键看业务场景。如果你的业务场景足够简单,对于数据丢失不敏感,而且消息积压概率比较小的情况下,把 Redis 当作队列是完全可以的。如果你的业务有海量消息,消息积压的概率比较大,并且不能接受数据丢失,那么就选用专业的消息队列中间件。
五种基本数据类型和各自的应用场景:
String:缓存对象、常规计数、分布式锁、共享session信息等;
List:消息队列(有两个问题:1. 生产者需要自行实现全局唯一 ID;2. 不能以消费组形式消费数据)等;
Hash:缓存对象、购物车等;
Set:聚合计算(并集、交集、差集)场景,比如点赞、共同关注、抽奖活动等;
Zset:排序场景,比如排行榜、电话和姓名排序等。
后续版本又支持四种数据类型,它们的应用场景如下:
BitMap(2.2 版新增):二值状态统计的场景,比如签到、判断用户登陆状态、连续签到用户总数等;
HyperLogLog(2.8 版新增):海量数据基数统计的场景,比如百万级网页 UV 计数等;
GEO(3.2 版新增):存储地理位置信息的场景,比如滴滴叫车;
Stream(5.0 版新增):消息队列,相比于基于 List 类型实现的消息队列,有这两个特有的特性:自动生成全局唯一消息ID,支持以消费组形式消费数据。
针对 Redis Stream 是否适合做消息队列,关键看业务场景:
如果当前业务场景足够简单,对于数据丢失不敏感,而且消息积压概率比较小的情况下,把 Redis 当作队列是完全可以的。
如果你的业务有海量消息,消息积压的概率比较大,并且不能接受数据丢失,那么就需要专业的消息队列中间件了。
参考:https://xiaolincoding.com/redis/data_struct/command.html https://blog.csdn.net/weixin_43246215/article/details/108041739