Loading...
墨滴

Stefan777

2021/04/14  阅读:29  主题:姹紫

FutureTask源码解读——阻塞获取异步计算结果(阻塞、取消、装饰器、适配器、Callable)

天青色等烟雨,而我在等你,微信公众号搜索:徐同学呀,持续更新肝货,快来关注我,和我一起学习吧~

一、前言

FutureTask继承自Runnable,所以也可以实现异步执行的效果,但是和常规的异步执行方式不同,常规异步只要求异步的过程是正确的就可以了,而FutureTask不仅可以知道异步执行的状态,还可以知道异步结果。那它是如何实现的呢?

FutureTask在JUC中是一个比较重要的类:

  • 它是ScheduledThreadPoolExecutor内部类ScheduledFutureTask的父类,ScheduledThreadPoolExecutor实现延迟和周期性调度功能时调用的就是FutureTask的函数。
  • 再者线程池的抽象父类AbstractExecutorService中有一个函数submit(),也是可以提交任务异步执行,其内部通过将任务类包装成FutureTask提交给工作线程异步执行,更是被“面试圣经”总结为是第三种实现线程的方式。这种说法是肤浅的,感知肤浅正是探索底层的开始。
//AbstractExecutorService#submit
public <T> Future<T> submit(Callable<T> task) {
    if (task == nullthrow new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

二、原理简述

FutureTask作为Runnable的子类,它就像是一个装饰器在Runnable异步执行的功能上,又增加了可以获取异步执行状态以及结果的功能:

  • 其内部维护了一个Callable类型的成员变量,任务代码会包装成callableFutureTask直接调用Callable.call()执行任务代码并返回结果。
  • 还维护了一个链表实现的栈,外部获取结果的线程在任务没有执行完前都会被压入栈并阻塞(awaitDone),任务完成唤醒所有阻塞线程(finishCompletion)。

三、基本结构

FutureTask实现了接口RunnableFutureRunnableFuture继承了接口RunnableFuturefutureTask

1、基本定义

FutureTask有7个状态:NEW(新建)、COMPLETING(正在完成)、NORMAL(正常完成)、EXCEPTIONAL(异常完成)、CANCELLED(被取消)、INTERRUPTING(正在中断)、INTERRUPTED(被中断)。如下是状态流转:

  • NEW -> COMPLETING :此时get会被阻塞,并将当前线程放入阻塞栈中。
  • NEW -> COMPLETING -> NORMAL :此时outcomecallable的运行结果。
  • NEW -> COMPLETING -> EXCEPTIONAL :此时outcomecallable的运行异常。
  • NEW -> CANCELLED :调用了cancel(false)取消任务,删除并唤醒所有等待的线程。
  • NEW -> INTERRUPTING -> INTERRUPTED :调用了cancel(true)中断任务,删除并唤醒所有等待的线程。

FutureTask的两大特点阻塞和取消:

  • 阻塞是由waiters属性实现,它是一个由链表实现的栈,先进后出。
  • 取消是由runner属性实现,runnerCAS的方式记录当前运行线程,运行完成会再次设置为null,运行过程中取消,将调用runner.interrupt()中断运行线程,在线程池中取消是中断工作线程。
public class FutureTask<Vimplements RunnableFuture<V{
    
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    /** The underlying callable; nulled out after running */
    //底层调用的任务
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    //记录当前计算线程,用于取消时,中断计算。
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    //等待栈
    private volatile WaitNode waiters;
}

2、构造函数

构造函数有两种,一种参数类型是Callable,可直接赋值给成员变量callable;另一种参数类型是Runnable,并可传入一个result,不过没什么用。

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

public FutureTask(Runnable runnable, V result) {
    //设置进来的runnable 会被适配成callable(RunnableAdapter)
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

Runnable会被包装成一个实现了CallableRunnableAdapter赋值给callable。适配的过程也可以看出result是怎么传进去就会怎么返回。

//Executors
public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    //将Runnable适配成Callable
    return new RunnableAdapter<T>(task, result);
}
//适配器、典型的适配器模式
static final class RunnableAdapter<Timplements Callable<T{
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            //传入的result并没有做任何处理
            return result;
        }
}

所以官方给的函数AbstractExecutorService#submit(java.lang.Runnable, T),也可以传入Runnable类型的任务和result,不过只能判断任务是否完成或者取消任务,外部getresult还是传进去的result,没有太大的意义。

3、核心函数

(1)get阻塞获取结果

get()有两种,一种是会等待计算结束返回,一种是加了超时时长timeout,get线程等待timeout时长,如果此时还没有完成就抛出异常TimeoutException

/**
 * 会一直阻塞到计算结束,可以被打断
 * @throws CancellationException {@inheritDoc}
 */

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        //运行未完成,放入等待栈中
        s = awaitDone(false0L);
    return report(s);
}

