如何实现一个分布式锁

· 浏览次数 : 17

小编点评

本文介绍了如何使用Java语言实现基于注解的分布式锁。主要通过AOP环绕通知来实现加锁和解锁操作,包括锁的注解、AOP切面的编写、锁续期任务的处理以及锁可重入性的实现。 1. **锁注解**:本文首先定义了一个分布式锁的注解`@RedisLock`,用于标注需要在方法上使用分布式锁。注解中包含了锁的超时时间、过期时间等参数。 2. **AOP切面**:接着,文章描述了如何编写一个AOP切面来处理带有`@RedisLock`注解的方法。切面中定义了一个切点表达式来匹配所有注解方法,并使用`@Around`注解创建了一个环绕通知来执行加锁和解锁的操作。 3. **锁续期任务**:为了防止锁过期,文章提出了一个自动续约锁的方案。通过定义一个锁续约任务类`LockRenewTask`,并在定时任务处理器`LockRenewHandler`中执行续约任务来延长锁的有效期。 4. **锁可重入性**:最后,文章提到了如何通过添加续约任务来确保分布式锁的可重入性。在AOP切面的`invoke`方法中,在执行业务代码之前添加锁续约任务,并在业务代码执行完毕后,通过`LockRenewHandler`来取消该锁,从而保证锁的正确释放。 综上所述,本文通过结合注解和AOP技术,实现了一个具有自动续约功能的分布式锁,同时保证了锁的可重入性,为高并发场景下的分布式锁提供了有效的解决方案。

正文

如何实现一个分布式锁

本篇内容主要介绍如何使用 Java 语言实现一个注解式的分布式锁,主要是通过注解+AOP 环绕通知来实现。

1. 锁注解

我们首先写一个锁的注解

