ReentrantLock的lock分析

lock方法入口非常简单只有一句,即调用Sync类的lock方法。

1
2
3
4
private final Sync sync;
public void lock() {
sync.lock();
}

Sync类是一个AQS子类实现,lock方法是以独占锁模式获取锁。

1
2
3
4
5
6
7
//非公平锁模式
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

1
2
3
4
//公平锁模式
final void lock() {
acquire(1);
}

一句话带过两者区别,非公平锁会在加锁的第一步直接尝试一次锁标识替换,如果加锁成功则设置所属线程。
重点就是acquire这个方法,下面会拆解每一步来解析。

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

!tryAcquire

含义:尝试以独占模式获取锁,这一步会先进行加锁,加锁成功取反就是false,直接退出,加锁失败则继续尝试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

getState() 不再详述,返回的是当前锁的标识位,c == 0代表还未加锁。

①if (c == 0)这部分的含义是:当前没有被锁定,进行加锁。

!hasQueuedPredecessors()

hasQueuedPredecessors() 用来确定是否需要排队,用取反操作符操作,当操作命中不需要排队的情况时,这部分为true。

1
2
3
4
5
6
7
8
9
10
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
h != t

h(队列首)和t(队列尾)是否相等,分为以下情况。

  1. h == t 而且h和t都是null,队列没有初始化,这时候是第一次加锁场景,此时不需要排队。
  2. h == t 队列正在初始化,有一个虚拟节点同时作为首尾节点,这是另一个加锁请求发起的,但是还没有排队完成,此次加锁可以直接加锁,不需要排队。
  3. h == t 队尾拿到了锁正在执行,Node里prev、next和thread都是null,此时可以强行发起加锁。
  4. h != t 此时已经有了等待队列,可以进入下一个逻辑判断。
(s = h.next) == null

h(队列首)包含两种可能性:

  1. 持有锁的节点
  2. 虚拟节点
    h节点本身不参与排队,h.next是第一个排队节点,(s = h.next) == null代表没有人排队,此次加锁可以直接加锁,不需要排队。
    s.thread != Thread.currentThread()
    如果持有锁的线程就是当前线程,则作为锁重入/自旋处理,不需要排队。

    compareAndSetState(0, acquires)

    尝试加锁,使用CAS操作将锁的标识位改变。

    经过这两步之后,已经获取到了锁资源,并且修改了锁标识位,调用setExclusiveOwnerThread(current);
    将当前线程记录在lock中,标记为当前持有锁的线程。

②else if (current == getExclusiveOwnerThread())这部分的含义是:当前持有锁的线程和本次线程是相同的,作为重入/自旋锁处理。

重入/自旋锁的逻辑很简单,将偏移量进行累加,检查是否超过上限Integer.MAX_VALUE,超过时候nextc < 0,抛出异常。
没有问题的话将新的锁标识位设置到该锁对象中。

③默认返回fasle:代表当前已经被其他线程锁定锁定。

acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

走到这里的时候加锁已经失败了,需要进行排队,入队之后使用自旋的方式再次尝试加锁

addWaiter(Node.EXCLUSIVE)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

首先将线程信息保存在节点中,检查尾部节点是否为null。
①尾节点为null:两种情况,一种是队列没有初始化,一种是队列正在初始化,首节点已经初始化完成但尾节点还未初始化;这两种情况都需要去协助初始化队列。
②尾节点不为null:已经初始化过等待队列,可以直接排队,将当前需要入队的节点的前置节点设为当前尾结点,设置当前需要入队的节点为新尾结点,
设置原来的尾节点后置节点为目前的尾结点。

enq(node);

入队,并在需要的时候协助初始化队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

这是一个死循环,以尾节点为判断来源,没有尾节点的情况下必须进行队列初始化,用一个虚拟节点new Node() 来替换首节点。
并将首尾节点设置为同一个节点,进入下次循环。

