基于数据库实现分布式锁的方式比较多,如悲观锁(查询时增加 for update)、乐观锁(通过 version 字段)、增加一张表专门记录锁信息等。因为只依赖数据库,比较好理解,但是也存在一些问题。
例如,悲观锁在某些情况下(如查询条件没有命中索引)可能会锁表而不是锁行,乐观锁在冲突时可能需要多次重试,此外还有操作数据库带来的性能开销等等。所以这里对基于数据库的分布式锁不做过多研究,因为在我看来基本上不会用到。
package com.mashibing.lock;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/**
 * Spring configuration that publishes a {@link JedisPool} bean.
 *
 * <p>Every setting is injected from {@code application.properties} on the classpath
 * through the {@code redis.*} placeholders declared on the fields below.
 */
@Configuration
@PropertySource("classpath:application.properties")
public class RedisConfig {

    /* ---- connection settings ---- */

    @Value("${redis.host}")
    private String host;

    @Value("${redis.port}")
    private int port;

    @Value("${redis.timeout}")
    private int timeout;

    /* ---- pool settings ---- */

    @Value("${redis.maxIdle}")
    private int maxIdle;

    @Value("${redis.maxWaitMillis}")
    private int maxWaitMillis;

    @Value("${redis.blockWhenExhausted}")
    private Boolean blockWhenExhausted;

    @Value("${redis.JmxEnabled}")
    private Boolean JmxEnabled;

