import com.alibaba.fastjson.JSON;
import lombok.extern.log4j.Log4j2;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@Log4j2
@Service
public class RedisLockUtils {
class CountDTO{
private String clientId;
private AtomicInteger count;
private String treadName;
public CountDTO() {
}
public CountDTO(String clientId, AtomicInteger count, String treadName) {
this.clientId = clientId;
this.count = count;
this.treadName = treadName;
}
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public AtomicInteger getCount() {
return count;
}
public void setCount(AtomicInteger count) {
this.count = count;
}
public String getTreadName() {
return treadName;
}
public void setTreadName(String treadName) {
this.treadName = treadName;
}
}
@Resource
private RedisTemplate redisTemplate;
private static final ThreadLocal<CountDTO> threadLocal = new ThreadLocal<>();
//定期执行任务
private ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
public boolean tryLock(String key, String value, Long time, Long timeout) {
CountDTO countDTO = threadLocal.get();
log.info("threadLocal : " + JSON.toJSONString(threadLocal.get()) +" : " + JSON.toJSONString(threadLocal));
if (countDTO == null) {
String clientId = UUID.randomUUID().toString();
countDTO = new CountDTO();
countDTO.setClientId(clientId);
countDTO.setCount(new AtomicInteger(1));
countDTO.setTreadName(Thread.currentThread().getName());
threadLocal.set(countDTO);
log.info("第一次上锁时 :" + JSON.toJSONString(threadLocal.get()));
} else {
countDTO.setCount(new AtomicInteger(countDTO.getCount().incrementAndGet()));
threadLocal.set(countDTO);
log.info("重入锁时 :" + JSON.toJSONString(countDTO));
return true;
}
int i = 0;
while (true) {
if (redisTemplate.opsForValue().setIfAbsent(key, countDTO.getClientId(), time,TimeUnit.SECONDS)) {
log.info("get lock success ,key=" + key + ", expire seconds=" + time + " , value : " + countDTO.getClientId());
this.watchDog(key, countDTO.getClientId(), time);
return true;
}
try {
i++;
TimeUnit.MILLISECONDS.sleep(200); //500毫秒 自旋一次
log.info(Thread.currentThread().getName()+" : 进行"+ i +"次自旋");
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}
public void releaseLock(String key) {
try {
//锁不为空时
if (threadLocal.get() != null && threadLocal.get().getClientId().equals(redisTemplate.opsForValue().get(key))) {
log.info(" 解锁时 : " + JSON.toJSONString(threadLocal.get()));
if (threadLocal.get().getCount().get() == 0) {
log.info(" 解锁没有重入数量时 : " + JSON.toJSONString(threadLocal.get()));
//shutdown方法:平滑的关闭ExecutorService,当此方法被调用时,ExecutorService停止接收新的任务并且等待已经提交的任务(包含提交正在执行和提交未执行)执行完成。当所有提交任务执行完毕,线程池即被关闭。
scheduledExecutorService.shutdown();
redisTemplate.execute(new DefaultRedisScript(unlockLua, Long.class), Arrays.asList(key), Arrays.asList(threadLocal.get().getClientId()));
} else {
CountDTO countDTO = threadLocal.get();
countDTO.setCount(new AtomicInteger(countDTO.getCount().decrementAndGet())); //解锁一次 -1
log.info(" 解锁但是还有重入数量时 : " + JSON.toJSONString(countDTO));
threadLocal.set(countDTO);
}
}
}catch (Exception e){
e.printStackTrace();
} finally {
if (threadLocal.get() != null) {
log.info(" 当解锁时不为空 : " + JSON.toJSONString(threadLocal.get()));
if (threadLocal.get().getCount().get() == 0) {
//shutdown方法:平滑的关闭ExecutorService,当此方法被调用时,ExecutorService停止接收新的任务并且等待已经提交的任务(包含提交正在执行和提交未执行)执行完成。当所有提交任务执行完毕,线程池即被关闭。
scheduledExecutorService.shutdown();
Object execute = redisTemplate.execute(new DefaultRedisScript(unlockLua, Long.class), Arrays.asList(key), threadLocal.get().getClientId());
log.info(" 解锁成功时 : " + JSON.toJSONString(execute));
threadLocal.remove();
}
}
}
}
/**
* 看门狗执行逻辑
*/
private void watchDog(String key, String value, long ttl) {
//获取续命速率
long rate = getRate(ttl);
if (scheduledExecutorService.isShutdown()) {
scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
}
//周期性执行,根据rate进行执行
scheduledExecutorService.scheduleAtFixedRate(new watchDogThread(scheduledExecutorService, Arrays.asList(key), value, ttl),
1, rate, TimeUnit.SECONDS); // 无返回值的 固定频率周期任务
}
private long getRate(long ttl) { //大于5时,则最后5秒执行,大于1 小于5时,则最后一秒执行
if (ttl - 5 > 0) {
return ttl - 5;
} else if (ttl - 1 > 0) {
return ttl - 1;
}
throw new RuntimeException("ttl 不允许小于1");
}
class watchDogThread implements Runnable {
private ScheduledThreadPoolExecutor poolExecutor;
private List<String> keys;
private String value;
private Long ttl;
public watchDogThread(ScheduledThreadPoolExecutor poolExecutor, List<String> keys,String value,Long ttl) {
this.poolExecutor = poolExecutor;
this.keys = keys;
this.value = value;
this.ttl = ttl;
}
@Override
public void run() {
log.info("进行续期");
try {
long execute = (long) redisTemplate.execute(new DefaultRedisScript(watchLua, Long.class), keys, value,ttl);
log.info(" execute : " + execute);
if (execute == 0) {
//续期失败 可能是业务系统发生异常并且没有进行异常捕捉,没有进行释放锁操作
poolExecutor.shutdown();
}
}catch (Exception e){
e.printStackTrace();
}
}
}
private final String watchLua = "local lock_key=KEYS[1]\n" +
"local lock_value=ARGV[1]\n" +
"local lock_ttl=tonumber(ARGV[2])\n" +
"local current_value=redis.call('get',lock_key)\n" +
"local result=0;\n" +
"if lock_value==current_value then\n" +
" result=1;\n" +
" redis.call('expire',lock_key,lock_ttl)\n" +
"end\n" +
"return result";
private final String unlockLua = "local lock_key=KEYS[1]\n" +
"local lock_value=ARGV[1]\n" +
"\n" +
"local current_value=redis.call('get',lock_key)\n" +
"local result=0\n" +
"if lock_value==current_value then\n" +
" redis.call('del',lock_key)\n" +
" result=1\n" +
"end\n" +
" return result";
}