有尾结点的情况,也分两种可能,即节点协助了初始化,进入第二次for循环;或者初始化首节点,但是尾结点还没有指向首节点的瞬间,其他线程也看到尾节点是null走了进来协助初始化。
此时是看谁比较幸运,都会进行CAS替换,先设置节点,再设置节点关系。因为是死循环,所以都会排上。

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

自旋进行加锁,定义了两个局部变量,failed表示是否失败,interrupted表示是否休眠线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

①第一个if (p == head && tryAcquire(arg))

p == head代表当前节点的前置节点是首节点,首节点是正在持有锁的节点,不等他唤醒自己,不断的调用tryAcquire(arg)来尝试加锁,争分夺秒。

加锁成功,根据定义,自己作为持有锁的节点就要设置为head,并将自己的线程信息和前置节点清空。加锁成功,局部变量failed改为false没有失败。
②if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())

shouldParkAfterFailedAcquire(p, node)

加锁失败后是否需要阻断线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

节点状态一共有四种

1
2
3
4
5
6
7
8
//取消
static final int CANCELLED = 1;
//等待触发条件,前节点可能是head,也可能是取消状态
static final int SIGNAL = -1;
//等待,在等待队列中
static final int CONDITION = -2;
//向后传播
static final int PROPAGATE = -3;

①ws == Node.SIGNAL,即前置节点状态是-1,前置节点正在释放锁,所以当前节点要堵塞。

②ws > 0,前置节点已经取消,使用死循环将取消节点一个个清除,返回上一个节点,并进入下次循环。

③else,前置节点还在排队,修改节点状态为Node.SIGNAL,并进入下次循环。

parkAndCheckInterrupt()

上一个方法返回true的时候,代表需要阻塞线程,而这个方法就是调用堵塞线程并返回当前线程是否堵塞成功。

1
2
3
4
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

如果这个方法返回true,则线程已经被打断,局部变量interrupted = true;

Thread.interrupted()用来检查用户设置的线程打断状态。这里会让用户设置的状态取反。

cancelAcquire(node);

如果加锁失败则在退出前触发,取消预留的资源,也就是节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;

node.thread = null;

// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;

// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;

// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;

// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}

node.next = node; // help GC
}
}

  1. 先将线程信息指向null;
  2. 向前寻找,将取消的节点从队列中移除,获取前置节点。
  3. 记录前置节点的后续节点,因为该方法是并发的,可能会有两个线程同时进入该方法并使用用一个前置节点,记录这个后续节点用于CAS操作避免并发错误。
  4. 设置当前节点为取消状态
  5. 移除节点

移除节点的代码就是下面的两层if else语句,分别代表两种可能:

① 当前需要取消的节点是队列尾

1.1 node == tail用于判断当前需要取消的节点是队列尾

1.2 compareAndSetTail(node, pred)使用cas操作将前置节点设置为队列尾

1.3 compareAndSetNext(pred, predNext, null)将新的队列尾next设置为null(前面提到放置并发,cas需要用前置节点的后续节点)。

② 当前需要取消的节点不是队列尾

2.1.1 pred != head 不是队列首

2.1.2 (ws = pred.waitStatus) == Node.SIGNAL前置节点的状态是等待触发。

2.1.3 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)) 前置节点没有取消,并且CAS到等待触发状态。

2.1.4 pred.thread != null 不是第一个排队的节点。

2.2.1 符合1、4条件以及2、3其中一条时,将自己从队列中移除。compareAndSetNext(pred, predNext, next);将自己的前置节点和后置节点关联起来。

2.2 队列首;第一个排队的节点;节点状态不是并且无法修改到等待触发,唤醒下一个节点的线程

unparkSuccessor(Node node)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

需要换线的对象是node的下一个节点。分为两个if:

①从当前节点往后找,找到一个没有被取消的节点

②唤醒这个线程

selfInterrupt()

还原用户设置的线程中断状态,因为方法返回值是void,所以这个代码运不运行只取决于用户有没有自定义线程中断状态。

1
2
3
static void selfInterrupt() {
Thread.currentThread().interrupt();
}

之前说过在死循环中,会检查线程打断状态,但是jdk api会让状态翻转,这里再次调用保证状态正确