FutureTask 在線程池中應用和源碼解析
(點擊
上方公眾號
,可快速關注)
來源:hcy0411 ,
www.jianshu.com/p/1fac6476e85f
FutureTask 是一個支持取消的非同步處理器,一般在線程池中用於非同步接受callable返回值。
主要實現分三部分:
封裝 Callable,然後放到線程池中去非同步執行->run。
獲取結果-> get。
取消任務-> cancel。
接下來主要學習下該模型如何實現。
舉例說明FutureTask在線程池中的應用
// 第一步,定義線程池,
ExecutorService executor = new ThreadPoolExecutor(
minPoolSize,
maxPollSize,
keepAliveTime,
TimeUnit.SECONDS,
new SynchronousQueue<>());
// 第二步,放到線程池中執行,返回FutureTask
FutureTask task = executor.submit(callable);
// 第三步,獲取返回值
T data = task.get();
學習FutureTask實現
類屬性
//以下是FutureTask的各種狀態
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
private Callable<V> callable; //執行的任務
private Object outcome; //存儲結果或者異常
private volatile Thread runner;//執行callable的線程
private volatile WaitNode waiters; //調用get方法等待獲取結果的線程棧
其中各種狀態存在 最終狀態 status>COMPLETING
1)NEW -> COMPLETING -> NORMAL(有正常結果)
2) NEW -> COMPLETING -> EXCEPTIONAL(結果為異常)
3) NEW -> CANCELLED(無結果)
4) NEW -> INTERRUPTING -> INTERRUPTED(無結果)
類方法
從上面舉例說明開始分析。
run()方法
FutureTask 繼承 Runnable,ExecutorService submit 把提交的任務封裝成 FutureTask 然後放到線程池 ThreadPoolExecutor 的 execute 執行。
public void run() {
//如果不是初始狀態或者cas設置運行線程是當前線程不成功,直接返回
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 執行callable任務 這裡對異常進行了catch
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex); // 封裝異常到outcome
}
if (ran)
set(result);
}
} finally {
runner = null;
int s = state;
// 這裡如果是中斷中,設置成最終狀態
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
以上是 run 方法源碼實現很簡單,解析如下:
如果不是始狀態或者 cas 設置運行線程是當前線程不成功,直接返回,防止多個線程重複執行。
執行 Callable 的 call(),即提交執行任務(這裡做了catch,會捕獲執行任務的異常封裝到 outcome 中)。
如果成功執行 set 方法,封裝結果。
set 方法
protected void set(V v) {
//cas方式設置成completing狀態,防止多個線程同時處理
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v; // 封裝結果
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最終設置成normal狀態
finishCompletion();
}
}
解析如下:
cas方式設置成completing狀態,防止多個線程同時處理
封裝結果到outcome,然後設置到最終狀態normal
執行finishCompletion方法。
finishCompletion方法
// state > COMPLETING; 不管異常,中斷,還是執行完成,都需要執行該方法來喚醒調用get方法阻塞的線程
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
// cas 設置waiters為null,防止多個線程執行。
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
// 循環喚醒所有等待結果的線程
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
//喚醒線程
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
//該方法為空,可以被重寫
done();
callable = null; // to reduce footprint
}
解析如下:
遍歷waiters中的等待節點,並通過 LockSupport 喚醒每一個節點,通知每個線程,該任務執行完成(可能是執行完成,也可能 cancel,異常等)。
以上就是執行的過程,接下來分析獲取結果的過程->get。
get 方法
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
解析如下:
以上兩個方法,原理一樣,其中一個設置超時時間,支持最多阻塞多長時間。
狀態如果小於 COMPLETING,說明還沒到最終狀態,(不管是否是成功、異常、取消)。
調用 awaitDone 方法阻塞線程,最終調用 report 方法返回結果。
awaitDone 方法
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
//線程可中斷,如果當前阻塞獲取結果線程執行interrupt()方法,則從隊列中移除該節點,並拋出中斷異常
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
// 如果已經是最終狀態,退出返回
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
//這裡做了個優化,competiting到最終狀態時間很短,通過yield比掛起響應更快。
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// 初始化該阻塞節點
else if (q == null)
q = new WaitNode();
// cas方式寫到阻塞waiters棧中
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 這裡做阻塞時間處理。
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
// 阻塞線程,有超時時間
LockSupport.parkNanos(this, nanos);
}
else
// 阻塞線程
LockSupport.park(this);
}
}
解析如下:
整體流程已寫到註解中,整體實現是放在一個死循環中,唯一出口,是達到最終狀態。
然後是構建節點元素,並將該節點入棧,同時阻塞當前線程等待運行主任務的線程喚醒該節點。
report 方法
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
然後是report方法,如果是正常結束,返回結果,如果不是正常結束(取消,中斷)拋出異常。
最後分析下取消流程。
cancel 方法
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
解析如下:
mayInterruptIfRunning參數是是否允許運行中被中斷取消。
根據入參是否為true,CAS設置狀態為INTERRUPTING或CANCELLED,設置成功,繼續第二步,否則直接返回false。
如果允許運行中被中斷取消,調用runner.interupt()進行中斷取消,設置狀態為INTERRUPTED。
喚醒所有在get()方法等待的線程。
此處有兩種狀態轉換:
如果mayInterruptIfRunning為true:status狀態轉換為 new -> INTERRUPTING->INTERRUPTED。主動去中斷執行線程,然後喚醒所有等待結果的線程。
如果mayInterruptIfRunning為false:status狀態轉換為 new -> CANCELLED。
不會去中斷執行線程,直接喚醒所有等待結果的線程,從 awaitDone 方法中可以看到,喚醒等待線程後,直接從跳轉回 get 方法,然後把結果返回給獲取結果的線程,當然此時的結果是 null。
總結
以上就是 FutureTask 的源碼簡單解析,實現比較簡單,FutureTask 就是一個實現 Future 模式,支持取消的非同步處理器。
看完本文有收穫?請轉發分享給更多人
關注「ImportNew」,提升Java技能
※Map 大家族的那點事兒 ( 3 ) :TreeMap
※MySQL 問題分析 : ERROR 1071 (42000) : Specified key was too long
TAG:ImportNew |