前言
在分布式环境中,遇到抢购等访问共享资源的场景时,需要我们有一种锁机制去解决并发问题,这里,介绍一种由Redis实现的分布式锁。
基本原理
分布式锁就是多个分布式服务去同一个地方“占坑”,谁抢到这个坑谁就继续执行业务,否则就等待,知道这个坑被释放,“占坑”这个操作可以去redis、数据库等所有服务都能访问到的地方,等待通过自旋的方式实现。
下面,我们先通过redis命令行窗口模拟下实现“占坑”。复制多个窗口,发送命令到所有的窗口,set lock hahaha NX,然后看下执行结果
可以看到只有一个服务抢到了锁。那么分布式锁就这么简单吗?当然不是,接下来,我们从上面的步骤逐步分析是否存在问题,看下分布式锁的进化过程。
分布式锁的进化
阶段一
上面,我们实现了分布式锁的第一个阶段,先看下它的流程
public String getRedisLock(){
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock","111");
if(lock){
//执行业务
System.out.println(Thread.currentThread().getName()+"获取到锁");
//删除锁
redisTemplate.delete("lock");
return "success";
}else{
//抢占锁失败,重试
try{
Thread.sleep(100);
}catch (Exception e){
e.printStackTrace();
}
return getRedisLock();
}
}
这个阶段看上去好像没什么问题,但是,我们思考一个问题,setnx已经占锁成功了,这时候业务代码出现异常或者机器宕机了,然而还没有来得及执行删除锁的逻辑,这不就死锁了吗,不过这个问题给锁加个过期时间就解决了,下面看阶段二。
阶段二
加锁之后设置一个锁的过期时间,流程及代码如下
public String getRedisLock(){
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock","111");
if(lock){
//执行业务
System.out.println(Thread.currentThread().getName()+"获取到锁");
//设置锁的过期时间
redisTemplate.expire("lock",30, TimeUnit.MILLISECONDS);
//删除锁
redisTemplate.delete("lock");
return "success";
}else{
//抢占锁失败,重试
try{
Thread.sleep(100);
}catch (Exception e){
e.printStackTrace();
}
return getRedisLock();
}
}
我们再仔细思考就会发现,如果没来得及执行给锁加过期时间,不还是有问题吗?因此,我们的加锁和设置过期时间应该是一个原子操作,好在redis提供了相应的命令set lock hahaha EX 300 NX,此时,应该修改为阶段三。
阶段三
加锁和设置过期时间作为一个原子操作实现,流程和代码如下
public String getRedisLock(){
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock","111",5,TimeUnit.MILLISECONDS);
if(lock){
//执行业务
System.out.println(Thread.currentThread().getName()+"获取到锁");
//删除锁
redisTemplate.delete("lock");
return "success";
}else{
//抢占锁失败,重试
try{
Thread.sleep(100);
}catch (Exception e){
e.printStackTrace();
}
return getRedisLock();
}
}
到现在,加锁的问题基本上解决了,那么,还有其他问题吗?肯定有!我们看上面的流程,假如业务执行时间超过了锁的过期时间,此时,其他线程又能获取锁了,而恰巧在其他线程获取到锁之后,业务处理结束了,那么就该删锁了,然而此时删除的并不是第一次加的锁,是别的线程加的锁,这不就把别人的锁删掉了吗?这个问题该怎么解决呢?
阶段四
给锁加上一个uuid,在占锁的时候就加上uuid,删除锁的时候判断是否是自己的锁,流程如下
public String getRedisLock(){
String uuid = UUID.randomUUID().toString().replace("-","")+System.currentTimeMillis();
//为这个锁设置唯一的uuid
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock",uuid,5,TimeUnit.MILLISECONDS);
if(lock){
//执行业务
System.out.println(Thread.currentThread().getName()+"获取到锁");
String lockValue = (String) redisTemplate.opsForValue().get("lock");
if(lockValue.equals(uuid)){
//删除锁
redisTemplate.delete("lock");
}
return "success";
}else{
//抢占锁失败,重试
try{
Thread.sleep(100);
}catch (Exception e){
e.printStackTrace();
}
return getRedisLock();
}
}
那么这样还会有问题吗?答案还是有,我们虽然给删锁加了判断,但是这个判断和删锁操作之间也不是一个原子操作,你可能回想给判断之后加个等待时间,然后再删除锁,这样虽然可以避免一些问题,但是如果真的遇到极端情况呢?刚好判断结束删锁的时候别人加了锁,这样还是有问题,因此,应该和加锁一样,删锁也要保持一个原子操作。
阶段五
通过lua脚本删锁,保证删锁操作也是一个原子操作。
public String getRedisLock(){
String uuid = UUID.randomUUID().toString().replace("-","")+System.currentTimeMillis();
//为这个锁设置唯一的uuid
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock",uuid,5,TimeUnit.MILLISECONDS);
if(lock){
//执行业务
System.out.println(Thread.currentThread().getName()+"获取到锁");
String lockValue = (String) redisTemplate.opsForValue().get("lock");
//Lua脚本解锁
String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
//删除锁
redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), lockValue);
return "success";
}else{
//抢占锁失败,重试
try{
Thread.sleep(100);
}catch (Exception e){
e.printStackTrace();
}
return getRedisLock();
}
}
至此,才算是实现了redis分布式锁的一个完整形态。
测试
我们对上面的代码稍作修改,做一个测试,代码如下所示
package com.example.redis;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* @author zy
* @version 1.0.0
* @ClassName test.java
* @Description TODO
* @createTime 2022/12/21
*/
@RestController
public class test {
@Autowired
private RedisTemplate redisTemplate;
@RequestMapping("/test")
public void test(){
for (int i = 0; i < 10; i++) {
new Thread(()->{
getRedisLock();
}).start();
}
}
public String getRedisLock(){
// System.out.println(Thread.currentThread().getName());
String uuid = UUID.randomUUID().toString().replace("-","")+System.currentTimeMillis();
//为这个锁设置唯一的uuid
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock",uuid,20,TimeUnit.MILLISECONDS);
if(lock){
//执行业务
try{
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName()+"获取到锁");
}
catch (Exception e){
e.printStackTrace();
}
String lockValue = (String) redisTemplate.opsForValue().get("lock");
//Lua脚本解锁
String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
//删除锁
redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), lockValue);
return "success";
}else{
//抢占锁失败,重试
try{
Thread.sleep(100);
System.out.println(Thread.currentThread().getName()+"获取锁失败");
}catch (Exception e){
e.printStackTrace();
}
return "false";
// return getRedisLock(); //使用时记得放开这里
}
}
}
可以看到,10个线程,只有一个获取到了锁,其他的全部失败。
总结
上面,我们通过redis命令行模拟,逐步完善了redis分布式锁的流程,然后做了一个测试demo,由此可见,最终形式的分布式锁没有问题,至此,redis分布式锁的实现大功告成!!!
使用建议
可结合 AOP 切面注解使用。
转载至 https://blog.csdn.net/qq_36933421/article/details/128395944
尊重原创!!!