/**
 * 分布式锁注解
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
@Documented
public @interface RedisLock {

	long DEFAULT_TIMEOUT_FOR_LOCK = 5L;
	long DEFAULT_EXPIRE_TIME = 60L;

	String key() default "your-biz-key";

	long expiredTime() default DEFAULT_EXPIRE_TIME;

	long timeoutForLock() default DEFAULT_TIMEOUT_FOR_LOCK;

}

expiredTime 是设置锁的过期时间,timeoutForLock 是设置等待锁的超时时间。如果没有等待获得锁的超时时间这个功能,那么其他线程在获取锁失败时只能直接失败,无法进行排队等待。

我们如何使用这个注解呢,很容易,在需要加锁的业务方法上直接用就行.如下,我们有一个库存服务类,它有一个扣减库存方法,该方法将数据库中的一个库存商品的数量减一。在并发场景下,如果我们没有对其进行资源控制,必然会发生库存扣减不一致现象。

public class StockServiceImpl {
	@RedisLock(key = "stock-lock", expiredTime = 10L, timeoutForLock = 5L)
	public void deduct(Long stockId) {
		Stock stock = this.getById(1L);
		Integer count = stock.getCount();
		stock.setCount(count - 1);
		this.updateById(stock);
	}
}

2. 在 AOP 切面中进行加锁处理

我们需要使用 AOP 来处理什么?自然是处理使用@RedisLock的方法,因此我们写一个切点表达式,它匹配所有标有 @RedisLock 注解的方法。
接着,我们将此切点表达式与 @Around 注解结合使用,以创建环绕通知,在目标方法执行前后执行我们的加锁解锁逻辑。

因此,基本的逻辑我们就理清了,代码大致长下面这个样子:

public class RedisLockAspect {

	private final RedisTemplate<String, Object> redisTemplate;

	// 锁的redis key前缀
	private static final String DEFAULT_KEY_PREFIX = "lock:";

	// 匹配所有标有 @RedisLock 注解的方法
	@Pointcut("@annotation(com.kelton.lock.annotation.RedisLock)")
	public void lockAnno() {
	}


	@Around("lockAnno()")
	public void invoke(ProceedingJoinPoint joinPoint) throws Exception {
		// 获取拦截方法上的RedisLock注解
		RedisLock annotation = getLockAnnotationOnMethod(joinPoint);
		// 获取锁key
		String key = getKey(annotation);
		// 锁过期时间
		long expireTime = annotation.expiredTime();
		// 获取锁的等待时间
		long timeoutForLock = annotation.timeoutForLock();
		// 在这里加锁
		someCodeForLock...
		// 执行业务
		joinPoint.proceed();
		// 在这里解锁
		someCodeForUnLock...
	}

我们在加锁的时候,需要用上 timeoutForLock 这个属性,我们通过自旋加线程休眠的方式,来达到在一段时间内等待获取锁的目的。如果自旋时间结束后,还没获取锁,则抛出异常,这里可以根据自己情况而定。自旋加锁代码如下:

    // 自旋获取锁
    long endTime = System.currentTimeMillis() + timeoutForLock * 1000;
    boolean acquired = false;
    String uuid = UUID.randomUUID().toString();
    while(System.currentTimeMillis() < endTime) {
        Boolean absent = redisTemplate.opsForValue()
                .setIfAbsent(key, uuid, expireTime, TimeUnit.SECONDS);

        if (Boolean.TRUE.equals(absent)) {
            acquired = true;
            break;
        } else {
            // 获取不到锁,尝试休眠100毫秒后重试
            Thread.sleep(100);
        }
    }
    // 超时未获取到锁, 抛出异常,可根据自己业务而定
    if (!acquired) {
        throw new RuntimeException("获取锁异常");
    }

我们发现上面加锁的时候设置了一个 uuid 作为 value 值,这是为了在锁释放的时候,不误删其他线程上的锁,随后,我们就可以执行被 AOP 切中的方法,执行结束释放锁。代码如下:

    try {
        // 执行业务
        joinPoint.proceed();
    } catch (Throwable e) {
        log.error("业务执行出错!");
    } finally {
        // 解锁时进行校验,只删除自己线程加的锁
        String value = (String) redisTemplate.opsForValue().get(key);
        if (uuid.equals(value)) {
            redisTemplate.delete(key);
        } else {
            log.warn("锁已过期!");
        }
    }

到这里,我们就以注解+AOP 的方式实现了分布式锁的功能。当然,以上只实现了分布式锁的简单功能,还缺少了分布式锁的 key 自动续约防止锁过期功能,以及锁重入功能。

目前,RedisLockAspect的完整代码如下:

@Component
@Aspect
@Slf4j
@AllArgsConstructor
public class RedisLockAspect {

	// 匹配所有标有 @RedisLock 注解的方法
	@Pointcut("@annotation(com.kelton.lock.annotation.RedisLock)")
	public void lockAnno() {
	}


	@Around("lockAnno()")
	public void invoke(ProceedingJoinPoint joinPoint) throws Exception {
		// 获取拦截方法上的RedisLock注解
		RedisLock annotation = getLockAnnotationOnMethod(joinPoint);

		String key = getKey(annotation);
		// 锁过期时间
		long expireTime = annotation.expiredTime();
		// 获取锁的等待时间
		long timeoutForLock = annotation.timeoutForLock();
		// 自旋获取锁
		long endTime = System.currentTimeMillis() + timeoutForLock * 1000;
		boolean acquired = false;
		String uuid = UUID.randomUUID().toString();
		while(System.currentTimeMillis() < endTime) {
			Boolean absent = redisTemplate.opsForValue()
					.setIfAbsent(key, uuid, expireTime, TimeUnit.SECONDS);

			if (Boolean.TRUE.equals(absent)) {
				acquired = true;
				break;
			} else {
				// 获取不到锁,尝试休眠100毫秒后重试
				Thread.sleep(100);
			}
		}
		// 超时未获取到锁, 抛出异常,可根据自己业务而定
		if (!acquired) {
			throw new RuntimeException("获取锁异常");
		}
		try {
			// 执行业务
			joinPoint.proceed();
		} catch (Throwable e) {
			log.error("业务执行出错!");
		} finally {
			// 解锁时进行校验,只删除自己线程加的锁
			String value = (String) redisTemplate.opsForValue().get(key);
			if (uuid.equals(value)) {
				redisTemplate.delete(key);
			} else {
				log.warn("锁已过期!");
			}
		}
	}

	private String getKey(RedisLock redisLock) {
		if (Objects.isNull(redisLock)) {
			return DEFAULT_KEY_PREFIX + "default";
		}
		return DEFAULT_KEY_PREFIX + redisLock.key();
	}

	private RedisLock getLockAnnotationOnMethod(ProceedingJoinPoint joinPoint) {
		MethodSignature signature = (MethodSignature) joinPoint.getSignature();
		Method method = signature.getMethod();
		return method.getAnnotation(RedisLock.class);
	}

}

3. key 自动续约防止锁过期

我们接着完善该分布式锁,为其添加 key 自动续约防止锁过期的功能。我们的思路与Redission的watch dog类似,开启一个后台线程,来定时检查需要续约的锁。我们如何判断一个锁是否需要续约呢,我们可以简单定义一个续约分界线,比如在锁过期时间的三分之二的时间点及之后,对锁进行续约。

3.1 定义一个续约任务4

我们来定义一个锁续约任务,那我们需要什么信息呢?
我们至少需要锁的 key,锁要设置的过期时间。这是两个最基本的信息。
要判断在锁过期时间的三分之二的时间点及之后进行续约,那么我们还需要记录锁上次续约的时间点。
此外,我们还可以为锁续约任务添加最大续约次数限制,这可以避免某些执行时间特别久的任务不断占用锁。所以我们还需要记录当前锁续约次数和最大续约次数。
对超过最大续约次数的锁的线程,我们直接将其停止,因此我们也记录一下该锁的线程。
结合上面的分析,我们定义的锁续约任务类如下:

public class LockRenewTask {

	/**
	 * key
	 */
	private final String key;
	/**
	 * 过期时间。单位:秒
	 */
	private final long expiredTime;
	/**
	 * 锁的最大续约次数
	 */
	private final int maxRenewCount;
	/**
	 * 锁的当前续约次数
	 */
	private int currentRenewCount;
	/**
	 * 最新更新时间
	 */
	private LocalDateTime latestRenewTime;
	/**
	 * 业务线程
	 */
	private final Thread thread;

	public LockRenewTask(String key, long expiredTime, int maxRenewCount, Thread thread) {
		this.key = key;
		this.expiredTime = expiredTime;
		this.maxRenewCount = maxRenewCount;
		this.thread = thread;
		this.latestRenewTime = LocalDateTime.now();
	}
	/**
	 * 是否到达续约时间
	 * @return
	 */
	public boolean isTimeToRenew() {
		LocalDateTime now = LocalDateTime.now();
		Duration duration = Duration.between(latestRenewTime, now);

		return duration.toSeconds() >= ((double)(this.expiredTime / 3) * 2);
	}
	/**
	 * 是否达到最大续约次数
	 * @return
	 */
	public boolean exceedMaxRenewCount() {
		return this.currentRenewCount >= this.maxRenewCount;
	}
	public synchronized void renew() {
		this.currentRenewCount++;
		this.latestRenewTime = LocalDateTime.now();
	}
	// 取消业务方法
	public void cancel() {
		thread.interrupt();
	}
	public String getKey() {
		return key;
	}
	public long getExpiredTime() {
		return expiredTime;
	}
}

