Redis 高阶应用

redis · 浏览次数 : 23

小编点评

本文介绍了如何使用Redis实现全局唯一ID生成、分布式锁以及简单消息队列等功能。通过对Redis的数据结构和操作命令的应用,我们可以充分利用Redis的高性能、原子性、可重入等特点来解决分布式系统中的各种需求。 **1. 全局唯一ID生成** 全球唯一ID生成需要满足唯一性、高可用、高性能和安全性的要求。Redis提供了 incr 命令可以保证递增性,配合分布式ID生成算法可以满足这些要求。例如,使用Redis的分布式锁可以确保同一时间只有一个线程能够访问全局ID生成器,从而避免了并发问题。 **2. 分布式锁** 分布式锁是解决多进程/多线程环境下线程安全问题的关键。Redis可以通过 setnx 命令实现分布式锁,同时利用 Lua 脚本解决线程不安全的问题,例如不可重入、不可重试和超时释放等问题。 **3. 简单消息队列** Redis提供了三种不同的方式来实现消息队列:list 结构、PubSub 和 Stream。其中,Stream 结构可以实现一个功能非常完善的消息队列,包括消息的发布、订阅、消息消费等功能。通过使用 Redis Stream,我们可以轻松构建一个简单的消息队列系统。 综上所述,Redis 在解决分布式系统的各种需求方面具有广泛的应用价值。无论是全局唯一ID生成、分布式锁还是简单消息队列,Redis 都能提供高效、可靠的解决方案。

正文

生成全局唯一 ID

  • 全局唯一 ID 需要满足以下要求:

  • 唯一性:在分布式环境中,要全局唯一

  • 高可用:在高并发情况下保证可用性

  • 高性能:在高并发情况下生成 ID 的速度必须要快,不能花费太长时间

  • 递增性:要确保整体递增的,以便于数据库创建索引

  • 安全性:ID 的规律性不能太明显,以免信息泄露

从上面的要求可以看出,全局 ID 生成器的条件还是比较苛刻的,而 Redis 恰巧可以满足以上要求。

Redis 本身就是就是以性能著称,因此完全符合高性能的要求,其次使用 Redis 的 incr 命令可以保证递增性,配合相应的分布式 ID 生成算法便可以实现唯一性和安全性,Redis 可以通过哨兵、主从等集群方案来保证可用性。因此 Redis 是一个不错的选择。
下面我们就写一个简单的示例,来让大家感受一下,实际工作中大家可以根据需要进行调整:

@Component
public class IDUtil{
	//开始时间戳(单位:秒) 2000-01-01 00:00:00
	private static final long START_TIMESTAMP = 946656000L;
	//Spring Data Redis 提供的 Redis 操作模板
	@Resource
	private StringRedisTemplate stringRedisTemplate;
	/**
	 * 获取 ID   格式:时间戳+序列号
	 * @param keyPrefix Redis 序列号前缀
	 * @return 生成的 ID
	 */
	public long getNextId(String keyPrefix){
		//获取当前时间戳
		LocalDateTime now = LocalDateTime.now();
		long nowTimestamp = now.toEpochSecond(ZoneOffset.UTC);
		//获取 ID 时间戳
		long timestamp = nowSecond - START_TIMESTAMP;
		//获取当前日期
		String date = now.format(DateTimeFormatter.ofPattern("yyyyMMdd"));
		//生成 key
		String key = "incr:" + keyPrefix + ":" + date;
		//获取序列号
		long count = stringRedisTemplate.opsForValue().increment(key);
		//生成 ID 并返回
		return timestamp << 32 | count;
	}
}

分布式锁

在 JVM 内部会有一个锁监视器来控制线程间的互斥,但在分布式的环境下会有多台机器部署同样的服务,也就是说每台机器都会有自己的锁监视器。而 JVM 的锁监视器只能保证自己内部线程的安全执行,并不能保证不同机器间的线程安全执行,因此也很难避免高并发带来的线程安全问题。因此就需要分布式锁来保证整个集群的线程的安全,而分布式锁需要满足 5 点要求:多进程可见、互斥性、高可用、高性能、安全性
其中核心要求就是多进程之间互斥,而满足这一点的方式有很多,最常见的有三种:mysql、Redis、Zookeeper。

image

