Loading...
墨滴

冰河

2021/04/18  阅读:79  主题:嫩青

刚研究完Callable和Future,各位随便问!!

大家好,我是冰河~~

在Java的多线程编程中,除了Thread类和Runnable接口外,不得不说的就是Callable接口Future接口了。使用继承Thread类或者实现Runnable接口的线程,无法返回最终的执行结果数据,只能等待线程执行完成。此时,如果想要获取线程执行后的返回结果,那么,Callable和Future就派上用场了。

Callable接口

1.Callable接口介绍

Callable接口是JDK1.5新增的泛型接口,在JDK1.8中,被声明为函数式接口,如下所示。

@FunctionalInterface
public interface Callable<V{
    call() throws Exception;
}

在JDK 1.8中只声明有一个方法的接口为函数式接口,函数式接口可以使用@FunctionalInterface注解修饰,也可以不使用@FunctionalInterface注解修饰。只要一个接口中只包含有一个方法,那么,这个接口就是函数式接口。

在JDK中,实现Callable接口的子类如下图所示。

默认的子类层级关系图看不清,这里,可以通过IDEA右键Callable接口,选择“Layout”来指定Callable接口的实现类图的不同结构,如下所示。

这里,可以选择“Organic Layout”选项,选择后的Callable接口的子类的结构如下图所示。

在实现Callable接口的子类中,有几个比较重要的类,如下图所示。

分别是:Executors类中的静态内部类:PrivilegedCallable、PrivilegedCallableUsingCurrentClassLoader、RunnableAdapter和Task类下的TaskCallable。

2.实现Callable接口的重要类分析

接下来,分析的类主要有:PrivilegedCallable、PrivilegedCallableUsingCurrentClassLoader、RunnableAdapter和Task类下的TaskCallable。虽然这些类在实际工作中很少被直接用到,但是作为一名合格的开发工程师,设置是秃顶的资深专家来说,了解并掌握这些类的实现有助你进一步理解Callable接口,并提高专业技能(头发再掉一批,哇哈哈哈。。。)。

  • PrivilegedCallable

PrivilegedCallable类是Callable接口的一个特殊实现类,它表明Callable对象有某种特权来访问系统的某种资源,PrivilegedCallable类的源代码如下所示。

/**
 * A callable that runs under established access control settings
 */

static final class PrivilegedCallable<Timplements Callable<T{
 private final Callable<T> task;
 private final AccessControlContext acc;

 PrivilegedCallable(Callable<T> task) {
  this.task = task;
  this.acc = AccessController.getContext();
 }

 public T call() throws Exception {
  try {
   return AccessController.doPrivileged(
    new PrivilegedExceptionAction<T>() {
     public T run() throws Exception {
      return task.call();
     }
    }, acc);
  } catch (PrivilegedActionException e) {
   throw e.getException();
  }
 }
}

从PrivilegedCallable类的源代码来看,可以将PrivilegedCallable看成是对Callable接口的封装,并且这个类也继承了Callable接口。

在PrivilegedCallable类中有两个成员变量,分别是Callable接口的实例对象和AccessControlContext类的实例对象,如下所示。

private final Callable<T> task;
private final AccessControlContext acc;

其中,AccessControlContext类可以理解为一个具有系统资源访问决策的上下文类,通过这个类可以访问系统的特定资源。通过类的构造方法可以看出,在实例化AccessControlContext类的对象时,只需要传递Callable接口子类的对象即可,如下所示。

PrivilegedCallable(Callable<T> task) {
 this.task = task;
 this.acc = AccessController.getContext();
}

AccessControlContext类的对象是通过AccessController类的getContext()方法获取的,这里,查看AccessController类的getContext()方法,如下所示。

public static AccessControlContext getContext(){
 AccessControlContext acc = getStackAccessControlContext();
 if (acc == null) {
  return new AccessControlContext(nulltrue);
 } else {
  return acc.optimize();
 }
}

通过AccessController的getContext()方法可以看出,首先通过getStackAccessControlContext()方法来获取AccessControlContext对象实例。如果获取的AccessControlContext对象实例为空,则通过调用AccessControlContext类的构造方法实例化,否则,调用AccessControlContext对象实例的optimize()方法返回AccessControlContext对象实例。

这里,我们先看下getStackAccessControlContext()方法是个什么鬼。

private static native AccessControlContext getStackAccessControlContext();

原来是个本地方法,方法的字面意思就是获取能够访问系统栈的决策上下文对象。

接下来,我们回到PrivilegedCallable类的call()方法,如下所示。

public T call() throws Exception {
 try {
  return AccessController.doPrivileged(
   new PrivilegedExceptionAction<T>() {
    public T run() throws Exception {
     return task.call();
    }
   }, acc);
 } catch (PrivilegedActionException e) {
  throw e.getException();
 }
}

