1.xdb中的使用例子
在xdb中,我们大概执行业务时的流程简化如下:
- package org.example.testReentrantLock;
-
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.locks.ReentrantLock;
-
- public class Main {
- public static final ReentrantLock lock = new ReentrantLock();
-
- public static void main(String[] args) throws InterruptedException {
- new Thread(() -> {
-
- try {
- lock.lock();
-
- // 这里相当于执行业务逻辑,在事务内再次持有一次锁,加入到自己本地的LockList中
- lock.lock();
- System.out.println("1111");
- TimeUnit.SECONDS.sleep(2);
- lock.unlock();
-
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- lock.unlock();
- }
- }, "a").start();
-
- TimeUnit.SECONDS.sleep(1);
-
- new Thread(() -> {
- try {
- lock.lock();
-
- System.out.println("2222");
-
- } finally {
- lock.unlock();
- }
-
- }, "b").start();
-
- }
- }
-
- /*
- 1111
- 2222
- */
思考:为啥我们用ReentrankLock而不是synchronized呢?因为:我们需要提供可重入的特性,而且释放是要控制在:事务完成后(不管是回滚了还是提交成功了)才释放锁,因此要使用这种显式锁。
2.源码分析
step1
public static final ReentrantLock lock = new ReentrantLock();
实现如下:
- public ReentrantLock() {
- sync = new NonfairSync();
- }
可以看出来,默认情况下,是非公平锁,想想目前我还没有遇到要用公平锁的情况。
step2
接下来是上锁:
lock.lock();
-
- /**
- * Acquires the lock.
- 获取锁
- *
- *
Acquires the lock if it is not held by another thread and returns
- * immediately, setting the lock hold count to one.
- 如果这个锁没有被其它线程持有,那么会立即获取锁,并且设置锁的持有次数为1
- *
- *
If the current thread already holds the lock then the hold
- * count is incremented by one and the method returns immediately.
- 如果当前线程已经持有锁,那么就会增加锁的持有数量,然后立即返回
- *
- *
If the lock is held by another thread then the
- * current thread becomes disabled for thread scheduling
- * purposes and lies dormant until the lock has been acquired,
- * at which time the lock hold count is set to one.
- 如果此锁被其它线程持有了,那么此线程立即变成不可用然后进入睡眠状态,直到锁被获取到
- 那时,锁的持有数量被设置为1
- */
- public void lock() {
- sync.acquire(1);
- }
进入到acquire里面:// && 符号的使用,前面为true才会走后门
- public final void acquire(int arg) {
- if (!tryAcquire(arg) &&
-
- // 这个很重要,如果没有获取到,也就是返回了false,那么肯定要检查阻塞
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
这里走非公平下的获取:
- static final class NonfairSync extends Sync {
- private static final long serialVersionUID = 7316153563782823691L;
- protected final boolean tryAcquire(int acquires) {
- return nonfairTryAcquire(acquires);
- }
- }
再次进入看下非公平模式下如何获取锁的:
- abstract static class Sync extends AbstractQueuedSynchronizer {
- private static final long serialVersionUID = -5179523762034025860L;
-
- /**
- * Performs non-fair tryLock. tryAcquire is implemented in
- * subclasses, but both need nonfair try for trylock method.
- */
- @ReservedStackAccess
- final boolean nonfairTryAcquire(int acquires) {
- final Thread current = Thread.currentThread();
-
- // 查看下当前的状态,这个state是加了volatile的,volatile int state;
- int c = getState();
-
- // 根据此状态位判断出来,如果没有被其它线程持有
- if (c == 0) {
- // 使用cas,如果当前是0,则设置为1
- if (compareAndSetState(0, acquires)) {
- // 设置当前线程占有了
- setExclusiveOwnerThread(current);
- return true;
- }
- }
-
- // 如果不是0,而且就是这个线程持有的
- else if (current == getExclusiveOwnerThread()) {
- // 计数+1
- int nextc = c + acquires;
- if (nextc < 0) // overflow
- throw new Error("Maximum lock count exceeded");
-
- // 设置下最新的计数
- setState(nextc);
- return true;
- }
-
-
- // 走到这,说明其它线程已经持有了(state既不是0,也不是自己占有)
- return false;
- }
既然没有获取到,那么肯定就要阻塞自己了
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
如何实现阻塞的呢?进去 // 可以发现aqs在没有获取到的时候,其实是先cas死心不改尝试获取下
-
- /*
- * Various flavors of acquire, varying in exclusive/shared and
- * control modes. Each is mostly the same, but annoyingly
- * different. Only a little bit of factoring is possible due to
- * interactions of exception mechanics (including ensuring that we
- * cancel if tryAcquire throws exception) and other control, at
- * least not without hurting performance too much.
- */
- /**
- * Acquires in exclusive uninterruptible mode for thread already in
- * queue. Used by condition wait methods as well as acquire.
- *
- * @param node the node
- * @param arg the acquire argument
- * @return {@code true} if interrupted while waiting
- */
- final boolean acquireQueued(final Node node, int arg) {
- boolean interrupted = false;
- try {
- for (;;) {
- // 可以看出来这里是aqs相对于synchronized的优化,没获取到暂时不死心
- // 这里依然再走一下尝试获取
- final Node p = node.predecessor();
- if (p == head && tryAcquire(arg)) {
- // 走这里,说明还真获取到了
- setHead(node);
- p.next = null; // help GC
- return interrupted;
- }
-
- // 走到这说明真是获取不到了,那么应该是进入阻塞状态了
- if (shouldParkAfterFailedAcquire(p, node))
- // 这里可以看出来,是调用的park实现的阻塞
- interrupted |= parkAndCheckInterrupt();
- }
- } catch (Throwable t) {
- cancelAcquire(node);
- if (interrupted)
- selfInterrupt();
- throw t;
- }
- }
真的没办法了,阻塞自己呗
- private final boolean parkAndCheckInterrupt() {
- LockSupport.park(this);
- return Thread.interrupted();
- }
park
- public static void park(Object blocker) {
- // 要被阻塞的线程
- Thread t = Thread.currentThread();
- setBlocker(t, blocker);
- // 这里是真的阻塞了,是一个native方法
- U.park(false, 0L);
-
- setBlocker(t, null);
- }