通过对比我们发现,其中 Redis 的效果最理想,所以下面就用 Redis 来实现一个简单的分布式锁。

public class DistributedLockUtil {
	//分布式锁前缀
	private static final String KEY_PREFIX = "distributed:lock:";
	//业务名
	private String business;
	//分布式锁的值
	private String value;
	//Spring Data Redis 提供的 Redis 操作模板
	private StringRedisTemplate stringRedisTemplate;
	//私有化无参构造
	private DistributedLockUtil(){}
	//有参构造
	public DistributedLockUtil(String business,StringRedisTemplate stringRedisTemplate){
		this.business = business;
		this.stringRedisTemplate = stringRedisTemplate;
		this.value = UUID.randomUUID().toString();
	}
	/**
	 * 尝试获取锁
	 * @param timeout 超时时间(单位:秒)
	 * @return 锁是否获取成功
	 */
	public boolean tryLock(long timeout){
		//生成分布式锁的 key
		StringBuffer keyBuffer = new StringBuffer(KEY_PREFIX);
		keyBuffer.append(business);
		Boolean success = stringRedisTemplate.opsForValue().setIsAbsent(keyBuffer.toString(),value,timeout, TimeUnit.SECONDS);
		//返回结果  注意:为了防止自动拆箱时出现空指针,所以这里用了 equals 判断
		return Boolean.TRUE.equals(success);
	}
	/**
	 * 释放锁(不安全版)
	 */
	public void unLock(){
		//生成分布式锁的 key
		StringBuffer keyBuffer = new StringBuffer(KEY_PREFIX);
		keyBuffer.append(business);
		//获取分布式锁的值
		String redisValue = stringRedisTemplate.opsForValue().get(keyBuffer.toString());
		//判断值是否一致,防止误删
		if (value.equals(redisValue)) {
			//当代码执行到这里时,如果 JVM 恰巧执行了垃圾回收(虽然几率极低),就会导致所有线程阻塞等待,因此这里仍然会有线程安全的问题
			stringRedisTemplate.delete(keyBuffer.toString());
		}
	}
	/**
	 * 通过脚本释放锁(彻底解决线程安全问题)
	 */
	public void unLockWithScript(){
		//加载 lua 脚本,实际工作中我们可以将脚本设置为常量,并在静态代码块中初始化(脚本内容在下文)
		DefaultRedisScript<Long> script = new DefaultRedisScript<>();
		script.setLocation(new ClassPathResource("unlock.lua"));
		script.setResultType(Long.class);
		//生成分布式锁的 key
		StringBuffer keyBuffer = new StringBuffer(KEY_PREFIX);
		keyBuffer.append(business);
		//调用 lua 脚本释放锁
		stringRedisTemplate.execute(script,
				Collections.singletonList(keyBuffer.toString()),
				value);
	}
}

lua 脚本内容如下:

-- 判断值是否一致,防止误删
if(redis.call('get',KEYS[1]) == VRGV[1]) then
	-- 判断通过,释放锁
	return redis.call('del',KEYS[1])
end
-- 判断不通过,返回 0
return 0

虽然通过 lua 脚本解决了线程不安全的问题,但是仍然存在以下问题:

  • 不可重入:同一个线程无法多次获取同一把锁
  • 不可重试:获取锁只能尝试一次,失败就返回 false,没有重试机制
  • 超时释放:锁超时释放虽然可以避免死锁,但如果业务执行耗时较长,也会导致锁释放,存在安全隐患
  • 主从一致性:如果 Redis 提供了主从集群,主从同步存在延迟,当主机宕机时,如果从机还没来得及同步主机的锁数据,则会出现锁失效。

要解决以上问题也非常简单,只需要利用 Redis 的 hash 结构记录线程标识和重入次数就可以解决不可重入的问题。利用信号量和 PubSub 功能实现等待、唤醒,获取锁失败的重试机制即可解决不可重试的问题。而超时释放的问题则可以通过获取锁时为锁添加一个定时任务(俗称看门狗),定期刷新锁的超时时间即可。至于主从一致性问题,我们只需要利用多个独立的 Redis 节点(非主从),必须在所有节点都获取重入锁,才算获取锁成功。

image