通过调用AccessController.doPrivileged()方法,传递PrivilegedExceptionAction。接口对象和AccessControlContext对象,并最终返回泛型的实例对象。

首先,看下AccessController.doPrivileged()方法,如下所示。

@CallerSensitive
public static native <T> T
    doPrivileged(PrivilegedExceptionAction<T> action,
                 AccessControlContext context)

    throws PrivilegedActionException
;

可以看到,又是一个本地方法。也就是说,最终的执行情况是将PrivilegedExceptionAction接口对象和AccessControlContext对象实例传递给这个本地方法执行。并且在PrivilegedExceptionAction接口对象的run()方法中调用Callable接口的call()方法来执行最终的业务逻辑,并且返回泛型对象。

  • PrivilegedCallableUsingCurrentClassLoader

此类表示为在已经建立的特定访问控制和当前的类加载器下运行的Callable类,源代码如下所示。

/**
 * A callable that runs under established access control settings and
 * current ClassLoader
 */

static final class PrivilegedCallableUsingCurrentClassLoader<Timplements Callable<T{
 private final Callable<T> task;
 private final AccessControlContext acc;
 private final ClassLoader ccl;

 PrivilegedCallableUsingCurrentClassLoader(Callable<T> task) {
  SecurityManager sm = System.getSecurityManager();
  if (sm != null) {
   sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);
   sm.checkPermission(new RuntimePermission("setContextClassLoader"));
  }
  this.task = task;
  this.acc = AccessController.getContext();
  this.ccl = Thread.currentThread().getContextClassLoader();
 }

 public T call() throws Exception {
  try {
   return AccessController.doPrivileged(
    new PrivilegedExceptionAction<T>() {
     public T run() throws Exception {
      Thread t = Thread.currentThread();
      ClassLoader cl = t.getContextClassLoader();
      if (ccl == cl) {
       return task.call();
      } else {
       t.setContextClassLoader(ccl);
       try {
        return task.call();
       } finally {
        t.setContextClassLoader(cl);
       }
      }
     }
    }, acc);
  } catch (PrivilegedActionException e) {
   throw e.getException();
  }
 }
}

这个类理解起来比较简单,首先,在类中定义了三个成员变量,如下所示。

private final Callable<T> task;
private final AccessControlContext acc;
private final ClassLoader ccl;

接下来,通过构造方法注入Callable对象,在构造方法中,首先获取系统安全管理器对象实例,通过系统安全管理器对象实例检查是否具有获取ClassLoader和设置ContextClassLoader的权限。并在构造方法中为三个成员变量赋值,如下所示。

PrivilegedCallableUsingCurrentClassLoader(Callable<T> task) {
 SecurityManager sm = System.getSecurityManager();
 if (sm != null) {
  sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);
  sm.checkPermission(new RuntimePermission("setContextClassLoader"));
 }
 this.task = task;
 this.acc = AccessController.getContext();
 this.ccl = Thread.currentThread().getContextClassLoader();
}

接下来,通过调用call()方法来执行具体的业务逻辑,如下所示。

public T call() throws Exception {
 try {
  return AccessController.doPrivileged(
   new PrivilegedExceptionAction<T>() {
    public T run() throws Exception {
     Thread t = Thread.currentThread();
     ClassLoader cl = t.getContextClassLoader();
     if (ccl == cl) {
      return task.call();
     } else {
      t.setContextClassLoader(ccl);
      try {
       return task.call();
      } finally {
       t.setContextClassLoader(cl);
      }
     }
    }
   }, acc);
 } catch (PrivilegedActionException e) {
  throw e.getException();
 }
}

在call()方法中同样是通过调用AccessController类的本地方法doPrivileged,传递PrivilegedExceptionAction接口的实例对象和AccessControlContext类的对象实例。

具体执行逻辑为:在PrivilegedExceptionAction对象的run()方法中获取当前线程的ContextClassLoader对象,如果在构造方法中获取的ClassLoader对象与此处的ContextClassLoader对象是同一个对象(不止对象实例相同,而且内存地址也相同),则直接调用Callable对象的call()方法返回结果。否则,将PrivilegedExceptionAction对象的run()方法中的当前线程的ContextClassLoader设置为在构造方法中获取的类加载器对象,接下来,再调用Callable对象的call()方法返回结果。最终将当前线程的ContextClassLoader重置为之前的ContextClassLoader。

  • RunnableAdapter

RunnableAdapter类比较简单,给定运行的任务和结果,运行给定的任务并返回给定的结果,源代码如下所示。

/**
 * A callable that runs given task and returns given result
 */

static final class RunnableAdapter<Timplements Callable<T{
 final Runnable task;
 final T result;
 RunnableAdapter(Runnable task, T result) {
  this.task = task;
  this.result = result;
 }
 public T call() {
  task.run();
  return result;
 }
}
  • TaskCallable

