當前位置:
首頁 > 知識 > JDK源碼閱讀:InterruptibleChannel 與可中斷 IO

JDK源碼閱讀:InterruptibleChannel 與可中斷 IO

(點擊

上方公眾號

,可快速關注)




來源:木杉的博客 ,


imushan.com/2018/08/01/java/language/JDK源碼閱讀-InterruptibleChannel與可中斷IO/




Java傳統IO是不支持中斷的,所以如果代碼在read/write等操作阻塞的話,是無法被中斷的。這就無法和Thead的interrupt模型配合使用了。JavaNIO眾多的升級點中就包含了IO操作對中斷的支持。InterruptiableChannel表示支持中斷的Channel。我們常用的FileChannel,SocketChannel,DatagramChannel都實現了這個介面。




InterruptibleChannel介面





public interface InterruptibleChannel extends Channel


{


 


    /**


     * 關閉當前Channel


     *     


     * 任何當前阻塞在當前channel執行的IO操作上的線程,都會收到一個AsynchronousCloseException異常


     */


    public void close() throws IOException;


}




InterruptibleChannel介面沒有定義任何方法,其中的close方法是父介面就有的,這裡只是添加了額外的注釋。




AbstractInterruptibleChannel實現了InterruptibleChannel介面,並提供了實現可中斷IO機制的重要的方法,比如begin(),end()。




在解讀這些方法的代碼前,先了解一下NIO中,支持中斷的Channel代碼是如何編寫的。



第一個要求是要正確使用begin()和end()方法:





boolean completed = false;


try {


    begin();


    completed = ...;    // 執行阻塞IO操作


    return ...;         // 返回結果


} finally {


    end(completed);


}



NIO規定了,在阻塞IO的語句前後,需要調用begin()和end()方法,為了保證end()方法一定被調用,要求放在finally語句塊中。




第二個要求是Channel需要實現java.nio.channels.spi.AbstractInterruptibleChannel#implCloseChannel這個方法。AbstractInterruptibleChannel在處理中斷時,會調用這個方法,使用Channel的具體實現來關閉Channel。




接下來我們具體看一下begin()和end()方法是如何實現的。




begin方法




// 保存中斷處理對象實例


private Interruptible interruptor;


// 保存被中斷線程實例


private volatile Thread interrupted;


 


protected final void begin() {


    // 初始化中斷處理對象,中斷處理對象提供了中斷處理回調


    // 中斷處理回調登記被中斷的線程,然後調用implCloseChannel方法,關閉Channel


    if (interruptor == null) {


        interruptor = new Interruptible() {


            public void interrupt(Thread target) {


                synchronized (closeLock) {


                    // 如果當前Channel已經關閉,則直接返回


                    if (!open)


                        return;


 


                    // 設置標誌位,同時登記被中斷的線程


                    open = false;


                    interrupted = target;


                    try {


                        // 調用具體的Channel實現關閉Channel


                        AbstractInterruptibleChannel.this.implCloseChannel();


                    } catch (IOException x) { }


                }


            }};


    }


    // 登記中斷處理對象到當前線程


    blockedOn(interruptor);


 


    // 判斷當前線程是否已經被中斷,如果已經被中斷,可能登記的中斷處理對象沒有被執行,這裡手動執行一下


    Thread me = Thread.currentThread();


    if (me.isInterrupted())


        interruptor.interrupt(me);


}




從begin()方法中,我們可以看出NIO實現可中斷IO操作的思路,是在Thread的中斷邏輯中,掛載自定義的中斷處理對象,這樣Thread對象在被中斷時,會執行中斷處理對象中的回調,這個回調中,執行關閉Channel的操作。這樣就實現了Channel對線程中斷的響應了。




接下來重點就是研究「Thread添加中斷處理邏輯」這個機制是如何實現的了,是通過blockedOn方法實現的:





static void blockedOn(Interruptible intr) {         // package-private


    sun.misc.SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(),intr);


}




blockedOn方法使用的是JavaLangAccess的blockedOn方法。




SharedSecrets是一個神奇而糟糕的類,為啥說是糟糕呢,因為這個方法的存在,就是為了訪問JDK類庫中一些因為類作用域限制而外部無法訪問的類或者方法。JDK很多類與方法是私有或者包級別私有的,外部是無法訪問的,但是JDK在本身實現的時候又存在互相依賴的情況,所以為了外部可以不依賴反射訪問這些類或者方法,在sun包下,存在這麼一個類,提供了各種超越限制的方法。




SharedSecrets.getJavaLangAccess()方法返回JavaLangAccess對象。JavaLangAccess對象就和名稱所說的一樣,提供了java.lang包下一些非公開的方法的訪問。這個類在System初始化時被構造:





// java.lang.System#setJavaLangAccess


private static void setJavaLangAccess() {


    sun.misc.SharedSecrets.setJavaLangAccess(new sun.misc.JavaLangAccess(){


        public void blockedOn(Thread t, Interruptible b) {


            t.blockedOn(b);


        }


        //...


    });


}




可以看出,sun.misc.JavaLangAccess#blockedOn保證的就是java.lang.Thread#blockedOn這個包級別私有的方法:





/* The object in which this thread is blocked in an interruptible I/O


 * operation, if any.  The blocker"s interrupt method should be invoked


 * after setting this thread"s interrupt status.


 */


private volatile Interruptible blocker;


private final Object blockerLock = new Object();


 


