JDK源碼閱讀:InterruptibleChannel 與可中斷 IO
(點擊
上方公眾號
,可快速關注)
來源:木杉的博客 ,
imushan.com/2018/08/01/java/language/JDK源碼閱讀-InterruptibleChannel與可中斷IO/
Java傳統IO是不支持中斷的,所以如果代碼在read/write等操作阻塞的話,是無法被中斷的。這就無法和Thead的interrupt模型配合使用了。JavaNIO眾多的升級點中就包含了IO操作對中斷的支持。InterruptiableChannel表示支持中斷的Channel。我們常用的FileChannel,SocketChannel,DatagramChannel都實現了這個介面。
InterruptibleChannel介面
public interface InterruptibleChannel extends Channel
{
/**
* 關閉當前Channel
*
* 任何當前阻塞在當前channel執行的IO操作上的線程,都會收到一個AsynchronousCloseException異常
*/
public void close() throws IOException;
}
InterruptibleChannel介面沒有定義任何方法,其中的close方法是父介面就有的,這裡只是添加了額外的注釋。
AbstractInterruptibleChannel實現了InterruptibleChannel介面,並提供了實現可中斷IO機制的重要的方法,比如begin(),end()。
在解讀這些方法的代碼前,先了解一下NIO中,支持中斷的Channel代碼是如何編寫的。
第一個要求是要正確使用begin()和end()方法:
boolean completed = false;
try {
begin();
completed = ...; // 執行阻塞IO操作
return ...; // 返回結果
} finally {
end(completed);
}
NIO規定了,在阻塞IO的語句前後,需要調用begin()和end()方法,為了保證end()方法一定被調用,要求放在finally語句塊中。
第二個要求是Channel需要實現java.nio.channels.spi.AbstractInterruptibleChannel#implCloseChannel這個方法。AbstractInterruptibleChannel在處理中斷時,會調用這個方法,使用Channel的具體實現來關閉Channel。
接下來我們具體看一下begin()和end()方法是如何實現的。
begin方法
// 保存中斷處理對象實例
private Interruptible interruptor;
// 保存被中斷線程實例
private volatile Thread interrupted;
protected final void begin() {
// 初始化中斷處理對象,中斷處理對象提供了中斷處理回調
// 中斷處理回調登記被中斷的線程,然後調用implCloseChannel方法,關閉Channel
if (interruptor == null) {
interruptor = new Interruptible() {
public void interrupt(Thread target) {
synchronized (closeLock) {
// 如果當前Channel已經關閉,則直接返回
if (!open)
return;
// 設置標誌位,同時登記被中斷的線程
open = false;
interrupted = target;
try {
// 調用具體的Channel實現關閉Channel
AbstractInterruptibleChannel.this.implCloseChannel();
} catch (IOException x) { }
}
}};
}
// 登記中斷處理對象到當前線程
blockedOn(interruptor);
// 判斷當前線程是否已經被中斷,如果已經被中斷,可能登記的中斷處理對象沒有被執行,這裡手動執行一下
Thread me = Thread.currentThread();
if (me.isInterrupted())
interruptor.interrupt(me);
}
從begin()方法中,我們可以看出NIO實現可中斷IO操作的思路,是在Thread的中斷邏輯中,掛載自定義的中斷處理對象,這樣Thread對象在被中斷時,會執行中斷處理對象中的回調,這個回調中,執行關閉Channel的操作。這樣就實現了Channel對線程中斷的響應了。
接下來重點就是研究「Thread添加中斷處理邏輯」這個機制是如何實現的了,是通過blockedOn方法實現的:
static void blockedOn(Interruptible intr) { // package-private
sun.misc.SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(),intr);
}
blockedOn方法使用的是JavaLangAccess的blockedOn方法。
SharedSecrets是一個神奇而糟糕的類,為啥說是糟糕呢,因為這個方法的存在,就是為了訪問JDK類庫中一些因為類作用域限制而外部無法訪問的類或者方法。JDK很多類與方法是私有或者包級別私有的,外部是無法訪問的,但是JDK在本身實現的時候又存在互相依賴的情況,所以為了外部可以不依賴反射訪問這些類或者方法,在sun包下,存在這麼一個類,提供了各種超越限制的方法。
SharedSecrets.getJavaLangAccess()方法返回JavaLangAccess對象。JavaLangAccess對象就和名稱所說的一樣,提供了java.lang包下一些非公開的方法的訪問。這個類在System初始化時被構造:
// java.lang.System#setJavaLangAccess
private static void setJavaLangAccess() {
sun.misc.SharedSecrets.setJavaLangAccess(new sun.misc.JavaLangAccess(){
public void blockedOn(Thread t, Interruptible b) {
t.blockedOn(b);
}
//...
});
}
可以看出,sun.misc.JavaLangAccess#blockedOn保證的就是java.lang.Thread#blockedOn這個包級別私有的方法:
/* The object in which this thread is blocked in an interruptible I/O
* operation, if any. The blocker"s interrupt method should be invoked
* after setting this thread"s interrupt status.
*/
private volatile Interruptible blocker;
private final Object blockerLock = new Object();
/* Set the blocker field; invoked via sun.misc.SharedSecrets from java.nio code
*/
void blockedOn(Interruptible b) {
// 串列化blocker相關操作
synchronized (blockerLock) {
blocker = b;
}
}
而這個方法也非常簡單,就是設置java.lang.Thread#blocker變數為之前提到的中斷處理對象。而且從注釋中可以看出,這個方法就是專門為NIO設計的,注釋都非常直白的提到了,NIO的代碼會通過sun.misc.SharedSecrets調用到這個方法。。
接下來就是重頭戲了,看一下Thread在中斷時,如何調用NIO註冊的中斷處理器:
public void interrupt() {
if (this != Thread.currentThread())
checkAccess();
synchronized (blockerLock) {
Interruptible b = blocker;
// 如果NIO設置了中斷處理器,則只需Thread本身的中斷邏輯後,調用中斷處理器的回調函數
if (b != null) {
interrupt0(); // 這一步會設置interrupt標誌位
b.interrupt(this);
return;
}
}
// 如果沒有的話,就走普通流程
interrupt0();
}
end方法
begin()方法負責添加Channel的中斷處理器到當前線程。end()是在IO操作執行完/中斷完後的操作,負責判斷中斷是否發生,如果發生判斷是當前線程發生還是別的線程中斷把當前操作的Channel給關閉了,對於不同的情況,拋出不同的異常。
protected final void end(boolean completed) throws AsynchronousCloseException
{
// 清空線程的中斷處理器引用,避免線程一直存活導致中斷處理器無法被回收
blockedOn(null);
Thread interrupted = this.interrupted;
if (interrupted != null && interrupted == Thread.currentThread()) {
interrupted = null;
throw new ClosedByInterruptException();
}
// 如果這次沒有讀取到數據,並且Channel被另外一個線程關閉了,則排除Channel被非同步關閉的異常
// 但是如果這次讀取到了數據,就不能拋出異常,因為這次讀取的數據是有效的,需要返回給用戶的(重要邏輯)
if (!completed && !open)
throw new AsynchronousCloseException();
}
通過代碼可以看出,如果是當前線程被中斷,則拋出ClosedByInterruptException異常,表示Channel因為線程中斷而被關閉了,IO操作也隨之中斷了。
如果是當前線程發現Channel被關閉了,並且是讀取還未執行完畢的情況,則拋出AsynchronousCloseException異常,表示Channel被非同步關閉了。
end()邏輯的活動圖如下:
場景分析
並發的場景分析起來就是複雜,上面的代碼不多,但是場景很多,我們以sun.nio.ch.FileChannelImpl#read(java.nio.ByteBuffer)為例分析一下可能的場景:
A線程read,B線程中斷A線程:A線程拋出ClosedByInterruptException異常
A,B線程read,C線程中斷A線程
A被中斷時,B剛剛進入read方法:A線程拋出ClosedByInterruptException異常,B線程ensureOpen方法拋出ClosedChannelException異常
A被中斷時,B阻塞在底層read方法中:A線程拋出ClosedByInterruptException異常,B線程底層方法拋出異常返回,end方法中拋出AsynchronousCloseException異常
A被中斷時,B已經讀取到數據:A線程拋出ClosedByInterruptException異常,B線程正常返回
sun.nio.ch.FileChannelImpl#read(java.nio.ByteBuffer)代碼如下:
public int read(ByteBuffer dst) throws IOException {
ensureOpen(); // 1
if (!readable) // 2
throw new NonReadableChannelException();
synchronized (positionLock) {
int n = 0;
int ti = -1;
try {
begin();
ti = threads.add();
if (!isOpen())
return 0; // 3
do {
n = IOUtil.read(fd, dst, -1, nd); // 4
} while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n);
} finally {
threads.remove(ti);
end(n > 0);
assert IOStatus.check(n);
}
}
}
總結
在JavaIO時期,人們為了中斷IO操作想了不少方法,核心操作就是關閉流,促使IO操作拋出異常,達到中斷IO的效果。NIO中,將這個操作植入了java.lang.Thread#interrupt方法,免去用戶自己編碼特定代碼的麻煩。使IO操作可以像其他可中斷方法一樣,在中斷時拋出ClosedByInterruptException異常,業務程序捕獲該異常即可對IO中斷做出響應。
參考資料
java – What does JavaLangAccess.blockedOn(Thread t, Interruptible b) do? – Stack Overflow
https://stackoverflow.com/questions/8544891/what-does-javalangaccess-blockedonthread-t-interruptible-b-do
Java NIO 那些躲在角落的細節
https://www.oschina.net/question/138146_26027
【關於投稿】
如果大家有原創好文投稿,請直接給公號發送留言。
① 留言格式:
【投稿】+《 文章標題》+ 文章鏈接
② 示例:
【投稿】《不要自稱是程序員,我十多年的 IT 職場總結》:http://blog.jobbole.com/94148/
③ 最後請附上您的個人簡介哈~
看完本文有收穫?請轉發分享給更多人
關注「ImportNew」,提升Java技能
TAG:ImportNew |