當前位置:
首頁 > 知識 > FutureTask 在線程池中應用和源碼解析

FutureTask 在線程池中應用和源碼解析

(點擊

上方公眾號

,可快速關注)




來源:hcy0411 ,


www.jianshu.com/p/1fac6476e85f







FutureTask 是一個支持取消的非同步處理器,一般在線程池中用於非同步接受callable返回值。



主要實現分三部分:






  1. 封裝 Callable,然後放到線程池中去非同步執行->run。



  2. 獲取結果-> get。



  3. 取消任務-> cancel。




接下來主要學習下該模型如何實現。




舉例說明FutureTask在線程池中的應用





// 第一步,定義線程池,


ExecutorService executor = new ThreadPoolExecutor(


        minPoolSize,


        maxPollSize,


        keepAliveTime,


        TimeUnit.SECONDS,


        new SynchronousQueue<>());


 


// 第二步,放到線程池中執行,返回FutureTask


FutureTask  task = executor.submit(callable);


 


// 第三步,獲取返回值


T data = task.get();



學習FutureTask實現




類屬性





//以下是FutureTask的各種狀態

private volatile int state; 


private static final int NEW          = 0; 


private static final int COMPLETING   = 1;


private static final int NORMAL       = 2;


private static final int EXCEPTIONAL  = 3;

private static final int CANCELLED    = 4;


private static final int INTERRUPTING = 5;


private static final int INTERRUPTED  = 6;


 


private Callable<V> callable; //執行的任務

private Object outcome; //存儲結果或者異常


private volatile Thread runner;//執行callable的線程


private volatile WaitNode waiters; //調用get方法等待獲取結果的線程棧


 


其中各種狀態存在 最終狀態 status>COMPLETING


1)NEW -> COMPLETING -> NORMAL(有正常結果)


2) NEW -> COMPLETING -> EXCEPTIONAL(結果為異常) 


3) NEW -> CANCELLED(無結果) 


4) NEW -> INTERRUPTING -> INTERRUPTED(無結果)




類方法




從上面舉例說明開始分析。




run()方法




FutureTask 繼承 Runnable,ExecutorService submit 把提交的任務封裝成 FutureTask 然後放到線程池 ThreadPoolExecutor 的 execute 執行。





public void run() {


    //如果不是初始狀態或者cas設置運行線程是當前線程不成功,直接返回


    if (state != NEW ||


        !UNSAFE.compareAndSwapObject(this, runnerOffset,


                                     null, Thread.currentThread()))


        return;


    try {


        Callable<V> c = callable;


        if (c != null && state == NEW) {


            V result;


            boolean ran;


            try {


              // 執行callable任務 這裡對異常進行了catch


                result = c.call();


                ran = true;


            } catch (Throwable ex) {


                result = null;


                ran = false;


                setException(ex); // 封裝異常到outcome


            }


            if (ran)


                set(result);


        }


    } finally {


        runner = null;


        int s = state;


        // 這裡如果是中斷中,設置成最終狀態


        if (s >= INTERRUPTING)


            handlePossibleCancellationInterrupt(s);


    }


}




以上是 run 方法源碼實現很簡單,解析如下:






  1. 如果不是始狀態或者 cas 設置運行線程是當前線程不成功,直接返回,防止多個線程重複執行。



  2. 執行 Callable 的 call(),即提交執行任務(這裡做了catch,會捕獲執行任務的異常封裝到 outcome 中)。



  3. 如果成功執行 set 方法,封裝結果。




set 方法





protected void set(V v) {


    //cas方式設置成completing狀態,防止多個線程同時處理


    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {


        outcome = v; // 封裝結果


        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最終設置成normal狀態


 


        finishCompletion();


    }


}




解析如下:






  1. cas方式設置成completing狀態,防止多個線程同時處理



  2. 封裝結果到outcome,然後設置到最終狀態normal



  3. 執行finishCompletion方法。




finishCompletion方法





// state > COMPLETING; 不管異常,中斷,還是執行完成,都需要執行該方法來喚醒調用get方法阻塞的線程


private void finishCompletion() {


    // assert state > COMPLETING;


    for (WaitNode q; (q = waiters) != null;) {


        // cas 設置waiters為null,防止多個線程執行。


        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {


            // 循環喚醒所有等待結果的線程


            for (;;) {


                Thread t = q.thread;


                if (t != null) {


                    q.thread = null;


                    //喚醒線程


                    LockSupport.unpark(t);


                }


                WaitNode next = q.next;


                if (next == null)


                    break;


                q.next = null; // unlink to help gc


                q = next;


            }


            break;


        }


    }


   //該方法為空,可以被重寫


    done();


    callable = null;        // to reduce footprint


}




解析如下:




遍歷waiters中的等待節點,並通過 LockSupport 喚醒每一個節點,通知每個線程,該任務執行完成(可能是執行完成,也可能 cancel,異常等)。