有的人可能说了,虽然说起来简单,但真正实现起来也不是很容易呀。对于这种问题,大家不用担心,俗话说得好想要看的更远,需要站在巨人的肩膀上。对于上述的需求,早就有了成熟的开源方案 Redisson ,我们直接拿来用就可以了,无需重复造轮子,具体使用方法可以查看官方文档

轻量化消息队列

虽然市面上有很多优秀的消息中间件如 RocketMQ、Kafka 等,但对于应用场景较为简单,只需要简单的消息传递,比如任务调度、简单的通知系统等,不需要复杂的消息路由、事务支持的业务来说,用那些专门的消息中间件成本就显得过高。因此我们就可以使用 Redis 来做消息队列。
Redis 提供了三种不同的方式来实现消息队列:

  • list 结构:可以使用 list 来模拟消息队列,可以使用 BRPOP 或 BLPOP 命令来实现类似 JVM 阻塞队列的消息队列。
  • PubSub:基于发布/订阅的消息模型,但不支持数据持久化,且消息堆积有上限,超出时数据丢失。
  • Stream:Redis 5.0 新增的数据类型,可以实现一个功能非常完善的消息队列,也是我们实现消息队列的首选。

image

下面我就采用 Redis 的 Stream 实现一个简单的案例来让大家感受一下,实际工作中大家可以根据需要进行调整:

public class RedisQueueUtil{
	//Spring Data Redis 提供的 Redis 操作模板
	private StringRedisTemplate stringRedisTemplate;
	/**
	 * 获取消息队列中的数据,执行该方法前,一定要确保消费者组已经创建
	 * @param queueName 队列名
	 * @param groupName 消费者组名
	 * @param consumerName 消费者名
	 * @param type 返回值类型
	 * @return 消息队列中的数据
	 */
	public <T> T getQueueData(String queueName, String groupName, String consumerName, Class<T> type){
		while (true){
			try {
				//获取消息队列中的信息
				List<MapRecord<String,Object,Object>> list = stringRedisTemplate.opsForStream().read(
						Consumer.from(groupName,consumerName),
						StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
						StreamOffset.create(queueName, ReadOffset.lastConsumed())
				);
				//判断消息是否获取成功
				if (list == null || list.isEmpty()){
					//如果获取失败,说明没有消息,继续下一次循环
					continue;
				}
				//如果获取成功,则解析消息中的数据
				MapRecord<String,Object,Object> record = list.get(0);
				Map<Object,Object> values = record.getValue();
				String jsonString = JSON.toJSONString(values);
				T result = JSON.parseObject(jsonString, type);
				// ACK
				stringRedisTemplate.opsForStream().acknowledge(queueName,groupName,record.getId());
				//返回结果
				return result;
			}catch (Exception e){
				while (true){
					try {
						//获取 pending-list 队列中的信息
						List<MapRecord<String,Object,Object>> list = stringRedisTemplate.opsForStream().read(
								Consumer.from(groupName,consumerName),
								StreamReadOptions.empty().count(1)),
								StreamOffset.create(queueName,ReadOffset.from("0")
						);
						//判断消息是否获取成功
						if (list == null || list.isEmpty()){
							//如果获取失败,说明 pending-list 没有异常消息,结束循环
							break;
						}
						//如果获取成功,则解析消息中的数据
						MapRecord<String,Object,Object> record = list.get(0);
						Map<Object,Object> values = record.getValue();
						String jsonString = JSON.toJSONString(values);
						T result = JSON.parseObject(jsonString, type);
						// ACK
						stringRedisTemplate.opsForStream().acknowledge(queueName,groupName,record.getId());
						//返回结果
						return result;
					}catch (Exception ex){
						log.error("处理 pending-list 订单异常",ex);
						try {
							Thread.sleep(50);
						}catch (InterruptedException err){
							err.printStackTrace();
						}
					}
				}
			}
		}
	}
	/**
	 * 向消息队列中发送数据
	 * @param queueName 消息队列名
	 * @param map 要发送数据的集合
	 */
	public void sendQueueData(String queueName, Map<String,Object> map){
		StringBuilder builder = new StringBuilder("redis.call('xadd','");
		builder.append(queueName).append("','*','");
		Set<String> keys = map.keySet();
		for(String key:keys){
			builder.append(key).append("','").append(map.get(key)).append("','");
		}
		String script = builder.substring(0, builder.length() - 2);
		script += ")";
		stringRedisTemplate.execute(new DefaultRedisScript<Long>(script,Long.class),Collections.emptyList());
	}
}