TaskCallable类是javafx.concurrent.Task类的静态内部类,TaskCallable类主要是实现了Callable接口并且被定义为FutureTask的类,并且在这个类中允许我们拦截call()方法来更新task任务的状态。源代码如下所示。

private static final class TaskCallable<Vimplements Callable<V{

 private Task<V> task;
 private TaskCallable() { }

 @Override 
 public V call() throws Exception {
  task.started = true;
  task.runLater(() -> {
   task.setState(State.SCHEDULED);
   task.setState(State.RUNNING);
  });
  try {
   final V result = task.call();
   if (!task.isCancelled()) {
    task.runLater(() -> {
     task.updateValue(result);
     task.setState(State.SUCCEEDED);
    });
    return result;
   } else {
    return null;
   }
  } catch (final Throwable th) {
   task.runLater(() -> {
    task._setException(th);
    task.setState(State.FAILED);
   });
   if (th instanceof Exception) {
    throw (Exception) th;
   } else {
    throw new Exception(th);
   }
  }
 }
}

从TaskCallable类的源代码可以看出,只定义了一个Task类型的成员变量。下面主要分析TaskCallable类的call()方法。

当程序的执行进入到call()方法时,首先将task对象的started属性设置为true,表示任务已经开始,并且将任务的状态依次设置为State.SCHEDULED和State.RUNNING,依次触发任务的调度事件和运行事件。如下所示。

task.started = true;
task.runLater(() -> {
 task.setState(State.SCHEDULED);
 task.setState(State.RUNNING);
});

接下来,在try代码块中执行Task对象的call()方法,返回泛型对象。如果任务没有被取消,则更新任务的缓存,将调用call()方法返回的泛型对象绑定到Task对象中的ObjectProperty 对象中,其中,ObjectProperty 在Task类中的定义如下。

private final ObjectProperty<V> value = new SimpleObjectProperty<>(this"value");

接下来,将任务的状态设置为成功状态。如下所示。

try {
 final V result = task.call();
 if (!task.isCancelled()) {
  task.runLater(() -> {
   task.updateValue(result);
   task.setState(State.SUCCEEDED);
  });
  return result;
 } else {
  return null;
 }
}

如果程序抛出了异常或者错误,会进入catch()代码块,设置Task对象的Exception信息并将状态设置为State.FAILED,也就是将任务标记为失败。接下来,判断异常或错误的类型,如果是Exception类型的异常,则直接强转为Exception类型的异常并抛出。否则,将异常或者错误封装为Exception对象并抛出,如下所示。

catch (final Throwable th) {
 task.runLater(() -> {
  task._setException(th);
  task.setState(State.FAILED);
 });
 if (th instanceof Exception) {
  throw (Exception) th;
 } else {
  throw new Exception(th);
 }
}

两种异步模型与深度解析Future接口

两种异步模型

在Java的并发编程中,大体上会分为两种异步编程模型,一类是直接以异步的形式来并行运行其他的任务,不需要返回任务的结果数据。一类是以异步的形式运行其他任务,需要返回结果。

1.无返回结果的异步模型

无返回结果的异步任务,可以直接将任务丢进线程或线程池中运行,此时,无法直接获得任务的执行结果数据,一种方式是可以使用回调方法来获取任务的运行结果。

具体的方案是:定义一个回调接口,并在接口中定义接收任务结果数据的方法,具体逻辑在回调接口的实现类中完成。将回调接口与任务参数一同放进线程或线程池中运行,任务运行后调用接口方法,执行回调接口实现类中的逻辑来处理结果数据。这里,给出一个简单的示例供参考。

  • 定义回调接口
package io.binghe.concurrent.lab04;

/**
 * @author binghe
 * @version 1.0.0
 * @description 定义回调接口
 */

public interface TaskCallable<T{
    callable(T t);
}

便于接口的通用型,这里为回调接口定义了泛型。

  • 定义任务结果数据的封装类
package io.binghe.concurrent.lab04;

import java.io.Serializable;

/**
 * @author binghe
 * @version 1.0.0
 * @description 任务执行结果
 */

public class TaskResult implements Serializable {
    private static final long serialVersionUID = 8678277072402730062L;
    /**
     * 任务状态
     */

    private Integer taskStatus;

    /**
     * 任务消息
     */

    private String taskMessage;

    /**
     * 任务结果数据
     */

    private String taskResult;
 
 //省略getter和setter方法
 @Override
    public String toString() {
        return "TaskResult{" +
                "taskStatus=" + taskStatus +
                ", taskMessage='" + taskMessage + '\'' +
                ", taskResult='" + taskResult + '\'' +
                '}';
    }
}
  • 创建回调接口的实现类