我们添��了一些关于锁续约的方法:

  • isTimeToRenew(): 判断是否可以对锁进行续约
  • exceedMaxRenewCount(): 判断是否达到最大续约次数
  • renew(): 来标记一次续约操作
  • cancel(): 取消业务方法

3.2 定义一个锁续约任务处理器

接着,我们定义一个定时执行该续约任务的 handler。该 handler 也比较简答,核心逻辑是持有一个类型为 List<LockRenewTask>taskList 来添加续约任务,且使用一个 ScheduledExecutorService 来定时遍历该 taskList 来执行续约任务。该 handler 再对外暴露一个 addRenewTask 方法,方便外部调用来添加续约任务到 taskList 中。

@Slf4j
@Component
public class LockRenewHandler {

	@Autowired
	private RedisTemplate<String, Object> redisTemplate;

	/**
	 * 保障对 taskList的添加删除操作是线程安全的
	 */
	private final ReentrantLock taskListLock = new ReentrantLock();

	private final List<LockRenewTask> taskList = new ArrayList<>();

	private final ScheduledExecutorService taskExecutorService;

	{
		taskExecutorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
		taskExecutorService.scheduleAtFixedRate(() -> {
			try {
				executeRenewTask();
			} catch (Exception e) {
				//错误处理
			}
		}, 1, 2, TimeUnit.SECONDS);

	}
	/**
	 * 添加续约任务
	 */
	public void addRenewTask(LockRenewTask task) {
		taskListLock.lock();
		try {
			taskList.add(task);
		} finally {
			taskListLock.unlock();
		}
	}
	/**
	 * 执行续约任务
	 */
	private void executeRenewTask() {
		log.info("开始执行续约任务");
		if (CollectionUtils.isEmpty(taskList)) {
			return;
		}
		// 需要删除的任务,暂存这个集合中  取消
		List<LockRenewTask> cancelTask = new ArrayList<>();
		// 获取任务副本
		List<LockRenewTask> copyTaskList = new ArrayList<>(taskList);
		for (LockRenewTask task : copyTaskList) {
			try {
				// 判断 Redis 中是否存在 key
				if (!redisTemplate.hasKey(task.getKey())) {
					cancelTask.add(task);
					continue;
				}
				// 大于等于最大续约次数
				if (task.exceedMaxRenewCount()) {
					// 停止续约任务
					task.cancel();
					cancelTask.add(task);
					continue;
				}
				// 到达续约时间
				if (task.isTimeToRenew()) {
					log.info("续约任务:{}", task.getKey());
					redisTemplate.expire(task.getKey(), task.getExpiredTime(), TimeUnit.SECONDS);
					task.renew();
				}
			} catch (Exception e) {
				//错误处理
				log.error("处理任务出错:{}", task);
			}
		}
		// 加锁,删除 taskList 中需要移除的任务
		taskListLock.lock();
		try {
			taskList.removeAll(cancelTask);
			// 清理cancelTask,避免堆积,产生内存泄露
			cancelTask.clear();
		} finally {
			taskListLock.unlock();
		}
	}
}