/* Set the blocker field; invoked via sun.misc.SharedSecrets from java.nio code


 */


void blockedOn(Interruptible b) {


    // 串列化blocker相關操作


    synchronized (blockerLock) {


        blocker = b;


    }


}




而這個方法也非常簡單,就是設置java.lang.Thread#blocker變數為之前提到的中斷處理對象。而且從注釋中可以看出,這個方法就是專門為NIO設計的,注釋都非常直白的提到了,NIO的代碼會通過sun.misc.SharedSecrets調用到這個方法。。




接下來就是重頭戲了,看一下Thread在中斷時,如何調用NIO註冊的中斷處理器:





public void interrupt() {


    if (this != Thread.currentThread())


        checkAccess();


 


    synchronized (blockerLock) {


        Interruptible b = blocker;


 


        // 如果NIO設置了中斷處理器,則只需Thread本身的中斷邏輯後,調用中斷處理器的回調函數


        if (b != null) {


            interrupt0();           // 這一步會設置interrupt標誌位


            b.interrupt(this);


            return;


        }


    }


 


    // 如果沒有的話,就走普通流程


    interrupt0();


}







end方法




begin()方法負責添加Channel的中斷處理器到當前線程。end()是在IO操作執行完/中斷完後的操作,負責判斷中斷是否發生,如果發生判斷是當前線程發生還是別的線程中斷把當前操作的Channel給關閉了,對於不同的情況,拋出不同的異常。





protected final void end(boolean completed) throws AsynchronousCloseException


{


    // 清空線程的中斷處理器引用,避免線程一直存活導致中斷處理器無法被回收


    blockedOn(null);


    Thread interrupted = this.interrupted;


 


    if (interrupted != null && interrupted == Thread.currentThread()) {


        interrupted = null;


        throw new ClosedByInterruptException();


    }


    // 如果這次沒有讀取到數據,並且Channel被另外一個線程關閉了,則排除Channel被非同步關閉的異常


    // 但是如果這次讀取到了數據,就不能拋出異常,因為這次讀取的數據是有效的,需要返回給用戶的(重要邏輯)


    if (!completed && !open)


        throw new AsynchronousCloseException();


}




通過代碼可以看出,如果是當前線程被中斷,則拋出ClosedByInterruptException異常,表示Channel因為線程中斷而被關閉了,IO操作也隨之中斷了。




如果是當前線程發現Channel被關閉了,並且是讀取還未執行完畢的情況,則拋出AsynchronousCloseException異常,表示Channel被非同步關閉了。




end()邏輯的活動圖如下:







場景分析




並發的場景分析起來就是複雜,上面的代碼不多,但是場景很多,我們以sun.nio.ch.FileChannelImpl#read(java.nio.ByteBuffer)為例分析一下可能的場景:






  1. A線程read,B線程中斷A線程:A線程拋出ClosedByInterruptException異常



  2. A,B線程read,C線程中斷A線程




  • A被中斷時,B剛剛進入read方法:A線程拋出ClosedByInterruptException異常,B線程ensureOpen方法拋出ClosedChannelException異常



  • A被中斷時,B阻塞在底層read方法中:A線程拋出ClosedByInterruptException異常,B線程底層方法拋出異常返回,end方法中拋出AsynchronousCloseException異常



  • A被中斷時,B已經讀取到數據:A線程拋出ClosedByInterruptException異常,B線程正常返回




sun.nio.ch.FileChannelImpl#read(java.nio.ByteBuffer)代碼如下:





public int read(ByteBuffer dst) throws IOException {


    ensureOpen();  // 1


    if (!readable) // 2


        throw new NonReadableChannelException();


    synchronized (positionLock) {


        int n = 0;


        int ti = -1;


        try {            


            begin();


            ti = threads.add();


            if (!isOpen())


                return 0; // 3


            do {


                n = IOUtil.read(fd, dst, -1, nd); // 4


            } while ((n == IOStatus.INTERRUPTED) && isOpen());


            return IOStatus.normalize(n);


        } finally {


            threads.remove(ti);


            end(n > 0);


            assert IOStatus.check(n);


        }


    }


}




總結




在JavaIO時期,人們為了中斷IO操作想了不少方法,核心操作就是關閉流,促使IO操作拋出異常,達到中斷IO的效果。NIO中,將這個操作植入了java.lang.Thread#interrupt方法,免去用戶自己編碼特定代碼的麻煩。使IO操作可以像其他可中斷方法一樣,在中斷時拋出ClosedByInterruptException異常,業務程序捕獲該異常即可對IO中斷做出響應。




參考資料






  • java – What does JavaLangAccess.blockedOn(Thread t, Interruptible b) do? – Stack Overflow 


    https://stackoverflow.com/questions/8544891/what-does-javalangaccess-blockedonthread-t-interruptible-b-do



  • Java NIO 那些躲在角落的細節


    https://www.oschina.net/question/138146_26027




【關於投稿】




如果大家有原創好文投稿,請直接給公號發送留言。




① 留言格式:


【投稿】+《 文章標題》+ 文章鏈接

② 示例:


【投稿】《不要自稱是程序員,我十多年的 IT 職場總結》:http://blog.jobbole.com/94148/

③ 最後請附上您的個人簡介哈~






看完本文有收穫?請轉發分享給更多人


關注「ImportNew」,提升Java技能


喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 ImportNew 的精彩文章:

MySQL 性能優化 : 索引和查詢優化

TAG:ImportNew |