回调接口的实现类主要用来对任务的返回结果进行相应的业务处理,这里,为了方便演示,只是将结果数据返回。大家需要根据具体的业务场景来做相应的分析和处理。

package io.binghe.concurrent.lab04;

/**
 * @author binghe
 * @version 1.0.0
 * @description 回调函数的实现类
 */

public class TaskHandler implements TaskCallable<TaskResult{
    @Override
public TaskResult callable(TaskResult taskResult) {
//TODO 拿到结果数据后进一步处理
    System.out.println(taskResult.toString());
        return taskResult;
    }
}
  • 创建任务的执行类

任务的执行类是具体执行任务的类,实现Runnable接口,在此类中定义一个回调接口类型的成员变量和一个String类型的任务参数(模拟任务的参数),并在构造方法中注入回调接口和任务参数。在run方法中执行任务,任务完成后将任务的结果数据封装成TaskResult对象,调用回调接口的方法将TaskResult对象传递到回调方法中。

package io.binghe.concurrent.lab04;
/**
 * @author binghe
 * @version 1.0.0
 * @description 任务执行类
 */

public class TaskExecutor implements Runnable{
    private TaskCallable<TaskResult> taskCallable;
    private String taskParameter;

    public TaskExecutor(TaskCallable<TaskResult> taskCallable, String taskParameter){
        this.taskCallable = taskCallable;
        this.taskParameter = taskParameter;
    }

    @Override
    public void run() {
        //TODO 一系列业务逻辑,将结果数据封装成TaskResult对象并返回
        TaskResult result = new TaskResult();
        result.setTaskStatus(1);
        result.setTaskMessage(this.taskParameter);
        result.setTaskResult("异步回调成功");
        taskCallable.callable(result);
    }
}

到这里,整个大的框架算是完成了,接下来,就是测试看能否获取到异步任务的结果了。

  • 异步任务测试类
package io.binghe.concurrent.lab04;

/**
 * @author binghe
 * @version 1.0.0
 * @description 测试回调
 */

public class TaskCallableTest {
    public static void main(String[] args){
        TaskCallable<TaskResult> taskCallable = new TaskHandler();
        TaskExecutor taskExecutor = new TaskExecutor(taskCallable, "测试回调任务");
        new Thread(taskExecutor).start();
    }
}

在测试类中,使用Thread类创建一个新的线程,并启动线程运行任务。运行程序最终的接口数据如下所示。

TaskResult{taskStatus=1, taskMessage='测试回调任务', taskResult='异步回调成功'}

大家可以细细品味下这种获取异步结果的方式。这里,只是简单的使用了Thread类来创建并启动线程,也可以使用线程池的方式实现。大家可自行实现以线程池的方式通过回调接口获取异步结果。

2.有返回结果的异步模型

尽管使用回调接口能够获取异步任务的结果,但是这种方式使用起来略显复杂。在JDK中提供了可以直接返回异步结果的处理方案。最常用的就是使用Future接口或者其实现类FutureTask来接收任务的返回结果。

  • 使用Future接口获取异步结果

使用Future接口往往配合线程池来获取异步执行结果,如下所示。

package io.binghe.concurrent.lab04;

import java.util.concurrent.*;

/**
 * @author binghe
 * @version 1.0.0
 * @description 测试Future获取异步结果
 */

public class FutureTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<String> future = executorService.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "测试Future获取异步结果";
            }
        });
        System.out.println(future.get());
        executorService.shutdown();
    }
}

运行结果如下所示。

测试Future获取异步结果
  • 使用FutureTask类获取异步结果

FutureTask类既可以结合Thread类使用也可以结合线程池使用,接下来,就看下这两种使用方式。

结合Thread类的使用示例如下所示。

package io.binghe.concurrent.lab04;

import java.util.concurrent.*;

/**
 * @author binghe
 * @version 1.0.0
 * @description 测试FutureTask获取异步结果
 */

public class FutureTaskTest {

    public static void main(String[] args)throws ExecutionException, InterruptedException{
        FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "测试FutureTask获取异步结果";
            }
        });
        new Thread(futureTask).start();
        System.out.println(futureTask.get());
    }
}

运行结果如下所示。

测试FutureTask获取异步结果

结合线程池的使用示例如下。

package io.binghe.concurrent.lab04;

import java.util.concurrent.*;

/**
 * @author binghe
 * @version 1.0.0
 * @description 测试FutureTask获取异步结果
 */

public class FutureTaskTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "测试FutureTask获取异步结果";
            }
        });
        executorService.execute(futureTask);
        System.out.println(futureTask.get());
        executorService.shutdown();
    }
}

运行结果如下所示。

测试FutureTask获取异步结果

可以看到使用Future接口或者FutureTask类来获取异步结果比使用回调接口获取异步结果简单多了。注意:实现异步的方式很多,这里只是用多线程举例。

接下来,就深入分析下Future接口。