public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException 
{
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        //阻塞时间到,若还没有完成就抛异常
        throw new TimeoutException();
    return report(s);
}

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        //如果是正常结束,返回结果
        return (V)x;
    //取消或者中断的就抛出取消异常
    if (s >= CANCELLED)
        throw new CancellationException();
    //其他状态的抛异常EXCEPTIONAL
    throw new ExecutionException((Throwable)x);
}
awaitDone阻塞等待

阻塞的核心代码就是awaitDone,get时若正在计算,将会被放入等待栈阻塞,直到超时时间到,或者被唤醒,或者被中断,然后返回当前的状态。

  1. 当前get线程被打断,删除等待节点,并抛出InterruptedException
  2. 若此时s > COMPLETING有可能完成、取消、中断,将等待节点(WaitNode)的thread设置为null,并返回当前状态。
  3. s == COMPLETING说明正在完成,暂停当前get线程,让出对cpu的占用,执行其他get线程。
  4. 若当前get线程还没有入等待栈,实例化一个WaitNode,并cas入栈。
  5. 最后阻塞,阻塞分为时间阻塞和永久阻塞,阻塞时间到,删除等待节点并返回当前状态。
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException 
{
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    //循环阻塞,循环终止- 阻塞时间到,或者被唤醒,然后返回当前的状态
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) {
            //完成,取消,中断
            if (q != null)
                q.thread = null;
            //返回结果状态
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            //正在完成,暂停当前线程,执行其他get线程
            Thread.yield();
        else if (q == null)
            //NEW 状态,新建一个WaitNode
            q = new WaitNode();
        else if (!queued)
            //cas入栈
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            //需要时间阻塞
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                //不需要等待,删除等待节点
                removeWaiter(q);
                return state;
            }
            //阻塞一段时间
            LockSupport.parkNanos(this, nanos);
        }
        else
            //timed false  会一直阻塞到 计算完成, 需要唤醒
            LockSupport.park(this);
    }
}

/**
 * 栈,node.thread=null的节点会被删除
 * @param node
 */

private void removeWaiter(WaitNode node) {
    if (node != null) {
        node.thread = null;
        retry:
        for (;;) {
            // restart on removeWaiter race
            //循环删除node.thread = null的节点
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;
                if (q.thread != null)
                    pred = q;
                else if (pred != null) {
                    pred.next = s;
                    if (pred.thread == null// check for race
                        continue retry;
                }
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                      q, s))
                    continue retry;
            }
            break;
        }
    }
}

(2)cancel取消任务中断工作线程

调用cancel()可能是被中断(mayInterruptIfRunning=true)或者是主动取消(mayInterruptIfRunning=false)。

  • 中断取消,设置stateINTERRUPTING,并调用runner.interrupt()中断当前运行线程,设置stateINTERRUPTED,最后清空等待栈中所有阻塞节点并唤醒所有等待的线程。
  • 主动取消,设置stateCANCELLED,最后清空等待栈中所有阻塞节点并唤醒所有等待的线程。

从代码可以看出只有stateNEW才能被取消。

public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW &&
            //cass设置 state为INTERRUPTING or CANCELLED
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        //状态没有修改成功返回false,取消失败
        return false;
    try {    // in case call to interrupt throws exception
        //如果是被打断的  就调用t.interrupt();中断线程
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    //中断当前线程
                    t.interrupt();
            } finally { // final state
                //设置state为中断
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        //最后  删除并唤醒所有等待的线程
        finishCompletion();
    }
    return true;
}
//删除并唤醒所有等待的线程
private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        //循环cas WaitNode 为null,删除所有等待的线程
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    //循环唤醒唤醒
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    //最后了,break
                    break;
                q.next = null// unlink to help gc 设置为null  有利于gc
                q = next;
            }
            break;
        }
    }
    //钩子函数
    done();
    callable = null;        // to reduce footprint
}

(3)run执行任务代码并唤醒所有等待线程

FutureTask是会被传给ThreadPoolExecutor.Worker,由线程池启动工作线程,然后调用FutureTaskrun()函数,run()又调用callable.call()run()会将结果设置给outcome

public void run() {
    //新任务-->执行UNSAFE.compareAndSwapObject(this, runnerOffset,
    //                                         null, Thread.currentThread()))
    //新任务会将当前线程设置给runner 这里的作用是乐观锁,保证下面的执行流程是只有一个线程
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                //调用callable的call,并设置返回值
                //如果传进来的任务是Runnable,会被转换成callable
                result = c.call();
                //若运行异常,ran=false,异常会被捕获处理
                //所以传进来的任务的run或者call代码块最好try-catch下
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                //设置异常给outcome
                setException(ex);
            }
            if (ran)
                //运行正常完成,设置结果给outcome
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        //最后 runner=null 相当于是释放锁
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            //如果状态是被打断,让出cpu
            handlePossibleCancellationInterrupt(s);
    }
}

执行细节如下:

  • 新任务CAS设置当前线程给runner,这里的作用是乐观锁保证下面的执行流程只有一个线程,并且外部可以通过runner随时中断执行。
  • 直接调用callable.call()执行任务代码。
  • 中途出现异常,将异常设置给outcome,状态流转为异常完成,并唤醒所有等待的线程。
protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        //设置异常
        outcome = t;
        //设置异常状态
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        //唤醒所有线程
        finishCompletion();
    }
}
  • 正常运行完毕将运行结果result设置给outcome,状态流转为正常完成,并唤醒所有等待的线程。
protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        //设置为完成状态
        outcome = v;
        //设置正常状态
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        //完成操作-删除并唤醒所有等待的线程
        finishCompletion();
    }
}

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        //循环cas WaitNode 为null,删除所有等待的线程
        //这里cas删除 其实是为了if里面的操作线程安全无锁
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    //循环唤醒唤醒
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    //最后了,break
                    break;
                q.next = null// unlink to help gc 设置为null  有利于gc
                q = next;
            }
            break;
        }
    }
    //钩子函数
    done();

    callable = null;        // to reduce footprint
}
  • 最终runner设置为null,相当于释放锁;如若此时状态为被打断状态INTERRUPTING,需要让出cpu
private void handlePossibleCancellationInterrupt(int s) {
    // It is possible for our interrupter to stall before getting a
    // chance to interrupt us.  Let's spin-wait patiently.
    if (s == INTERRUPTING)
        while (state == INTERRUPTING)
            //暂停当前线程,让出cpu时间片
            Thread.yield(); // wait out pending interrupt
}

(4)runAndReset重复执行

runAndReset()run()类似,但是没有把result设置给outcome,函数返回为boolean,用于是否重复执行。

protected boolean runAndReset() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return false;
    boolean ran = false;
    int s = state;
    try {
        Callable<V> c = callable;
        if (c != null && s == NEW) {
            try {
                c.call(); // don't set result
                //如果c.call抛异常,将会被处理,但是没有打印堆栈,使用者不易排查
                //不会再往下执行ran=false
                //所以传进来的任务run里需要自己try-catch
                ran = true;
            } catch (Throwable ex) {
                //设置异常给outcome
                setException(ex);
            }
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        s = state;
        if (s >= INTERRUPTING)
            //如果状态是被打断,让出cpu
            handlePossibleCancellationInterrupt(s);
    }
    return ran && s == NEW;
}

(5)无锁化

FutureTask的几个成员变量并没有使用悲观锁Lock或者synchronized,而是用了cas乐观锁,而且也用了volatile修饰,使得state的流转,runner的设置,waiters的入栈出栈线程安全。

private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
    try {
        //这不就是乐观锁吗 无锁化-牛逼
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> k = FutureTask.class;
        stateOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("state"));
        runnerOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("runner"));
        waitersOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("waiters"));
    } catch (Exception e) {
        throw new Error(e);
    }
}

四、实际应用

FutureTask可以配合线程池使用,也可以单独启动线程。

(1)配合线程池使用。

LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(55,
        30, TimeUnit.SECONDS, workQueue);
Future<Integer> future = poolExecutor.submit(new Callable<Integer>() {
    @Override
    public Integer call() throws Exception {
        int i = 100;
        int j = 100;
        int sum = i + j;
        Thread.sleep(2000);
        return sum;
    }
});
long s = System.currentTimeMillis();
//获得计算结果
Integer result = future.get();
long e = System.currentTimeMillis();
System.out.println("result=" + result + ",ms=" + (e-s));
//result=200,ms=2016

(2)单独启动线程

List<FutureTask> taskList = new ArrayList<FutureTask>();
for (int i = 0; i < 3; i++) {
    int j = i;
    FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            return j + 10;
        }
    });
    Thread thread = new Thread(futureTask);
    thread.start();
    taskList.add(futureTask);
}
//批量获取结果
for (FutureTask futureTask : taskList) {
    System.out.println(futureTask.get());
}

需要注意:

无论是线程池submit提交任务还是批量启动多个线程使用FutureTask,切不可在一个for循环里一边异步执行一边获取结果,这样会使得整个过程因为阻塞获取结果变成单线程。

应该批量提交任务,批量获取结果

五、总结

  1. FutureTask的特点异步执行、阻塞获取结果、可取消。
  2. FutureTask运用了装饰器和适配器模式。装饰器的体现是在Runnable异步执行的基础上增加了异步阻塞获取结果和取消的功能;适配器的体现是当传入的是任务是Runnable类型时会被适配成一个实现了CallableRunnableAdapter
  3. callableFutureTask的成员变量,无法单独实现线程,配合FutureTask使用最佳。
  4. 任务正常完成outcome的值是callable.run的运行结果;任务异常完成outcome是异常。
  5. 成员变量runner的作用,可在外围中断取消正在执行的任务,也有保证任务执行线程安全。
  6. 成员变量waitersget获取结果当未完成前一个个入栈阻塞,完成时全部出栈唤醒。
  7. 因为runrunAndReset对异常不作为,建议任务代码自行try-catch

PS: 如若文章中有错误理解,欢迎批评指正,同时非常期待你的评论、点赞和收藏。我是徐同学,愿与你共同进步! 徐同学呀

Stefan777

2021/04/14  阅读:29  主题:姹紫

作者介绍

Stefan777