並發編程之AQS(AbstractQueuedSynchronizer)
AbstractQueuedSynchronizer,簡稱AQS。AQS定義了一個抽象的隊列來進行同步操作,很多同步類都依賴於它,例如常用的ReentrantLock/Semaphore/CountDownLatch等
每個node維護了一份volatile int state(代表共享狀態)和一個FIFO線程隊列(多線程爭用資源阻塞時進入該隊列),AQS定義兩種資源共享方式:Exclusive(獨佔,只有一個線程能執行,如ReentrantLock)和Share(共享,多個線程可同時執行,如Semaphore/CountDownLatch)。
不同的自定義同步器爭用共享資源的方式也不同。自定義同步器在實現時只需要實現共享資源state的獲取與釋放方式即可,至於具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在頂層實現好了。自定義同步器實現時主要實現以下幾種方法:
//嘗試獲取獨佔模式
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
//嘗試釋放獨佔模式
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
//共享式獲取同步狀態
//返回負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
//共享式釋放同步狀態;如果釋放後允許喚醒後續等待結點返回true,否則返回false。
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
//當前同步器是否在獨佔模式下被線程佔用,一般該方法表示是否被當前線程所獨佔;只有用到condition才需要去實現它。
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
CLH同步隊列是一個FIFO雙向隊列,AQS依賴它來完成同步狀態的管理,當前線程如果獲取同步狀態失敗時,AQS則會將當前線程已經等待狀態等信息構造成一個節點(Node)並將其加入到CLH同步隊列,同時會阻塞當前線程,當同步狀態釋放時,會把首節點喚醒(公平鎖),使其再次嘗試獲取同步狀態。
在CLH同步隊列中,一個節點表示一個線程,它保存著線程的引用(thread)、狀態(waitStatus)、前驅節點(prev)、後繼節點(next),其定義如下:
static final class Node {
//共享模式
static final Node SHARED = new Node();
//獨佔模式
static final Node EXCLUSIVE = null;
//因為超時或者中斷,節點會被設置為取消狀態,被取消的節點時不會參與到競爭中的,他會一直保持取消狀態不會轉變為其他狀態;
static final int CANCELLED = 1;
//後繼節點的線程處於等待狀態,而當前節點的線程如果釋放了同步狀態或者被取消,將會通知後繼節點,使後繼節點的線程得以運行
static final int SIGNAL = -1;
//節點在等待隊列中,節點線程等待在Condition上,當其他線程對Condition調用了signal()後,改節點將會從等待隊列中轉移到同步隊列中,加入到同步狀態的獲取中
static final int CONDITION = -2;
//表示下一次共享式同步狀態獲取將會無條件地傳播下去
static final int PROPAGATE = -3;
//等待狀態
volatile int waitStatus;
//前驅節點
volatile Node prev;
//後繼節點
volatile Node next;
//當前節點的線程
volatile Thread thread;
}
獨佔模式acquire方法
該方法以獨模式獲取共享資源。如果獲取到資源,線程直接返回,否則進入等待隊列,直到獲取到資源為止,且整個過程忽略中斷的影響。ReentrantLock的lock方法就是調用的該方法來獲取鎖。
方法的執行流程如下:
- 調用自定義同步器的tryAcquire()嘗試直接去獲取資源,如果成功則直接返回。
- 沒成功,則addWaiter()將該線程加入等待隊列的尾部,並標記為獨佔模式。
- acquireQueued()使線程在等待隊列中休息,有機會時(輪到自己,會被unpark())會去嘗試獲取資源。獲取到資源後才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false。 如果線程在等待過程中被中斷過,它是不響應的。只是獲取資源後才再進行自我中斷selfInterrupt()。
/**
* 獨佔模式獲取同步狀態,如果當前線程獲取同步狀態成功,則直接返回,否則
* 將會進入同步隊列等待,該方法會調用實現類重寫的tryAcquire(int arg)方法
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire方法
doc翻譯:嘗試以獨佔模式獲取。 如果對象的狀態允許以獨佔模式獲取它,則此方法應查詢,如果是,則獲取它。
執行acquire的線程始終調用此方法。 如果此方法報告失敗,則獲取方法可以對線程進行排隊(如果它尚未排隊),直到它通過某個其他線程的釋放來發出信號。 這可用於實現方法{@link Lock#tryLock()}。
自我理解:這個方法是需要實現類進行重寫的,用於對資源的獲取和釋放。至於能不能重入,能不能加鎖,那就看具體的自定義同步器怎麼去設計了。當然,自定義同步器在進行資源訪問時要考慮線程安全的影響。
addWaiter方法doc翻譯:為當前線程和給定模式創建並排隊節點。
自我理解:CLH隊列入列無非就是tail指向新節點、新節點的prev指向當前最後的節點,當前最後一個節點的next指向當前節點。代碼我們可以看看addWaiter(Node node)方法
/**
* 將當前線程加入到等待隊列的隊尾,並返回當前線程所在的結點
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 首先嘗試在鏈表的後面快速添加節點
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 將該節點添加到隊列尾部
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果首節點為空或者cas添加失敗,則進入enq方法通過自旋方式入隊列,確保一定成功,這是一個保底機制
enq(node);
return node;
}
enq方法
doc翻譯:將節點插入隊列,必要時進行初始化
自我理解:addWaiter(Node node)先通過快速嘗試設置尾節點,如果失敗,則調用enq(Node node)方法設置尾節點。在enq(Node node)方法中,AQS通過自旋鎖的方式來保證節點可以正確添加,只有成功添加後,當前線程才會從該方法返回,否則會一直執行下去
/**
* 將node加入隊尾
*/
private Node enq(final Node node) {
// 自旋
for (;;) {
Node t = tail;
// 當前沒有節點,構造一個new Node(),將head和tail指向它
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 當前有節點,將傳入的Node放在鏈表的最後
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued方法
doc翻譯:對於已經在隊列中的線程,以獨佔不間斷模式獲取。 由條件等待方法使用以及獲取。
自我理解:通過tryAcquire()和addWaiter(),該線程獲取資源失敗,已經被放入等待隊列尾部了。下一步需要處理的是:進入等待狀態休息,直到其他線程徹底釋放資源後喚醒自己,自己再拿到資源,然後就可以去干自己想乾的事了。其實就是個排隊拿號,在等待隊列中排隊拿號,直到拿到號後再返回
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false; // 標記等待過程中是否被中斷過
for (;;) {
final Node p = node.predecessor(); // node的前一個節點
// 如果前一個節點是head,說明當前node節點是第二個節點,接著嘗試去獲取資源
// 可能是head釋放完資源喚醒自己的,當然也可能被interrupt了
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted; // 返回等待過程中是否被中斷過
}
// 如果自己可以休息了,就進入waiting狀態,直到被unpark()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true; // 如果等待過程中被中斷過,哪怕只有那麼一次,就將interrupted標記為true
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire方法doc翻譯:檢查並更新無法獲取的節點的狀態。 如果線程應該阻塞,則返回true。 這是所有獲取循環中的主要信號控制。 需要pred == node.prev。
自我理解: 此方法主要用於檢查狀態,看看自己是否真的可以去休息了
- 1.如果pred的waitStatus是SIGNAL,直接返回true
- 2.如果pred的waitStatus>0,也就是CANCELLED,向前一直找到<=0的節點,讓節點的next指向node
- 3.如果pred的waitStatus<=0,改成SIGNAL
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 如果已經告訴前驅拿完號後通知自己一下,那就可以一邊玩蛋去了
return true;
if (ws > 0) {
/*
* 如果前節點放棄了,那就一直往前找,直到找到最近一個正常等待的狀態,並排在它的後邊。
* 注意:那些放棄的結點,由於被自己「加塞」到它們前邊,它們相當於形成一個無引用鏈,稍後就會被GC回收
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果前節點正常,那就把前節點的狀態設置成SIGNAL,告訴它拿完號後通知下。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt方法
/**
* 讓線程去休息,真正進入等待狀態
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 調用park()使線程進入waiting狀態
return Thread.interrupted(); // 如果被喚醒,查看是否被中斷(該方法會重置標識位)
}
acquireQueued總共做了3件事:
- 結點進入隊尾後,檢查狀態。
- 調用park()進入waiting狀態,等待unpark()或interrupt()喚醒自己。
- 被喚醒後,看自己是不是有資格能拿到號。如果拿到,head指向當前結點,並返回從入隊到拿到號的整個過程中是否被中斷過;如果沒拿到,繼續流程1。
上一張流程圖看看吧
此方法是獨佔模式下線程釋放資源的頂層入口。它會釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待隊列里的其他線程來獲取資源
/**
* 釋放資源
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 喚醒等待隊列里的下一個線程
return true;
}
return false;
}
tryRelease方法
跟tryAcquire()一樣,這個方法是需要獨佔模式的自定義同步器去實現的。正常來說,tryRelease()都會成功的,因為這是獨佔模式,該線程來釋放資源,那麼它肯定已經拿到獨佔資源了,直接減掉相應量的資源即可(state-=arg),也不需要考慮線程安全的問題。但要注意它的返回值,上面已經提到了,release()是根據tryRelease()的返回值來判斷該線程是否已經完成釋放掉資源了!所以自義定同步器在實現時,如果已經徹底釋放資源(state=0),要返回true,否則返回false。
unparkSuccessor方法private void unparkSuccessor(Node node) {
// 這裡,node一般為當前線程所在的結點。
int ws = node.waitStatus;
if (ws < 0) // 置零當前線程所在的結點狀態,允許失敗。
compareAndSetWaitStatus(node, ws, 0);
// 找到下一個需要喚醒的結點s
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); // 喚醒
}
總結一下
在AQS中維護著一個FIFO的同步隊列,當線程獲取同步狀態失敗後,則會加入到這個CLH同步隊列的對尾並一直保持著自旋。在CLH同步隊列中的線程在自旋時會判斷其前驅節點是否為首節點,如果為首節點則不斷嘗試獲取同步狀態,獲取成功則退出CLH同步隊列。當線程執行完邏輯後,會釋放同步狀態,釋放後會喚醒其後繼節點。
共享模式acquireShared方法doc翻譯:以共享模式獲取,忽略中斷。 通過首先調用{@link #tryAcquireShared}來實現,成功返回。 否則線程排隊,可能反覆阻塞和解除阻塞,調用{@link #tryAcquireShared}直到成功。
簡單點說就是這個方法會獲取指定量的資源,獲取成功則直接返回,獲取失敗則進入等待隊列,直到獲取到資源為止,整個過程忽略中斷
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
tryAcquireShared方法
tryAcquireShared()依然需要自定義實現類去實現。但是AQS已經把其返回值的語義定義好了:負值代表獲取失敗;0代表獲取成功,但沒有剩餘資源;正數表示獲取成功,還有剩餘資源,其他線程還可以去獲取。
//共享式獲取同步狀態
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
doAcquireShared方法
doc翻譯:以共享不間斷模式獲取
此方法用於將當前線程加入等待隊列尾部休息,直到其他線程釋放資源喚醒自己,自己成功拿到相應量的資源後才返回。
private void doAcquireShared(int arg) {
//隊列尾部添加共享模式的節點
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
//獲取上一個節點,如果上一個節點時head,嘗試獲取資源
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);//成功有剩餘資源,將head指向自己,喚醒之後的線程
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
setHeadAndPropagate方法
設置隊列頭,並檢查後繼者是否在共享模式下等待,如果是傳播,如果傳播> 0或PROPAGATE狀態已設置。
這個方法除了重新標記head指向的節點外,還有一個重要的作用,那就是propagate(傳遞),
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don"t know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
doReleaseShared方法
共享模式的釋放操作 - 發出後續信號並確保傳播。 (注意:對於獨佔模式,如果需要信號,只需調用數量來調用head的unparkSuccessor。)
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
acquireShared總結
- tryAcquireShared()嘗試獲取資源,成功則直接返回。
- doAcquireShared()會將當前線程加入等待隊列尾部休息,直到其他線程釋放資源喚醒自己。它還會嘗試著讓喚醒傳遞到後面的節點。
releaseShared方法
以共享模式發布。, 如果{@link #tryReleaseShared}返回true,則通過解除阻塞一個或多個線程來實現。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
TAG:程序員小新人學習 |