总结一下 LockRenewHandler的主要作用:它负责管理和执行续约任务,以延长 Redis 中键的过期时间。

  • 添加续约任务:addRenewTask() 方法允许添加新的续约任务到内部列表 taskList 中。
  • 执行续约任务:executeRenewTask() 方法定期执行续约任务。它检查每个任务的状态,并根据需要续约 Redis 中的键。
  • 移除完成的任务:维护一个 cancelTask 列表,用于存储需要从 taskList 中移除的任务。在 executeRenewTask() 方法中,它会将完成的任务添加到 cancelTask 列表中,并在之后将其从 taskList 中移除。

大概的工作流程如下:

  • 续约任务被添加到 taskList 中。

  • executeRenewTask() 方法定期执行,它检查每个任务的状态:

    • 如果 Redis 中不再存在该键,则取消任务。
    • 如果任务的续约次数达到上限,则取消任务。
    • 如果是时候续约了,则续约 Redis 中的键并更新任务的续约次数,记录续约时间点。
  • 完成的任务被添加到 cancelTask 列表中。

  • executeRenewTask() 方法获取 taskList 的副本,并从副本中移除 cancelTask 中的任务,并且在完成移除任务操作后清空cancelTask

  • 更新后的 taskList 被保存回类中。

两个需要注意的点

  • 我们遍历taskList时拷贝了一份副本进行遍历,因为taskList是可变的,这样可以避免在遍历的时候产生并发修改问题。
  • cancelTask需要清理,避免产生内存泄漏。

通过这种方式,LockRenewHandler 可以确保 Redis 中的键在需要时得到续约,并自动移除完成或失败的任务。

3.3 添加锁续约任务

在上面 3.1 节和 3.2 节我们定义好了锁续约任务和处理锁续约任务的核心代码,接下来我们需要在第 2 节加锁解锁的 AOP 处理逻辑上进行一点小小的修改,主要就是在执行加锁之后,执行业务代码之前,添加上锁续约任务。修改位置如下:

public void invoke(ProceedingJoinPoint joinPoint) throws Exception {
    ... // 省略代码
    try {
        // 添加锁续约任务
        LockRenewTask task = new LockRenewTask(key, annotation.expiredTime(), annotation.maxRenew(), Thread.currentThread());
        lockRenewHandler.addRenewTask(task);
        log.info("添加续约任务, key:{}", key);
        // 执行业务
        joinPoint.proceed();
    } catch (Throwable e) {
        log.error("业务执行出错!");
    } finally {
        // 解锁时进行校验,只删除自己线程加的锁
        String value = (String) redisTemplate.opsForValue().get(key);
        if (uuid.equals(value)) {
            redisTemplate.delete(key);
        } else {
            log.warn("锁已过期!");
        }
    }
    ... // 省略代码
}