以上就是執行的過程,接下來分析獲取結果的過程->get。




get 方法





public V get() throws InterruptedException, ExecutionException {


    int s = state;


    if (s <= COMPLETING)


        s = awaitDone(false, 0L);


    return report(s);


}





public V get(long timeout, TimeUnit unit)


        throws InterruptedException, ExecutionException, TimeoutException {


        if (unit == null)


            throw new NullPointerException();


        int s = state;


        if (s <= COMPLETING &&


            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)


            throw new TimeoutException();


        return report(s);


    }




解析如下:




以上兩個方法,原理一樣,其中一個設置超時時間,支持最多阻塞多長時間。


狀態如果小於 COMPLETING,說明還沒到最終狀態,(不管是否是成功、異常、取消)。


調用 awaitDone 方法阻塞線程,最終調用 report 方法返回結果。




awaitDone 方法





private int awaitDone(boolean timed, long nanos)


        throws InterruptedException {


        final long deadline = timed ? System.nanoTime() + nanos : 0L;


        WaitNode q = null;


        boolean queued = false;


        for (;;) {


            //線程可中斷,如果當前阻塞獲取結果線程執行interrupt()方法,則從隊列中移除該節點,並拋出中斷異常


            if (Thread.interrupted()) {


                removeWaiter(q);


                throw new InterruptedException();


            }


            int s = state;


            // 如果已經是最終狀態,退出返回


            if (s > COMPLETING) {


                if (q != null)


                    q.thread = null;


                return s;


            }


            //這裡做了個優化,competiting到最終狀態時間很短,通過yield比掛起響應更快。


            else if (s == COMPLETING) // cannot time out yet


                Thread.yield();


            // 初始化該阻塞節點


            else if (q == null)


                q = new WaitNode();


            // cas方式寫到阻塞waiters棧中


            else if (!queued)


                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,


                                                     q.next = waiters, q);


            // 這裡做阻塞時間處理。


            else if (timed) {


                nanos = deadline - System.nanoTime();


                if (nanos <= 0L) {


                    removeWaiter(q);


                    return state;


                }


                // 阻塞線程,有超時時間


                LockSupport.parkNanos(this, nanos);


            }


            else


                // 阻塞線程


                LockSupport.park(this);


        }


    }




解析如下:




整體流程已寫到註解中,整體實現是放在一個死循環中,唯一出口,是達到最終狀態。


然後是構建節點元素,並將該節點入棧,同時阻塞當前線程等待運行主任務的線程喚醒該節點。




report 方法





private V report(int s) throws ExecutionException {


    Object x = outcome;


    if (s == NORMAL)


        return (V)x;


    if (s >= CANCELLED)


        throw new CancellationException();


    throw new ExecutionException((Throwable)x);


}




然後是report方法,如果是正常結束,返回結果,如果不是正常結束(取消,中斷)拋出異常。




最後分析下取消流程。




cancel 方法





public boolean cancel(boolean mayInterruptIfRunning) {


    if (!(state == NEW &&


          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,


              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))


        return false;


    try {    // in case call to interrupt throws exception


        if (mayInterruptIfRunning) {


            try {


                Thread t = runner;


                if (t != null)


                    t.interrupt();


            } finally { // final state


                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);


            }


        }


    } finally {


        finishCompletion();


    }


    return true;


}




解析如下:




mayInterruptIfRunning參數是是否允許運行中被中斷取消。






  1. 根據入參是否為true,CAS設置狀態為INTERRUPTING或CANCELLED,設置成功,繼續第二步,否則直接返回false。



  2. 如果允許運行中被中斷取消,調用runner.interupt()進行中斷取消,設置狀態為INTERRUPTED。



  3. 喚醒所有在get()方法等待的線程。




此處有兩種狀態轉換:






  1. 如果mayInterruptIfRunning為true:status狀態轉換為 new -> INTERRUPTING->INTERRUPTED。主動去中斷執行線程,然後喚醒所有等待結果的線程。



  2. 如果mayInterruptIfRunning為false:status狀態轉換為 new -> CANCELLED。




不會去中斷執行線程,直接喚醒所有等待結果的線程,從 awaitDone 方法中可以看到,喚醒等待線程後,直接從跳轉回 get 方法,然後把結果返回給獲取結果的線程,當然此時的結果是 null。




總結




以上就是 FutureTask 的源碼簡單解析,實現比較簡單,FutureTask 就是一個實現 Future 模式,支持取消的非同步處理器。




看完本文有收穫?請轉發分享給更多人


關注「ImportNew」,提升Java技能


喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 ImportNew 的精彩文章:

Map 大家族的那點事兒 ( 3 ) :TreeMap
MySQL 問題分析 : ERROR 1071 (42000) : Specified key was too long

TAG:ImportNew |