深度解析Future接口

1.Future接口

Future是JDK1.5新增的异步编程接口,其源代码如下所示。

package java.util.concurrent;

public interface Future<V{

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    get() throws InterruptedException, ExecutionException;

    get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException
;
}

可以看到,在Future接口中,总共定义了5个抽象方法。接下来,就分别介绍下这5个方法的含义。

  • cancel(boolean)

取消任务的执行,接收一个boolean类型的参数,成功取消任务,则返回true,否则返回false。当任务已经完成,已经结束或者因其他原因不能取消时,方法会返回false,表示任务取消失败。当任务未启动调用了此方法,并且结果返回true(取消成功),则当前任务不再运行。如果任务已经启动,会根据当前传递的boolean类型的参数来决定是否中断当前运行的线程来取消当前运行的任务。

  • isCancelled()

判断任务在完成之前是否被取消,如果在任务完成之前被取消,则返回true;否则,返回false。

这里需要注意一个细节:只有任务未启动,或者在完成之前被取消,才会返回true,表示任务已经被成功取消。其他情况都会返回false。

  • isDone()

判断任务是否已经完成,如果任务正常结束、抛出异常退出、被取消,都会返回true,表示任务已经完成。

  • get()

当任务完成时,直接返回任务的结果数据;当任务未完成时,等待任务完成并返回任务的结果数据。

  • get(long, TimeUnit)

当任务完成时,直接返回任务的结果数据;当任务未完成时,等待任务完成,并设置了超时等待时间。在超时时间内任务完成,则返回结果;否则,抛出TimeoutException异常。

2.RunnableFuture接口

Future接口有一个重要的子接口,那就是RunnableFuture接口,RunnableFuture接口不但继承了Future接口,而且继承了java.lang.Runnable接口,其源代码如下所示。

package java.util.concurrent;

public interface RunnableFuture<Vextends RunnableFuture<V{
    void run();
}

这里,问一下,RunnableFuture接口中有几个抽象方法?想好了再说!哈哈哈。。。

这个接口比较简单run()方法就是运行任务时调用的方法。

3.FutureTask类

FutureTask类是RunnableFuture接口的一个非常重要的实现类,它实现了RunnableFuture接口、Future接口和Runnable接口的所有方法。FutureTask类的源代码比较多,这个就不粘贴了,大家自行到java.util.concurrent下查看。

(1)FutureTask类中的变量与常量

在FutureTask类中首先定义了一个状态变量state,这个变量使用了volatile关键字修饰,这里,大家只需要知道volatile关键字通过内存屏障和禁止重排序优化来实现线程安全,后续会单独深度分析volatile关键字是如何保证线程安全的。紧接着,定义了几个任务运行时的状态常量,如下所示。

private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

其中,代码注释中给出了几个可能的状态变更流程,如下所示。

NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED

接下来,定义了其他几个成员变量,如下所示。

private Callable<V> callable;
private Object outcome; 
private volatile Thread runner;
private volatile WaitNode waiters;

又看到我们所熟悉的Callable接口了,Callable接口那肯定就是用来调用call()方法执行具体任务了。

  • outcome:Object类型,表示通过get()方法获取到的结果数据或者异常信息。

  • runner:运行Callable的线程,运行期间会使用CAS保证线程安全,这里大家只需要知道CAS是Java保证线程安全的一种方式,后续文章中会深度分析CAS如何保证线程安全。

  • waiters:WaitNode类型的变量,表示等待线程的堆栈,在FutureTask的实现中,会通过CAS结合此堆栈交换任务的运行状态。

看一下WaitNode类的定义,如下所示。

static final class WaitNode {
 volatile Thread thread;
 volatile WaitNode next;
 WaitNode() { thread = Thread.currentThread(); }
}

可以看到,WaitNode类是FutureTask类的静态内部类,类中定义了一个Thread成员变量和指向下一个WaitNode节点的引用。其中通过构造方法将thread变量设置为当前线程。

(2)构造方法

接下来,是FutureTask的两个构造方法,比较简单,如下所示。

public FutureTask(Callable<V> callable) {
 if (callable == null)
  throw new NullPointerException();
 this.callable = callable;
 this.state = NEW;
}

public FutureTask(Runnable runnable, V result) {
 this.callable = Executors.callable(runnable, result);
 this.state = NEW;
}

(3)是否取消与完成方法

继续向下看源码,看到一个任务是否取消的方法,和一个任务是否完成的方法,如下所示。

public boolean isCancelled() {
 return state >= CANCELLED;
}

public boolean isDone() {
 return state != NEW;
}

这两方法中,都是通过判断任务的状态来判定任务是否已取消和已完成的。为啥会这样判断呢?再次查看FutureTask类中定义的状态常量发现,其常量的定义是有规律的,并不是随意定义的。其中,大于或者等于CANCELLED的常量为CANCELLED、INTERRUPTING和INTERRUPTED,这三个状态均可以表示线程已经被取消。当状态不等于NEW时,可以表示任务已经完成。

