前言

在分布式环境中,遇到抢购等访问共享资源的场景时,需要我们有一种锁机制去解决并发问题,这里,介绍一种由Redis实现的分布式锁。

基本原理

分布式锁就是多个分布式服务去同一个地方“占坑”,谁抢到这个坑谁就继续执行业务,否则就等待,知道这个坑被释放,“占坑”这个操作可以去redis、数据库等所有服务都能访问到的地方,等待通过自旋的方式实现。
image-1684921592127

下面,我们先通过redis命令行窗口模拟下实现“占坑”。复制多个窗口,发送命令到所有的窗口,set lock hahaha NX,然后看下执行结果

image-1684921606862

可以看到只有一个服务抢到了锁。那么分布式锁就这么简单吗?当然不是,接下来,我们从上面的步骤逐步分析是否存在问题,看下分布式锁的进化过程。

分布式锁的进化

阶段一

上面,我们实现了分布式锁的第一个阶段,先看下它的流程
image-1684921688648

public String getRedisLock(){
        Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock","111");
        if(lock){
            //执行业务
            System.out.println(Thread.currentThread().getName()+"获取到锁");
            //删除锁
            redisTemplate.delete("lock");
            return "success";
        }else{
            //抢占锁失败,重试
            try{
                Thread.sleep(100);
            }catch (Exception e){
                e.printStackTrace();
            }
            return getRedisLock();
        }
}

这个阶段看上去好像没什么问题,但是,我们思考一个问题,setnx已经占锁成功了,这时候业务代码出现异常或者机器宕机了,然而还没有来得及执行删除锁的逻辑,这不就死锁了吗,不过这个问题给锁加个过期时间就解决了,下面看阶段二。

阶段二

加锁之后设置一个锁的过期时间,流程及代码如下
image-1684921765430

public String getRedisLock(){
        Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock","111");
        if(lock){
            //执行业务
            System.out.println(Thread.currentThread().getName()+"获取到锁");
            //设置锁的过期时间
            redisTemplate.expire("lock",30, TimeUnit.MILLISECONDS);
            //删除锁
            redisTemplate.delete("lock");
            return "success";
        }else{
            //抢占锁失败,重试
            try{
                Thread.sleep(100);
            }catch (Exception e){
                e.printStackTrace();
            }
            return getRedisLock();
        }
    }

我们再仔细思考就会发现,如果没来得及执行给锁加过期时间,不还是有问题吗?因此,我们的加锁和设置过期时间应该是一个原子操作,好在redis提供了相应的命令set lock hahaha EX 300 NX,此时,应该修改为阶段三。

阶段三

加锁和设置过期时间作为一个原子操作实现,流程和代码如下
image-1684921824720

public String getRedisLock(){
        Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock","111",5,TimeUnit.MILLISECONDS);
        if(lock){
            //执行业务
            System.out.println(Thread.currentThread().getName()+"获取到锁");
            //删除锁
            redisTemplate.delete("lock");
            return "success";
        }else{
            //抢占锁失败,重试
            try{
                Thread.sleep(100);
            }catch (Exception e){
                e.printStackTrace();
            }
            return getRedisLock();
        }
    }

到现在,加锁的问题基本上解决了,那么,还有其他问题吗?肯定有!我们看上面的流程,假如业务执行时间超过了锁的过期时间,此时,其他线程又能获取锁了,而恰巧在其他线程获取到锁之后,业务处理结束了,那么就该删锁了,然而此时删除的并不是第一次加的锁,是别的线程加的锁,这不就把别人的锁删掉了吗?这个问题该怎么解决呢?

阶段四

给锁加上一个uuid,在占锁的时候就加上uuid,删除锁的时候判断是否是自己的锁,流程如下
image-1684921874336

public String getRedisLock(){
        String uuid = UUID.randomUUID().toString().replace("-","")+System.currentTimeMillis();
        //为这个锁设置唯一的uuid
        Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock",uuid,5,TimeUnit.MILLISECONDS);
        if(lock){
            //执行业务
            System.out.println(Thread.currentThread().getName()+"获取到锁");
            String lockValue = (String) redisTemplate.opsForValue().get("lock");
            if(lockValue.equals(uuid)){
                //删除锁
                redisTemplate.delete("lock");
            }
            return "success";
        }else{
            //抢占锁失败,重试
            try{
                Thread.sleep(100);
            }catch (Exception e){
                e.printStackTrace();
            }
            return getRedisLock();
        }
    }

那么这样还会有问题吗?答案还是有,我们虽然给删锁加了判断,但是这个判断和删锁操作之间也不是一个原子操作,你可能回想给判断之后加个等待时间,然后再删除锁,这样虽然可以避免一些问题,但是如果真的遇到极端情况呢?刚好判断结束删锁的时候别人加了锁,这样还是有问题,因此,应该和加锁一样,删锁也要保持一个原子操作。

阶段五

通过lua脚本删锁,保证删锁操作也是一个原子操作。
image-1684921926924

public String getRedisLock(){
        String uuid = UUID.randomUUID().toString().replace("-","")+System.currentTimeMillis();
        //为这个锁设置唯一的uuid
        Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock",uuid,5,TimeUnit.MILLISECONDS);
        if(lock){
            //执行业务
            System.out.println(Thread.currentThread().getName()+"获取到锁");
            String lockValue = (String) redisTemplate.opsForValue().get("lock");
            //Lua脚本解锁
            String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
            //删除锁
            redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), lockValue);
            return "success";
        }else{
            //抢占锁失败,重试
            try{
                Thread.sleep(100);
            }catch (Exception e){
                e.printStackTrace();
            }
            return getRedisLock();
        }
    }

至此,才算是实现了redis分布式锁的一个完整形态。

测试

我们对上面的代码稍作修改,做一个测试,代码如下所示

package com.example.redis;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * @author zy
 * @version 1.0.0
 * @ClassName test.java
 * @Description TODO
 * @createTime 2022/12/21
 */
@RestController
public class test {
    @Autowired
    private RedisTemplate redisTemplate;

    @RequestMapping("/test")
    public void test(){
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                getRedisLock();
            }).start();
        }
    }

    public String getRedisLock(){
//        System.out.println(Thread.currentThread().getName());
        String uuid = UUID.randomUUID().toString().replace("-","")+System.currentTimeMillis();
        //为这个锁设置唯一的uuid
        Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock",uuid,20,TimeUnit.MILLISECONDS);
        if(lock){
            //执行业务
            try{
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName()+"获取到锁");
            }
            catch (Exception e){
                e.printStackTrace();
            }
            String lockValue = (String) redisTemplate.opsForValue().get("lock");
            //Lua脚本解锁
            String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
            //删除锁
            redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), lockValue);
            return "success";
        }else{
            //抢占锁失败,重试
            try{
                Thread.sleep(100);
                System.out.println(Thread.currentThread().getName()+"获取锁失败");
            }catch (Exception e){
                e.printStackTrace();
            }
            return "false";
//            return getRedisLock(); //使用时记得放开这里
        }
    }
}

image-1684922009037
可以看到,10个线程,只有一个获取到了锁,其他的全部失败。

总结

上面,我们通过redis命令行模拟,逐步完善了redis分布式锁的流程,然后做了一个测试demo,由此可见,最终形式的分布式锁没有问题,至此,redis分布式锁的实现大功告成!!!

使用建议

可结合 AOP 切面注解使用。

转载至 https://blog.csdn.net/qq_36933421/article/details/128395944
尊重原创!!!