大家好,欢迎来到IT知识分享网。
1.1 互斥
在分布式高并发的条件下,需要保证,同一时刻只能有一个线程获得锁。
1.2 防止死锁
在分布式高并发的条件下,比如有个线程获得锁的同时,还没有来得及去释放锁,就因为系统故障或者其它原因使它无法执行释放锁的命令,导致其它线程都无法获得锁,造成死锁。
所以分布式非常有必要设置锁的有效时间
,确保系统出现故障后,在一定时间内能够主动去释放锁,避免造成死锁的情况。
1.3 性能
对于访问量大的共享资源,需要考虑减少锁等待的时间,避免导致大量线程阻塞。
所以在锁的设计时,需要考虑两点。
-
1、
锁的颗粒度要尽量小
。比如你要通过锁来减库存,那这个锁的名称你可以设置成是商品的ID,而不是任取名称。这样这个锁只对当前商品有效,锁的颗粒度小。
-
2、锁的范围尽量要小`。比如只要锁2行代码就可以解决问题的,那就不要去锁10行代码了。
1.4 可重入
ReentrantLock是可重入锁,那它的特点就是:同一个线程可以重复拿到同一个资源的锁。重入锁非常有利于资源的高效利用。
2:Redission的原理分析
2.1 加锁机制
线程去获取锁,获取成功: 执行lua脚本,保存数据到redis数据库。
线程去获取锁,获取失败: 一直通过while循环尝试获取锁,获取成功后,执行lua脚本,保存数据到redis数据库。
2.2 watch dog自动延期机制
在一个分布式环境下,假如一个线程获得锁后,突然服务器宕机了,那么这个时候在一定时间后这个锁会自动释放,你也可以设置锁的有效时间(不设置默认30秒),这样的目的主要是防止死锁的发生。
2.3 为啥要用lua脚本
主要是如果你的业务逻辑复杂的话,通过封装在lua脚本中发送给redis,而且redis是单线程的,这样就保证这段复杂业务逻辑执行的原子性。
在分布式锁中,加锁的操作需要多条命令,使用lua脚本保证了原子性。
2.4 可重入加锁机制
Redisson可以实现可重入加锁机制基于以下两点进行实现:
1、Redis存储锁的数据类型是 Hash类型
2、Hash数据类型的key值包含了当前线程信息。
使用的是数据类型是Hash类型,Hash类型相当于我们java的 <key,<key1,value>>
类型,这里key是指 加锁的key。例如:key1值为078e44a3-5f95-4e24-b6aa-80684655a15a:45
它的组成是:
guid + 当前线程的ID。后面的value就是重入的次数。
3:Redis分布式锁的缺点
(1):如果使用的是单Master节点,那么可能会因为主备节点切换时候,锁数据没有同步完成就出现一次加锁,可能出现问题。
4:代码分析
4.1 RLock接口
其中继承的接口Lock,是JUC中的接口,具有Lock中的能力。同时它还有很多新特性:强制锁释放,带有效期的锁。
public interface RLock extends Lock, RLockAsync
public interface RRLock {
//----------------------Lock接口方法-----------------------
/**
* 加锁 锁的有效期默认30秒
*/
void lock();
/**
* tryLock()方法是有返回值的,它表示用来尝试获取锁,如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取),则返回false .
*/
boolean tryLock();
/**
* tryLock(long time, TimeUnit unit)方法和tryLock()方法是类似的,只不过区别在于这个方法在拿不到锁时会等待一定的时间,
* 在时间期限之内如果还拿不到锁,就返回false。如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回true。
*
* @param time 等待时间
* @param unit 时间单位 小时、分、秒、毫秒等
*/
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
/**
* 解锁
*/
void unlock();
/**
* 中断锁 表示该锁可以被中断 假如A和B同时调这个方法,A获取锁,B为获取锁,那么B线程可以通过
* Thread.currentThread().interrupt(); 方法真正中断该线程
*/
void lockInterruptibly();
//----------------------RLock接口方法-----------------------
/**
* 加锁 上面是默认30秒这里可以手动设置锁的有效时间
*
* @param leaseTime 锁有效时间
* @param unit 时间单位 小时、分、秒、毫秒等
*/
void lock(long leaseTime, TimeUnit unit);
/**
* 这里比上面多一个参数,多添加一个锁的有效时间
*
* @param waitTime 等待时间
* @param leaseTime 锁有效时间
* @param unit 时间单位 小时、分、秒、毫秒等
*/
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
/**
* 检验该锁是否被线程使用,如果被使用返回True
*/
boolean isLocked();
/**
* 检查当前线程是否获得此锁(这个和上面的区别就是该方法可以判断是否当前线程获得此锁,而不是此锁是否被线程占有)
* 这个比上面那个实用
*/
boolean isHeldByCurrentThread();
/**
* 中断锁 和上面中断锁差不多,只是这里如果获得锁成功,添加锁的有效时间
* @param leaseTime 锁有效时间
* @param unit 时间单位 小时、分、秒、毫秒等
*/
void lockInterruptibly(long leaseTime, TimeUnit unit);
}
4.2:RLock实现类RedissonLock
public class RedissonLock extends RedissonExpirable implements RLock
4.2.1 void lock()方法
-
1:RedissionLock的Lock方法
@Override
public void lock() {
try {
lock(-1, null, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
-
2:Lock带参数
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
//1:开始进行尝试获取锁
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
//2:订阅锁的释放信息
RFuture<RedissonLockEntry> future = subscribe(threadId);
//3:等待锁的释放
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
} else {
commandExecutor.syncSubscription(future);
}
try {
while (true) {
//4:循环多次尝试获取锁
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
//5:获取锁成功,返回
if (ttl == null) {
break;
}
// waiting for message
//6:休眠等待锁被释放的信号,释放被唤醒
if (ttl >= 0) {
try {
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
future.getNow().getLatch().acquire();
} else {
future.getNow().getLatch().acquireUninterruptibly();
}
}
}
} finally {
//7:退订锁的信息
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
-
3:tryAcquire
异步尝试进行加锁,尝试加锁的时候leaseTime为-1。通常如果客户端没有加锁成功,则会进行阻塞,leaseTime为锁释放的时间。
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
//使用get方法,获取RFuture数据,没有返回则阻塞
return get(tryAcquireAsync(leaseTime, unit, threadId));
}private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
//进行加锁,返回RFuture
if (leaseTime != -1) {
//异步进行获取锁的操作
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
//进行看门狗的处理,见4.2.3
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}生成lua脚本:
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
上述lua脚本:
总结:
-
1:加锁,使用Lock没有携带过期时间,但是会使用一个默认值: lockWatchdogTimeout = 30 * 1000;
-
2:和ReentranLock一样,在进行获取锁的时候,首先会先尝试一次获取锁的操作,如果获取锁失败使用信号量的方式,订阅锁的释放信息,有了释放锁的信息,则进行尝试加锁,依此循环。
-
3:根据锁释放的逻辑,锁释放的时候会发布锁解除的消息,应该在2中订阅对应,在释放锁的时候释放锁的信息。
4.2.2 解锁
@Override
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
-
1:unlockAsync方法
@Override
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise<Void>();
//核心的解锁流程
RFuture<Boolean> future = unlockInnerAsync(threadId);
//后取是解除看门狗进程
future.onComplete((opStatus, e) -> {
if (e != null) {
cancelExpirationRenewal(threadId);
result.tryFailure(e);
return;
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
cancelExpirationRenewal(threadId);
result.trySuccess(null);
});
return result;
}
-
2:解锁unlockInnerAsync
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
/**
*(1):判断key是否存在,不存在即释放成功;
*(2):减少锁定值1,减去后如果小于等于0,则释放锁,并且进行事件通知锁释放。
*/
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
4.2.3 看门狗实现
在方法:tryAcquireAsync中尝试加锁之后,会启动定时任务进行看门狗的进程
entryName:锁的唯一标记
entry:线程ID信息的包装类
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
//在EXPIRATION_RENEWAL_MAP中添加entryNme和entry的数据
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
//启动一个开门狗的线程
renewExpiration();
}
}
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
//在internalLockLeaseTime / 3时间后执行
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
//看门狗的核心方法
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
if (res) {
// reschedule itself
//命令执行成功之后,再循环调用
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
//使用lua脚本进行锁的延长
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
5:RedLock
RedLock是基于redis实现的分布式锁,它能够保证以下特性:
- 互斥性:在任何时候,只能有一个客户端能够持有锁;避免死锁:
- 当客户端拿到锁后,即使发生了网络分区或者客户端宕机,也不会发生死锁;(利用key的存活时间)
- 容错性:只要多数节点的redis实例正常运行,就能够对外提供服务,加锁或者释放锁; RedLock算法思想,意思是不能只在一个redis实例上创建锁,应该是在多个redis实例上创建锁,n / 2 + 1,必须在大多数redis节点上都成功创建锁,才能算这个整体的RedLock加锁成功,避免说仅仅在一个redis实例上加锁而带来的问题。
Redisson中有一个MultiLock
的概念,可以将多个锁合并为一个大锁,对一个大锁进行统一的申请加锁以及释放锁。而Redisson中实现RedLock就是基于MultiLock
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/30737.html