与Redis 高阶应用相似的内容:

Redis 高阶应用

生成全局唯一 ID 全局唯一 ID 需要满足以下要求: 唯一性:在分布式环境中,要全局唯一 高可用:在高并发情况下保证可用性 高性能:在高并发情况下生成 ID 的速度必须要快,不能花费太长时间 递增性:要确保整体递增的,以便于数据库创建索引 安全性:ID 的规律性不能太明显,以免信息泄露 从上面的要

[转帖]一致性哈希 和 Redis 集群分槽

前言 伴随着系统流量的增大,出现了应用集群。在 Redis 中为了保证 Redis 的高可用也为 Redis 搭建了集群对数据进行分槽存放。在 Mysql数据库要存储的量达到一个很高的地步的时候,我们会对数据库进行分库分表操作。OK,到这儿先假设我们不知道什么是集群、什么是分库分表,我们先来看一个数

Redis最常见的5种应用场景

Redis作为当今最流行的内存数据库,已经成为服务端加速的必备工具之一。对于Redis为什么那么快?以及Redis采用单线程,但为什么反而获得更高的性能的疑问,在之前的Redis为什么那么快?一文中,已经有所介绍。 今天通过这篇,我们来了解一下Redis最常见的5种应用场景。您可以通过视频来学习,如

[转帖]高性能 -Nginx 多进程高并发、低时延、高可靠机制在百万级缓存 (redis、memcache) 代理中间件中的应用

https://xie.infoq.cn/article/2ee961483c66a146709e7e861 关于作者 前滴滴出行技术专家,现任 OPPO 文档数据库 mongodb 负责人,负责 oppo 千万级峰值 TPS/十万亿级数据量文档数据库 mongodb 内核研发及运维工作,一直专注于

[转帖]细说Redis监控和告警

https://blog.csdn.net/sD7O95O/article/details/78096956 对于任何应用服务和组件,都需要一套完善可靠谱监控方案。尤其redis这类敏感的纯内存、高并发和低延时的服务,一套完善的监控告警方案,是精细化运营的前提。本文分几节,细说Redis的监控和告警

[转帖]细说Redis监控和告警

文章系转载,便于整理和归类,原文地址:https://blog.csdn.net/sD7O95O/article/details/78096956 对于任何应用服务和组件,都需要一套完善可靠谱监控方案。 尤其redis这类敏感的纯内存、高并发和低延时的服务,一套完善的监控告警方案,是精细化运营的前提

[转帖]MySQL十六:36张图理解Buffer Pool

https://www.cnblogs.com/yunlongn/p/16630257.html 转载~ 在应用系统中,我们为加速数据访问,会把高频的数据放在「缓存」(Redis、MongoDB)里,减轻数据库的压力。 在操作系统中,为了减少磁盘IO,引入了「缓冲池」(buffer pool)机制。

聊聊游戏业务怎么用高斯Redis

摘要:其实游戏客户对数据库的诉求是很明确的,数据库应当“放心存放心用”。 本文分享自华为云社区《华为云GaussDB(for Redis)揭秘第27期:聊聊游戏业务怎么用高斯Redis》,作者:高斯Redis官方博客。 华为云数据库团队是比较重视技术洞察的,对客户真实的业务场景也比较看重。年初出差了

记一次线上Redis内存占用过高、大Key问题的排查

问题背景 在一个风和日丽的下午,公司某项目现场运维同学反馈,生产环境3个Redis的Sentinel集群节点内存占用都很高,达到了17GB的内存占用量。 稍加思索,应该是某些Key的Value数据体量过大,占用了过多的内存空间,我们在使用Redis的过程中,单个Value或者单个集合中的元素应该保证

Redis 的安装与配置详解【Redis系列一】

〇、前言 关于 Redis 在日常开发中还是用的比较多的,特别是在秒杀、消息队列、排行榜等数据交互时效要求较高的场景,Redis 都可以轻松应对。 本文将针对 Redis 进行简单介绍,以及如何安装,并罗列下全部配置项。后续还将另行发文汇总 Redis 的常用数据结构和常见问题等。 一、什么是 Re