到这里,我们的分布式锁已经相当完善了,把锁自动续约的功能也加上了。当然,还没有实现锁的可重入性。

与如何实现一个分布式锁相似的内容:

如何实现一个分布式锁

如何实现一个分布式锁 本篇内容主要介绍如何使用 Java 语言实现一个注解式的分布式锁,主要是通过注解+AOP 环绕通知来实现。 1. 锁注解 我们首先写一个锁的注解 /** * 分布式锁注解 */ @Retention(RetentionPolicy.RUNTIME) @Target({Eleme

RabbitMQ+redis+Redisson分布式锁+seata实现订单服务

引言 订单服务涉及许多方面,分布式事务,分布式锁,例如订单超时未支付要取消订单,订单如何防止重复提交,如何防止超卖、这里都会使用到。 开启分布式事务可以保证跨多个服务的数据操作的一致性和完整性, 使用分布式锁可以确保在同一时间只有一个操作能够成功执行,避免并发引起的问题。 订单流程(只展示重要的内容

zookeeper的Leader选举源码解析

zookeeper是一个分布式服务框架,主要解决分布式应用中常见的多种数据问题,例如集群管理,状态同步等。为解决这些问题zookeeper需要Leader选举进行保障数据的强一致性机制和稳定性。本文通过集群的配置,对leader选举源进行解析,让读者们了解如何利用BIO通信机制,多线程多层队列实现高性能架构。

一文搞懂到底什么是 AQS

日常开发中,我们经常使用锁或者其他同步器来控制并发,那么它们的基础框架是什么呢?如何实现的同步功能呢?本文将详细用白话讲解构建锁和同步器的基础框架--AQS,并根据源码分析其原理。

Vitess全局唯一ID生成的实现方案

为了标识一段数据,通常我们会为其指定一个唯一id,比如利用MySQL数据库中的自增主键。 但是当数据量非常大时,仅靠数据库的自增主键是远远不够的,并且对于分布式数据库只依赖MySQL的自增id无法满足全局唯一的需求。因此,产生了多种解决方案,如UUID,SnowFlake等。下文将介绍Vitess是如何解决这个问题的。

腾讯面试:如何提升Kafka吞吐量?

Kafka 是一个分布式流处理平台和消息系统,用于构建实时数据管道和流应用。它最初由 LinkedIn 开发,后来成为 Apache 软件基金会的顶级项目。 Kafka 特点是高吞吐量、分布式架构、支持持久化、集群水平扩展和消费组消息消费,具体来说: 高吞吐量:Kafka 具有高性能和低延迟的特性,

如何实现简单的分布式链路功能?

为什么需要链路跟踪 为什么需要链路跟踪?微服务环境下,服务之间相互调用,可能存在 A->B->C->D->C 这种复杂的服务交互,那么需要一种方法可以将一次请求链路完整记录下来,否则排查问题不好下手、请求日志也无法完整串起来。 如何实现链路跟踪 假设我们从用户请求接口开始,每次请求需要有唯一的请求

如何实现流量控制和熔断降级?

Sentinel Sentinel 是阿里巴巴开源的一款高可用性和流量控制的分布式系统。它最初是为了解决阿里巴巴内部的微服务架构中的流量控制和熔断降级问题而开发的。Sentinel 旨在提供实时的流量控制、熔断降级、系统负载保护等功能,以保障应用的高可用性和稳定性。以下是 Sentinel 的详细介

Scaling Memcache at Facebook

Memcached 是一种众所周知的、简单的内存缓存解决方案。本文描述了 Facebook 如何利用 memcached 作为构建块来构造和扩展一个分布式键值存储支持世界上最大的社交网络。 1.Introduction 一个社交网络(FB)的基础架构通常需要以下 允许实时通信(近似,允许一定的延迟)

.NET 中使用 OpenTelemetry Traces 追踪应用程序

上一次我们讲了 OpenTelemetry Logs。今天继续来说说 OpenTelemetry Traces。 在今天的微服务和云原生环境中,理解和监控系统的行为变得越来越重要。在当下我们实现一个功能可能需要调用了 N 个方法,涉及到 N 个服务。方法之间的调用如蜘蛛网一样。分布式追踪这个时候就至