    /**
     * Builds the connection pool from the injected properties.
     *
     * @return a pool connected to {@code host:port} using {@code timeout}
     */
    @Bean
    public JedisPool jedisPoolFactory() {
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxIdle(maxIdle);
        poolConfig.setMaxWaitMillis(maxWaitMillis);
        // Whether borrowers block when the pool is exhausted:
        // false -> fail immediately, true -> block until the wait timeout.
        poolConfig.setBlockWhenExhausted(blockWhenExhausted);
        // Whether the pool registers itself for JMX management.
        poolConfig.setJmxEnabled(JmxEnabled);
        return new JedisPool(poolConfig, host, port, timeout);
    }
}
package com.mashibing.lock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.params.SetParams;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
 * A minimal Redis-backed distributed lock built on {@code SET key value NX PX}.
 *
 * <p>Within one JVM the lock is additionally tracked through {@link #ownerThread}, so
 * threads of the same process compete locally before going to Redis.
 *
 * <p>Limitations visible in this implementation:
 * <ul>
 *   <li>The Redis key expires after {@link #LOCK_TIME} ms and is never renewed, so a
 *       critical section that runs longer loses the lock on the Redis side.</li>
 *   <li>Re-entry is not counted: a nested {@code tryLock()} by the owner returns
 *       {@code true}, but the first {@code unlock()} releases the lock completely.</li>
 *   <li>{@link #lockInterruptibly()}, {@link #tryLock(long, TimeUnit)} and
 *       {@link #newCondition()} are unsupported.</li>
 * </ul>
 */
@Component
public class RedisDistLock implements Lock {

    /** Time-to-live of the Redis key, in milliseconds. */
    private static final int LOCK_TIME = 5 * 1000;

    /** Prefix prepended to the lock name to form the Redis key. */
    private static final String RS_DISTLOCK_NS = "tdln:";

    /** Pause between two acquisition attempts in {@link #lock()}, in milliseconds. */
    private static final long RETRY_INTERVAL_MS = 100L;

    /**
     * Atomic compare-and-delete: the key is removed only when its value equals the
     * caller's id, so one holder cannot release a lock taken by another.
     */
    private static final String RELEASE_LOCK_LUA =
            "if redis.call('get',KEYS[1])==ARGV[1] then\n" +
            "        return redis.call('del', KEYS[1])\n" +
            "    else return 0 end";

    /** Per-thread unique id stored as the value of the Redis key. */
    private final ThreadLocal<String> lockerId = new ThreadLocal<>();

    /**
     * Thread of this JVM that currently holds the lock, or {@code null}.
     * Volatile because it is read outside the {@code synchronized} block.
     */
    private volatile Thread ownerThread;

    private String lockName = "lock";

    @Autowired
    private JedisPool jedisPool;

    public String getLockName() {
        return lockName;
    }

    public void setLockName(String lockName) {
        this.lockName = lockName;
    }

    public Thread getOwnerThread() {
        return ownerThread;
    }

    /** Records the thread that won the lock (or {@code null} once it is released). */
    public void setOwnerThread(Thread ownerThread) {
        this.ownerThread = ownerThread;
    }

    /**
     * Blocks until the lock is acquired, retrying every {@link #RETRY_INTERVAL_MS} ms.
     *
     * <p>The wait is not interruptible, but an interrupt received while waiting is
     * no longer lost: the thread's interrupt status is restored before returning.
     */
    @Override
    public void lock() {
        boolean interrupted = false;
        try {
            while (!tryLock()) {
                try {
                    Thread.sleep(RETRY_INTERVAL_MS);
                } catch (InterruptedException e) {
                    // Keep waiting; re-assert the interrupt once we are done.
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        throw new UnsupportedOperationException("不支持可中断获取锁!");
    }

    /**
     * Makes a single attempt to take the lock.
     *
     * @return {@code true} if the calling thread holds the lock after the call
     * @throws RuntimeException if talking to Redis fails (the cause is preserved)
     */
    @Override
    public boolean tryLock() {
        Thread current = Thread.currentThread();
        Thread owner = ownerThread;
        if (owner == current) {
            // The calling thread already holds the lock.
            return true;
        }
        if (owner != null) {
            // Another thread of this JVM holds it; no need to ask Redis.
            return false;
        }
        try (Jedis jedis = jedisPool.getResource()) {
            String id = UUID.randomUUID().toString();
            SetParams params = new SetParams();
            params.px(LOCK_TIME);
            params.nx();
            synchronized (this) {
                // Local race between threads of this JVM, then the race in Redis.
                if (ownerThread == null
                        && "OK".equals(jedis.set(RS_DISTLOCK_NS + lockName, id, params))) {
                    lockerId.set(id);
                    setOwnerThread(current);
                    return true;
                }
                return false;
            }
        } catch (Exception e) {
            throw new RuntimeException("分布式锁尝试加锁失败!", e);
        }
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        throw new UnsupportedOperationException("不支持等待尝试获取锁!");
    }

    /**
     * Releases the lock held by the calling thread.
     *
     * <p>Local ownership is always cleared, even when the Redis call fails.
     *
     * @throws RuntimeException if the caller is not the owner or Redis fails
     */
    @Override
    public void unlock() {
        if (ownerThread != Thread.currentThread()) {
            throw new RuntimeException("试图释放无所有权的锁!");
        }
        try (Jedis jedis = jedisPool.getResource()) {
            Long result = (Long) jedis.eval(RELEASE_LOCK_LUA,
                    Arrays.asList(RS_DISTLOCK_NS + lockName),
                    Arrays.asList(lockerId.get()));
            if (result.longValue() != 0L) {
                System.out.println("Redis上的锁已释放!");
            } else {
                // Key missing or owned by someone else (e.g. it expired meanwhile).
                System.out.println("Redis上的锁释放失败!");
            }
        } catch (Exception e) {
            throw new RuntimeException("释放锁失败!", e);
        } finally {
            lockerId.remove();
            setOwnerThread(null);
            System.out.println("本地锁所有权已释放!");
        }
    }

    @Override
    public Condition newCondition() {
        throw new UnsupportedOperationException("不支持等待通知操作!");
    }
}
这样实现的分布式锁存在一个问题:当业务代码的执行时间大于 Redis 中锁的过期时间时,业务代码还没执行完,锁就已经因过期而被自动释放,其他客户端可以趁机拿到锁。
解决方案:加入看门狗,即由后台线程判断锁是否仍被持有(业务代码尚未执行完);如果仍被持有,就在 Redis 中的 key 失效前 100 毫秒为它续期。
ItemVo
package com.mashibing.lock.rdl;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
 * Element for a {@code DelayQueue} that becomes available slightly earlier than its
 * nominal expiry, so the consumer can act before the deadline is reached.
 *
 * @param <T> type of the payload carried by the element
 */
public class ItemVo<T> implements Delayed {

    /** How long before the nominal expiry the element becomes available, in ms. */
    private static final long EARLY_WAKEUP_MILLIS = 100L;

    /** Wall-clock instant (epoch milliseconds) at which the element becomes available. */
    private final long activeTime;

    /** Payload. */
    private final T data;

    /**
     * @param expirationTime time until expiry, in milliseconds, counted from now;
     *                       the element activates {@value #EARLY_WAKEUP_MILLIS} ms earlier
     * @param data           payload to carry
     */
    public ItemVo(long expirationTime, T data) {
        this.activeTime = expirationTime + System.currentTimeMillis() - EARLY_WAKEUP_MILLIS;
        this.data = data;
    }

    public long getActiveTime() {
        return activeTime;
    }

    public T getData() {
        return data;
    }

    /**
     * Returns the time remaining until activation, expressed in {@code unit}.
     *
     * <p>The remaining time is computed in milliseconds and must be converted from
     * milliseconds; converting from {@code unit} to itself (as before) returned the
     * raw millisecond count whatever unit the caller asked for.
     */
    @Override
    public long getDelay(TimeUnit unit) {
        long remainingMillis = activeTime - System.currentTimeMillis();
        return unit.convert(remainingMillis, TimeUnit.MILLISECONDS);
    }

    /** Orders elements by activation time, earliest first. */
    @Override
    public int compareTo(Delayed o) {
        if (o instanceof ItemVo) {
            // Compare the fixed deadlines directly: no clock skew between two reads.
            return Long.compare(activeTime, ((ItemVo<?>) o).activeTime);
        }
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }
}
LockItem
package com.mashibing.lock.rdl;
/**
 * Immutable key/value pair describing one lock entry stored in Redis:
 * {@code key} is the lock name, {@code value} is the holder's unique id.
 */
public class LockItem {

    private final String key;
    private final String value;

    /**
     * @param key   lock name
     * @param value id of the holder
     */
    public LockItem(String key, String value) {
        this.value = value;
        this.key = key;
    }

    /** @return the lock name */
    public String getKey() {
        return this.key;
    }

    /** @return the holder's id */
    public String getValue() {
        return this.value;
    }
}
RedisDistLockWithDog
package com.mashibing.lock.rdl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.params.SetParams;
import javax.annotation.PreDestroy;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
 * Redis-backed distributed lock with a watchdog thread.
 *
 * <p>The Redis key is created with a {@link #LOCK_TIME} ms time-to-live. While the
 * lock is held, a daemon thread extends that time-to-live shortly before it runs out,
 * so a long critical section does not lose the lock.
 *
 * <p>Re-entry is not counted: a nested {@code tryLock()} by the owner returns
 * {@code true}, but the first {@code unlock()} releases the lock completely.
 */
@Component
public class RedisDistLockWithDog implements Lock {

    /** Time-to-live of the Redis key, in milliseconds. */
    private static final int LOCK_TIME = 1 * 1000;
    private static final String LOCK_TIME_STR = String.valueOf(LOCK_TIME);

    /** Prefix prepended to the lock name to form the Redis key. */
    private static final String RS_DISTLOCK_NS = "tdln2:";

    /** Pause between two acquisition attempts in {@link #lock()}, in milliseconds. */
    private static final long RETRY_INTERVAL_MS = 100L;

    /** Atomic compare-and-delete: only the holder (matching id) may remove the key. */
    private static final String RELEASE_LOCK_LUA =
            "if redis.call('get',KEYS[1])==ARGV[1] then\n" +
            "        return redis.call('del', KEYS[1])\n" +
            "    else return 0 end";

    /** Atomic compare-and-extend: only the holder (matching id) may renew the key. */
    private static final String DELAY_LOCK_LUA =
            "if redis.call('get',KEYS[1])==ARGV[1] then\n" +
            "        return redis.call('pexpire', KEYS[1],ARGV[2])\n" +
            "    else return 0 end";

    /**
     * Pending renewals. The watchdog blocks on this queue instead of polling Redis;
     * an element becomes available shortly before the corresponding key expires.
     */
    private static final DelayQueue<ItemVo<LockItem>> delayDog = new DelayQueue<>();

    /** Per-thread unique id stored as the value of the Redis key. */
    private final ThreadLocal<String> lockerId = new ThreadLocal<>();

    /**
     * Thread of this JVM that currently holds the lock, or {@code null}.
     * Volatile because it is read outside the {@code synchronized} block.
     */
    private volatile Thread ownerThread;

    /** Watchdog thread; started lazily on the first successful acquisition. */
    private volatile Thread expireThread;

    private String lockName = "lock";

    @Autowired
    private JedisPool jedisPool;

    public String getLockName() {
        return lockName;
    }

    public void setLockName(String lockName) {
        this.lockName = lockName;
    }

    public Thread getOwnerThread() {
        return ownerThread;
    }

    public void setOwnerThread(Thread ownerThread) {
        this.ownerThread = ownerThread;
    }

    /**
     * Blocks until the lock is acquired, retrying every {@link #RETRY_INTERVAL_MS} ms.
     *
     * <p>The wait is not interruptible, but an interrupt received while waiting is
     * no longer lost: the thread's interrupt status is restored before returning.
     */
    @Override
    public void lock() {
        boolean interrupted = false;
        try {
            while (!tryLock()) {
                try {
                    Thread.sleep(RETRY_INTERVAL_MS);
                } catch (InterruptedException e) {
                    // Keep waiting; re-assert the interrupt once we are done.
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        throw new UnsupportedOperationException("不支持可中断获取锁!");
    }

    /**
     * Makes a single attempt to take the lock. On success the watchdog is started
     * (first time only) and a renewal is scheduled.
     *
     * @return {@code true} if the calling thread holds the lock after the call
     * @throws RuntimeException if talking to Redis fails (the cause is preserved)
     */
    @Override
    public boolean tryLock() {
        Thread current = Thread.currentThread();
        Thread owner = ownerThread;
        if (owner == current) {
            // The calling thread already holds the lock.
            return true;
        }
        if (owner != null) {
            // Another thread of this JVM holds it; no need to ask Redis.
            return false;
        }
        // try-with-resources: a failing getResource() no longer ends in an NPE from
        // close() on a null connection, which used to hide the real error.
        try (Jedis jedis = jedisPool.getResource()) {
            // Every holder gets a unique id (a snowflake id would work as well).
            String id = UUID.randomUUID().toString();
            SetParams params = new SetParams();
            params.px(LOCK_TIME);
            params.nx();
            synchronized (this) {
                if (ownerThread == null
                        && "OK".equals(jedis.set(RS_DISTLOCK_NS + lockName, id, params))) {
                    lockerId.set(id);
                    setOwnerThread(current);
                    if (expireThread == null) {
                        expireThread = new Thread(new ExpireTask(), "expireThread");
                        expireThread.setDaemon(true);
                        expireThread.start();
                    }
                    // Schedule the renewal shortly before the key expires.
                    delayDog.add(new ItemVo<>(LOCK_TIME, new LockItem(lockName, id)));
                    System.out.println(Thread.currentThread().getName() + "已获得锁----");
                    return true;
                }
                System.out.println(Thread.currentThread().getName() + "无法获得锁----");
                return false;
            }
        } catch (Exception e) {
            throw new RuntimeException("分布式锁尝试加锁失败!", e);
        }
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        throw new UnsupportedOperationException("不支持等待尝试获取锁!");
    }

    /**
     * Releases the lock held by the calling thread.
     *
     * <p>Local ownership is always cleared, even when the Redis call fails. A pending
     * renewal is not removed from the queue; the watchdog finds that the key no
     * longer carries this holder's id and drops it.
     *
     * @throws RuntimeException if the caller is not the owner or Redis fails
     */
    @Override
    public void unlock() {
        if (ownerThread != Thread.currentThread()) {
            throw new RuntimeException("试图释放无所有权的锁!");
        }
        try (Jedis jedis = jedisPool.getResource()) {
            Long result = (Long) jedis.eval(RELEASE_LOCK_LUA,
                    Arrays.asList(RS_DISTLOCK_NS + lockName),
                    Arrays.asList(lockerId.get()));
            System.out.println(result);
            if (result.longValue() != 0L) {
                System.out.println("Redis上的锁已释放!");
            } else {
                System.out.println("Redis上的锁释放失败!");
            }
        } catch (Exception e) {
            throw new RuntimeException("释放锁失败!", e);
        } finally {
            lockerId.remove();
            setOwnerThread(null);
        }
    }

    @Override
    public Condition newCondition() {
        throw new UnsupportedOperationException("不支持等待通知操作!");
    }

    /**
     * Extends the time-to-live of one lock entry if it is still held by the same id,
     * and schedules the next renewal.
     *
     * <p>Failures are reported and swallowed so that one Redis hiccup does not kill
     * the watchdog thread (it is started only once and would never come back).
     * The failed entry is not re-queued.
     */
    private void renew(LockItem lockItem) {
        try (Jedis jedis = jedisPool.getResource()) {
            Long result = (Long) jedis.eval(DELAY_LOCK_LUA,
                    Arrays.asList(RS_DISTLOCK_NS + lockItem.getKey()),
                    Arrays.asList(lockItem.getValue(), LOCK_TIME_STR));
            if (result.longValue() == 0L) {
                System.out.println("Redis上的锁已释放,无需续期!");
            } else {
                delayDog.add(new ItemVo<>(LOCK_TIME,
                        new LockItem(lockItem.getKey(), lockItem.getValue())));
                System.out.println("Redis上的锁已续期:" + LOCK_TIME);
            }
        } catch (Exception e) {
            System.err.println("锁续期失败!" + e);
        }
    }

    /** Watchdog loop: waits for the next due renewal and performs it. */
    private class ExpireTask implements Runnable {
        @Override
        public void run() {
            System.out.println("看门狗线程已启动......");
            while (!Thread.currentThread().isInterrupted()) {
                LockItem lockItem;
                try {
                    // Blocks until an entry is about to expire.
                    lockItem = delayDog.take().getData();
                } catch (InterruptedException e) {
                    System.out.println("看门狗线程被中断");
                    break;
                }
                renew(lockItem);
            }
            System.out.println("看门狗线程准备关闭......");
        }
    }

    /** Stops the watchdog when the bean is destroyed. */
    @PreDestroy
    public void closeExpireThread() {
        Thread watchdog = expireThread;
        if (watchdog != null) {
            watchdog.interrupt();
        }
    }
}
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.12.3</version>
</dependency>
package com.mashibing.redission;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that publishes a single-server {@link RedissonClient}.
 *
 * <p>Host and port come from the {@code redis.host} / {@code redis.port} properties.
 */
@Configuration
public class MyRedissionConfig {

    @Value("${redis.host}")
    private String host;

    @Value("${redis.port}")
    private String port;

    /**
     * Creates the Redisson client; the container calls {@code shutdown} on it
     * when the bean is destroyed.
     *
     * @return a client connected to {@code redis://host:port}
     */
    @Bean(destroyMethod = "shutdown")
    public RedissonClient redisson() {
        String address = "redis://" + host + ":" + port;

        Config singleServer = new Config();
        singleServer.useSingleServer().setAddress(address);

        return Redisson.create(singleServer);
    }
}
package com.mashibing.service;
import com.mashibing.dao.ShopGoodsMapper;
import com.mashibing.dao.shopGoodsUniqueMapper;
import com.mashibing.lock.RedisDistLock;
import com.mashibing.model.ShopGoods;
import com.mashibing.model.shopGoodsUnique;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.concurrent.TimeUnit;
/**
 * Stock operations protected by a Redisson distributed lock.
 *
 * <p>NOTE(review): the class is {@code @Transactional} while the lock is released
 * inside the method. If the transaction commits only after the method returns,
 * another node could read the old stock between unlock and commit — confirm.
 */
@Service
@Transactional
public class GoodsServiceRedisImpl {
    private static final Logger logger = LoggerFactory.getLogger(GoodsServiceRedisImpl.class);

    /** Name of the Redisson lock guarding stock updates. */
    private static final String LOCK_NAME = "RD-LOCK";

    @Autowired
    private ShopGoodsMapper shopGoodsMapper;
    @Autowired
    private shopGoodsUniqueMapper shopGoodsUniqueMapper;
    // Hand-written Redis distributed lock (not used by this implementation).
    @Autowired
    private RedisDistLock redisDistLock;
    @Autowired
    private RedissonClient redissonClient;
    // ZooKeeper client (not used by this implementation).
    @Autowired
    private CuratorFramework curatorFramework;

    /**
     * Deducts {@code goodsNumber} from the stock of {@code goodsId} under the
     * distributed lock.
     *
     * <p>NOTE(review): the lock is taken with an explicit 1 s lease. If the update
     * takes longer the lease runs out while the work is still in progress; whether
     * automatic renewal is wanted here should be confirmed against the Redisson docs.
     *
     * @param orderId     order the deduction belongs to (used for logging)
     * @param goodsId     primary key of the goods row
     * @param goodsNumber quantity to deduct
     * @return {@code 1} on success, {@code -1} on failure
     */
    public int updateGoods(long orderId, long goodsId, int goodsNumber) {
        int ireturn = -1;
        RLock lock = redissonClient.getLock(LOCK_NAME);
        lock.lock(1, TimeUnit.SECONDS);
        try {
            ShopGoods shopGoods = shopGoodsMapper.selectByPrimaryKey(goodsId);
            if (shopGoods == null) {
                logger.error("修改库存失败:[" + orderId + "]");
            } else {
                Integer goodnumber = shopGoods.getGoodsNumber() - goodsNumber;
                shopGoods.setGoodsNumber(goodnumber);
                // An update that touches no row is a failure; ">= 0" was always true.
                if (shopGoodsMapper.updateByPrimaryKey(shopGoods) > 0) {
                    ireturn = 1;
                } else {
                    logger.error("修改库存失败:[" + orderId + "]");
                }
            }
        } catch (Exception e) {
            logger.error("修改库存失败:[" + orderId + "]", e);
        } finally {
            releaseQuietly(lock, orderId);
        }
        // Returned after the finally block: a return inside finally would silently
        // discard any Throwable raised above.
        return ireturn;
    }

    /**
     * Unlocks only when the calling thread still owns the lock (the lease may have
     * run out), and never lets an unlock failure escape.
     */
    private void releaseQuietly(RLock lock, long orderId) {
        try {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        } catch (RuntimeException e) {
            logger.error("释放Redis分布式锁失败:[" + orderId + "]", e);
        }
    }

    /**
     * Compensation for {@link #updateGoods}: adds {@code goodsNumber} back to the stock.
     *
     * <p>A row is inserted into the de-duplication table first. Only when that insert
     * throws is the stock restored.
     * NOTE(review): presumably the insert fails because the order id already exists;
     * every other insert failure takes the same path — confirm this is intended.
     *
     * @return {@code 1} when nothing had to be restored or the restore succeeded,
     *         {@code -1} when the restore failed
     */
    public int CancelupdateGoods(long orderId, long goodsId, int goodsNumber) {
        try {
            shopGoodsUnique dedupRecord = new shopGoodsUnique();
            dedupRecord.setOrderId(orderId);
            shopGoodsUniqueMapper.insert(dedupRecord);
        } catch (Exception e) {
            ShopGoods shopGoods = shopGoodsMapper.selectByPrimaryKey(goodsId);
            if (shopGoods == null) {
                logger.error("回退库存失败:[" + orderId + "]");
                return -1;
            }
            Integer goodnumber = shopGoods.getGoodsNumber() + goodsNumber;
            shopGoods.setGoodsNumber(goodnumber);
            if (shopGoodsMapper.updateByPrimaryKey(shopGoods) > 0) {
                return 1;
            }
            logger.error("回退库存失败:[" + orderId + "]");
            return -1;
        }
        return 1;
    }
}
原理:每个客户端在锁路径下创建临时顺序节点(临时节点基于客户端与服务端的 session,会话断开节点就会被自动删除)。序号最小的节点获得锁;其余客户端利用 watch 机制监听排在自己前一位的节点,前一个节点被删除(持锁方释放锁或会话断开)后,下一个节点获得锁,以此类推。
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>5.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.8.0</version>
</dependency>
package com.mashibing.lock.zk;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that publishes a started {@link CuratorFramework}
 * (ZooKeeper client) built from the settings held by {@link WrapperZK}.
 */
@Configuration
public class CuratorConfig {

    @Autowired
    WrapperZK wrapperZK;

    /**
     * Creates and starts the ZooKeeper client.
     *
     * @return a client on which {@code start()} has already been called
     */
    @Bean
    public CuratorFramework curatorFramework() {
        RetryPolicy backoff = new ExponentialBackoffRetry(
                wrapperZK.getElapsedTimeMs(),
                wrapperZK.getRetryCount());

        CuratorFramework zkClient = CuratorFrameworkFactory.newClient(
                wrapperZK.getConnectString(),
                wrapperZK.getSessionTimeoutMs(),
                wrapperZK.getConnectionTimeoutMs(),
                backoff);

        zkClient.start();
        return zkClient;
    }
}
package com.mashibing.lock.zk;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
 * Holder for the ZooKeeper/Curator client settings, bound from configuration
 * properties under the {@code curator} prefix.
 *
 * <p>CuratorConfig reads these values through getters that are not declared here;
 * presumably they are generated by Lombok's {@code @Data}.
 */
@Data
@Component
@ConfigurationProperties(prefix = "curator")
public class WrapperZK {
// Passed by CuratorConfig as the second argument of ExponentialBackoffRetry.
private int retryCount;
// Passed by CuratorConfig as the first argument of ExponentialBackoffRetry.
private int elapsedTimeMs;
// Connection string handed to CuratorFrameworkFactory.newClient.
private String connectString;
// Session timeout handed to CuratorFrameworkFactory.newClient; the name suggests milliseconds.
private int sessionTimeoutMs;
// Connection timeout handed to CuratorFrameworkFactory.newClient; the name suggests milliseconds.
private int connectionTimeoutMs;
}
package com.mashibing.service;
import com.mashibing.dao.ShopGoodsMapper;
import com.mashibing.dao.shopGoodsUniqueMapper;
import com.mashibing.lock.RedisDistLock;
import com.mashibing.lock.rdl.RedisDistLockWithDog;
import com.mashibing.model.ShopGoods;
import com.mashibing.model.shopGoodsUnique;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.concurrent.TimeUnit;
/**
 * Order-related stock operations protected by a ZooKeeper (Curator) distributed lock.
 *
 * <p>NOTE(review): the class is {@code @Transactional} while the lock is released
 * inside the method. If the transaction commits only after the method returns,
 * another node could read the old stock between release and commit — confirm.
 *
 * @author 李瑾老师
 */
@Service
@Transactional
public class GoodsServiceImpl {
    private static final Logger logger = LoggerFactory.getLogger(GoodsServiceImpl.class);

    /** ZooKeeper path under which the lock nodes are created. */
    private static final String LOCK_PATH = "/locks2";

    @Autowired
    private ShopGoodsMapper shopGoodsMapper;
    @Autowired
    private shopGoodsUniqueMapper shopGoodsUniqueMapper;
    // Hand-written Redis distributed lock (not used by this implementation).
    @Autowired
    private RedisDistLock redisDistLock;
    // ZooKeeper client used to build the distributed lock.
    @Autowired
    private CuratorFramework curatorFramework;

    /** Lazily created, shared by all calls in this JVM. */
    static InterProcessLock lock;

    /**
     * Returns the shared lock, creating it on first use. The static field is guarded
     * by the class monitor rather than an instance monitor.
     */
    private InterProcessLock zkLock() {
        synchronized (GoodsServiceImpl.class) {
            if (lock == null) {
                lock = new InterProcessMutex(curatorFramework, LOCK_PATH);
            }
            return lock;
        }
    }

    /**
     * Deducts {@code goodsNumber} from the stock of {@code goodsId} under the
     * distributed lock, waiting at most 5 seconds for the lock.
     *
     * @param orderId     order the deduction belongs to (used for logging)
     * @param goodsId     primary key of the goods row
     * @param goodsNumber quantity to deduct
     * @return {@code 1} on success, {@code -1} on failure (including lock timeout)
     */
    public int updateGoods(long orderId, long goodsId, int goodsNumber) {
        InterProcessLock mutex = zkLock();
        int ireturn = -1;
        boolean acquired = false;
        try {
            acquired = mutex.acquire(5, TimeUnit.SECONDS);
            if (!acquired) {
                logger.error("修改库存失败:[拿不到zk分布式锁]");
            } else {
                ShopGoods shopGoods = shopGoodsMapper.selectByPrimaryKey(goodsId);
                if (shopGoods == null) {
                    logger.error("修改库存失败:[" + orderId + "]");
                } else {
                    Integer goodnumber = shopGoods.getGoodsNumber() - goodsNumber;
                    shopGoods.setGoodsNumber(goodnumber);
                    // An update that touches no row is a failure; ">= 0" was always true.
                    if (shopGoodsMapper.updateByPrimaryKey(shopGoods) > 0) {
                        ireturn = 1;
                    } else {
                        logger.error("修改库存失败:[" + orderId + "]");
                    }
                }
            }
        } catch (Exception e) {
            logger.error("修改库存失败:[" + orderId + "]", e);
        } finally {
            // Release only what was actually acquired; releasing after a failed or
            // timed-out acquire is an error in its own right.
            if (acquired) {
                try {
                    mutex.release();
                } catch (Exception e) {
                    logger.error("释放zk分布式锁失败:[" + orderId + "]", e);
                }
            }
        }
        // Returned after the finally block: a return inside finally would silently
        // discard any Throwable raised above.
        return ireturn;
    }

    /**
     * Compensation for {@link #updateGoods}: adds {@code goodsNumber} back to the stock.
     *
     * <p>A row is inserted into the de-duplication table first. Only when that insert
     * throws is the stock restored.
     * NOTE(review): presumably the insert fails because the order id already exists;
     * every other insert failure takes the same path — confirm this is intended.
     *
     * @return {@code 1} when nothing had to be restored or the restore succeeded,
     *         {@code -1} when the restore failed
     */
    public int CancelupdateGoods(long orderId, long goodsId, int goodsNumber) {
        try {
            shopGoodsUnique dedupRecord = new shopGoodsUnique();
            dedupRecord.setOrderId(orderId);
            shopGoodsUniqueMapper.insert(dedupRecord);
        } catch (Exception e) {
            ShopGoods shopGoods = shopGoodsMapper.selectByPrimaryKey(goodsId);
            if (shopGoods == null) {
                logger.error("回退库存失败:[" + orderId + "]");
                return -1;
            }
            Integer goodnumber = shopGoods.getGoodsNumber() + goodsNumber;
            shopGoods.setGoodsNumber(goodnumber);
            if (shopGoodsMapper.updateByPrimaryKey(shopGoods) > 0) {
                return 1;
            }
            logger.error("回退库存失败:[" + orderId + "]");
            return -1;
        }
        return 1;
    }
}