通过这里,大家可以学到一点:以后在编码过程中,要按照规律来定义自己使用的状态,尤其是涉及到业务中有频繁的状态变更的操作,有规律的状态可使业务处理变得事半功倍,这也是通过看别人的源码设计能够学到的,这里,建议大家还是多看别人写的优秀的开源框架的源码。

(4)取消方法

我们继续向下看源码,接下来,看到的是cancel(boolean)方法,如下所示。

public boolean cancel(boolean mayInterruptIfRunning) {
 if (!(state == NEW &&
    UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
     mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
  return false;
 try {    // in case call to interrupt throws exception
  if (mayInterruptIfRunning) {
   try {
    Thread t = runner;
    if (t != null)
     t.interrupt();
   } finally { // final state
    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
   }
  }
 } finally {
  finishCompletion();
 }
 return true;
}

接下来,拆解cancel(boolean)方法。在cancel(boolean)方法中,首先判断任务的状态和CAS的操作结果,如果任务的状态不等于NEW或者CAS的操作返回false,则直接返回false,表示任务取消失败。如下所示。

if (!(state == NEW &&
   UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
    mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
 return false;

接下来,在try代码块中,首先判断是否可以中断当前任务所在的线程来取消任务的运行。如果可以中断当前任务所在的线程,则以一个Thread临时变量来指向运行任务的线程,当指向的变量不为空时,调用线程对象的interrupt()方法来中断线程的运行,最后将线程标记为被中断的状态。如下所示。

try {
 if (mayInterruptIfRunning) {
  try {
   Thread t = runner;
   if (t != null)
    t.interrupt();
  } finally { // final state
   UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
  }
 }
}

这里,发现变更任务状态使用的是UNSAFE.putOrderedInt()方法,这个方法是个什么鬼呢?点进去看一下,如下所示。

public native void putOrderedInt(Object var1, long var2, int var4);

可以看到,又是一个本地方法,嘿嘿,这里先不管它,后续文章会详解这些方法的作用。

接下来,cancel(boolean)方法会进入finally代码块,如下所示。

finally {
 finishCompletion();
}

可以看到在finallly代码块中调用了finishCompletion()方法,顾名思义,finishCompletion()方法表示结束任务的运行,接下来看看它是如何实现的。点到finishCompletion()方法中看一下,如下所示。

private void finishCompletion() {
 // assert state > COMPLETING;
 for (WaitNode q; (q = waiters) != null;) {
  if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
   for (;;) {
    Thread t = q.thread;
    if (t != null) {
     q.thread = null;
     LockSupport.unpark(t);
    }
    WaitNode next = q.next;
    if (next == null)
     break;
    q.next = null// unlink to help gc
    q = next;
   }
   break;
  }
 }
 done();
 callable = null;        // to reduce footprint
}

在finishCompletion()方法中,首先定义一个for循环,循环终止因子为waiters为null,在循环中,判断CAS操作是否成功,如果成功进行if条件中的逻辑。首先,定义一个for自旋循环,在自旋循环体中,唤醒WaitNode堆栈中的线程,使其运行完成。当WaitNode堆栈中的线程运行完成后,通过break退出外层for循环。接下来调用done()方法。done()方法又是个什么鬼呢?点进去看一下,如下所示。

protected void done() { }

可以看到,done()方法是一个空的方法体,交由子类来实现具体的业务逻辑。

当我们的具体业务中,需要在取消任务时,执行一些额外的业务逻辑,可以在子类中覆写done()方法的实现。

(5)get()方法

继续向下看FutureTask类的代码,FutureTask类中实现了两个get()方法,如下所示。

public V get() throws InterruptedException, ExecutionException {
 int s = state;
 if (s <= COMPLETING)
  s = awaitDone(false0L);
 return report(s);
}

public V get(long timeout, TimeUnit unit)
 throws InterruptedException, ExecutionException, TimeoutException 
{
 if (unit == null)
  throw new NullPointerException();
 int s = state;
 if (s <= COMPLETING &&
  (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
  throw new TimeoutException();
 return report(s);
}

没参数的get()方法为当任务未运行完成时,会阻塞,直到返回任务结果。有参数的get()方法为当任务未运行完成,并且等待时间超出了超时时间,会TimeoutException异常。

两个get()方法的主要逻辑差不多,一个没有超时设置,一个有超时设置,这里说一下主要逻辑。判断任务的当前状态是否小于或者等于COMPLETING,也就是说,任务是NEW状态或者COMPLETING,调用awaitDone()方法,看下awaitDone()方法的实现,如下所示。

private int awaitDone(boolean timed, long nanos)
 throws InterruptedException 
{
 final long deadline = timed ? System.nanoTime() + nanos : 0L;
 WaitNode q = null;
 boolean queued = false;
 for (;;) {
  if (Thread.interrupted()) {
   removeWaiter(q);
   throw new InterruptedException();
  }

  int s = state;
  if (s > COMPLETING) {
   if (q != null)
    q.thread = null;
   return s;
  }
  else if (s == COMPLETING) // cannot time out yet
   Thread.yield();
  else if (q == null)
   q = new WaitNode();
  else if (!queued)
   queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
             q.next = waiters, q);
  else if (timed) {
   nanos = deadline - System.nanoTime();
   if (nanos <= 0L) {
    removeWaiter(q);
    return state;
   }
   LockSupport.parkNanos(this, nanos);
  }
  else
   LockSupport.park(this);
 }
}

接下来,拆解awaitDone()方法。在awaitDone()方法中,最重要的就是for自旋循环,在循环中首先判断当前线程是否被中断,如果已经被中断,则调用removeWaiter()将当前线程从堆栈中移除,并且抛出InterruptedException异常,如下所示。

if (Thread.interrupted()) {
 removeWaiter(q);
 throw new InterruptedException();
}

接下来,判断任务的当前状态是否完成,如果完成,并且堆栈句柄不为空,则将堆栈中的当前线程设置为空,返回当前任务的状态,如下所示。

int s = state;
if (s > COMPLETING) {
 if (q != null)
  q.thread = null;
 return s;
}

当任务的状态为COMPLETING时,使当前线程让出CPU资源,如下所示。

else if (s == COMPLETING)
 Thread.yield();

如果堆栈为空,则创建堆栈对象,如下所示。

else if (q == null)
 q = new WaitNode();

如果queued变量为false,通过CAS操作为queued赋值,如果awaitDone()方法传递的timed参数为true,则计算超时时间,当时间已超时,则在堆栈中移除当前线程并返回任务状态,如下所示。如果未超时,则重置超时时间,如下所示。

else if (!queued)
 queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
           q.next = waiters, q);
else if (timed) {
 nanos = deadline - System.nanoTime();
 if (nanos <= 0L) {
  removeWaiter(q);
  return state;
 }
 LockSupport.parkNanos(this, nanos);
}

如果不满足上述的所有条件,则将当前线程设置为等待状态,如下所示。

else
 LockSupport.park(this);

接下来,回到get()方法中,当awaitDone()方法返回结果,或者任务的状态不满足条件时,都会调用report()方法,并将当前任务的状态传递到report()方法中,并返回结果,如下所示。

return report(s);

看来,这里还要看下report()方法啊,点进去看下report()方法的实现,如下所示。

private V report(int s) throws ExecutionException {
 Object x = outcome;
 if (s == NORMAL)
  return (V)x;
 if (s >= CANCELLED)
  throw new CancellationException();
 throw new ExecutionException((Throwable)x);
}

可以看到,report()方法的实现比较简单,首先,将outcome数据赋值给x变量,接下来,主要是判断接收到的任务状态,如果状态为NORMAL,则将x强转为泛型类型返回;当任务的状态大于或者等于CANCELLED,也就是任务已经取消,则抛出CancellationException异常,其他情况则抛出ExecutionException异常。

至此,get()方法分析完成。注意:一定要理解get()方法的实现,因为get()方法是我们使用Future接口和FutureTask类时,使用的比较频繁的一个方法。

(6)set()方法与setException()方法

继续看FutureTask类的代码,接下来看到的是set()方法与setException()方法,如下所示。

protected void set(V v) {
 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
  outcome = v;
  UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
  finishCompletion();
 }
}

protected void setException(Throwable t) {
 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
  outcome = t;
  UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
  finishCompletion();
 }
}

通过源码可以看出,set()方法与setException()方法整体逻辑几乎一样,只是在设置任务状态时一个将状态设置为NORMAL,一个将状态设置为EXCEPTIONAL。

至于finishCompletion()方法,前面已经分析过。

(7)run()方法与runAndReset()方法

接下来,就是run()方法了,run()方法的源代码如下所示。

public void run() {
 if (state != NEW ||
  !UNSAFE.compareAndSwapObject(this, runnerOffset,
          null, Thread.currentThread()))
  return;
 try {
  Callable<V> c = callable;
  if (c != null && state == NEW) {
   V result;
   boolean ran;
   try {
    result = c.call();
    ran = true;
   } catch (Throwable ex) {
    result = null;
    ran = false;
    setException(ex);
   }
   if (ran)
    set(result);
  }
 } finally {
  // runner must be non-null until state is settled to
  // prevent concurrent calls to run()
  runner = null;
  // state must be re-read after nulling runner to prevent
  // leaked interrupts
  int s = state;
  if (s >= INTERRUPTING)
   handlePossibleCancellationInterrupt(s);
 }
}

可以这么说,只要使用了Future和FutureTask,就必然会调用run()方法来运行任务,掌握run()方法的流程是非常有必要的。在run()方法中,如果当前状态不是NEW,或者CAS操作返回的结果为false,则直接返回,不再执行后续逻辑,如下所示。

if (state != NEW ||
 !UNSAFE.compareAndSwapObject(this, runnerOffset,
         null, Thread.currentThread()))
 return;

接下来,在try代码块中,将成员变量callable赋值给一个临时变量c,判断临时变量不等于null,并且任务状态为NEW,则调用Callable接口的call()方法,并接收结果数据。并将ran变量设置为true。当程序抛出异常时,将接收结果的变量设置为null,ran变量设置为false,并且调用setException()方法将任务的状态设置为EXCEPTIONA。接下来,如果ran变量为true,则调用set()方法,如下所示。

try {
 Callable<V> c = callable;
 if (c != null && state == NEW) {
  V result;
  boolean ran;
  try {
   result = c.call();
   ran = true;
  } catch (Throwable ex) {
   result = null;
   ran = false;
   setException(ex);
  }
  if (ran)
   set(result);
 }
}

接下来,程序会进入finally代码块中,如下所示。

finally {
 // runner must be non-null until state is settled to
 // prevent concurrent calls to run()
 runner = null;
 // state must be re-read after nulling runner to prevent
 // leaked interrupts
 int s = state;
 if (s >= INTERRUPTING)
  handlePossibleCancellationInterrupt(s);
}

这里,将runner设置为null,如果任务的当前状态大于或者等于INTERRUPTING,也就是线程被中断了。则调用handlePossibleCancellationInterrupt()方法,接下来,看下handlePossibleCancellationInterrupt()方法的实现。

private void handlePossibleCancellationInterrupt(int s) {
 if (s == INTERRUPTING)
  while (state == INTERRUPTING)
   Thread.yield();
}

可以看到,handlePossibleCancellationInterrupt()方法的实现比较简单,当任务的状态为INTERRUPTING时,使用while()循环,条件为当前任务状态为INTERRUPTING,将当前线程占用的CPU资源释放,也就是说,当任务运行完成后,释放线程所占用的资源。

runAndReset()方法的逻辑与run()差不多,只是runAndReset()方法会在finally代码块中将任务状态重置为NEW。runAndReset()方法的源代码如下所示,就不重复说了。

protected boolean runAndReset() {
 if (state != NEW ||
  !UNSAFE.compareAndSwapObject(this, runnerOffset,
          null, Thread.currentThread()))
  return false;
 boolean ran = false;
 int s = state;
 try {
  Callable<V> c = callable;
  if (c != null && s == NEW) {
   try {
    c.call(); // don't set result
    ran = true;
   } catch (Throwable ex) {
    setException(ex);
   }
  }
 } finally {
  // runner must be non-null until state is settled to
  // prevent concurrent calls to run()
  runner = null;
  // state must be re-read after nulling runner to prevent
  // leaked interrupts
  s = state;
  if (s >= INTERRUPTING)
   handlePossibleCancellationInterrupt(s);
 }
 return ran && s == NEW;
}

(8)removeWaiter()方法

removeWaiter()方法中主要是使用自旋循环的方式来移除WaitNode中的线程,比较简单,如下所示。

private void removeWaiter(WaitNode node) {
 if (node != null) {
  node.thread = null;
  retry:
  for (;;) {          // restart on removeWaiter race
   for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
    s = q.next;
    if (q.thread != null)
     pred = q;
    else if (pred != null) {
     pred.next = s;
     if (pred.thread == null// check for race
      continue retry;
    }
    else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
               q, s))
     continue retry;
   }
   break;
  }
 }
}

最后,在FutureTask类的最后,有如下代码。

// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
 try {
  UNSAFE = sun.misc.Unsafe.getUnsafe();
  Class<?> k = FutureTask.class;
  stateOffset = UNSAFE.objectFieldOffset
   (k.getDeclaredField("state"));
  runnerOffset = UNSAFE.objectFieldOffset
   (k.getDeclaredField("runner"));
  waitersOffset = UNSAFE.objectFieldOffset
   (k.getDeclaredField("waiters"));
 } catch (Exception e) {
  throw new Error(e);
 }
}

关于这些代码的作用,会在后续深度解析CAS文章中详细说明,这里就不再探讨。

至此,关于Future接口和FutureTask类的源码就分析完了。

好了,今天就到这儿吧,我是冰河,大家有啥问题可以在下方留言,也可以加我微信:sun_shine_lyz,我拉你进群,一起交流技术,一起进阶,一起进大厂~~

冰河

2021/04/18  阅读:79  主题:嫩青

作者介绍

冰河