ExecutorService Callable Future多线程返回结果的原理是什么(callable,executorservice,future,开发技术)

时间:2024-05-10 09:27:09 作者 : 石家庄SEO 分类 : 开发技术
  • TAG :

希望大家仔细阅读,能够学有所成!

正文

在并发多线程场景下,存在需要获取各线程的异步执行结果,这时,就可以通过ExecutorService线程池结合Callable、Future来实现。

简单例子

publicclassExecutorTest{
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
ExecutorServiceexecutor=Executors.newSingleThreadExecutor();
Callablecallable=newMyCallable();
Futurefuture=executor.submit(callable);
System.out.println("打印线程池返回值:"+future.get());
}
}
classMyCallableimplementsCallable<String>{
@Override
publicStringcall()throwsException{
return"测试返回值";
}
}

执行完成后,会打印出以下结果:

打印线程池返回值:测试返回值

可见,线程池执行完异步线程任务,我们是可以获取到异步线程里的返回值。

那么,ExecutorService、Callable、Future实现有返回结果的多线程是如何实现的呢?

首先,我们需要创建一个实现函数式接口Callable的类,该Callable接口只定义了一个被泛型修饰的call方法,这意味着,需要返回什么类型的值可以由具体实现类来定义&mdash;&mdash;

@FunctionalInterface
publicinterfaceCallable<V>{
Vcall()throwsException;
}

因此,我自定义了一个实现Callable接口的类,该类的重写了call方法,我们在执行多线程时希望返回什么样的结果,就可以在该重写的call方法定义。

classMyCallableimplementsCallable<String>{
@Override
publicStringcall()throwsException{
return"测试返回值";
}
}

在自定义的MyCallable类中,我在call方法里设置一个很简单的String返回值 “测试返回值”,这意味着,我是希望在线程池执行完异步线程任务时,可以返回“测试返回值”这个字符串给我。

接下来,我们就可以创建该MyCallable类的对象,然后通过executor.submit(callable)丢到线程池里,线程池里会利用空闲线程来帮我们执行一个异步线程任务。

ExecutorServiceexecutor=Executors.newSingleThreadExecutor();
Callablecallable=newMyCallable();
Futurefuture=executor.submit(callable);

值得注意一点是,若需要实现获取线程返回值的效果,只能通过executor.submit(callable)去执行,而不能通过executor.execute(Runnable command)执行,因为executor.execute(Runnable command)只能传入实现Runnable 接口的对象,但这类对象是不具备返回线程效果的功能。

进入到executor.submit(callable)底层,具体实现在AbstractExecutorService类中。可以看到,执行到submit方法内部时,会将我们传进来的new MyCallable()对象作为参数传入到newTaskFor(task)方法里&mdash;&mdash;

public<T>Future<T>submit(Callable<T>task){
if(task==null)thrownewNullPointerException();
RunnableFuture<T>ftask=newTaskFor(task);
execute(ftask);
returnftask;
}

这个newTaskFor(task)方法内部具体实现,是将new MyCallable()对象传入构造器中,生成了一个FutureTask对象。

protected<T>RunnableFuture<T>newTaskFor(Callable<T>callable){
returnnewFutureTask<T>(callable);
}

这个FutureTask对象实现RunableFuture接口,这个RunableFuture接口又继承了Runnable,说明FutureTask类内部会实现一个run方法,然后本身就可以当做一个Runnable线程任务,借助线程Thread(new FutureTask(.....)).start()方式开启一个新线程,去异步执行其内部实现的run方法逻辑。

异步执行内部实现run方法逻辑

publicclassFutureTask<V>implementsRunnableFuture<V>{.....}
publicinterfaceRunnableFuture<V>extendsRunnable,Future<V>{
/*
SetsthisFuturetotheresultofitscomputation
unlessithasbeencancelled.
/
voidrun();
}

分析到这里,可以知道FutureTask的核心方法一定是run方法,线程执行start方法后,最后会去调用FutureTask的run方法。在讲解这个run方法前,我们先去看一下创建FutureTask的初始化构造方法底层逻辑new FutureTask(callable)

publicclassFutureTask<V>implementsRunnableFuture<V>{
privateCallable<V>callable;
......//省略其余源码
publicFutureTask(Callable<V>callable){
if(callable==null)
thrownewNullPointerException();
//通过构造方法初始化Callable<V>callable赋值
this.callable=callable;
this.state=NEW;//ensurevisibilityofcallable
}
......//省略其余源码
}

可以看到,FutureTask(Callablecallable)构造器,主要是将我们先前创建的new MyCallable()对象传进来,赋值给FutureTask内部定义的Callablecallable引用,实现子类对象指向父类引用。这一点很关键,这就意味着,在初始化创建FutureTask对象后,我们是可以通过callable.call()来调用我们自定义设置可以返回“测试返回值”的call方法,这不就是我们希望在异步线程执行完后能够返回的值吗?

我们不妨猜测一下整体返数主流程,在Thread(new FutureTask(.....)).start()开启一个线程后,当线程获得了CPU时间片,就会去执行FutureTask对象里的run方法,这时run方法里可以通过callable.call()调用到我们自定义的MyCallable#call()方法,进而得到方法返回值 “测试返回值”&mdash;&mdash;到这一步,只需要将这个返回值赋值给FutureTask里某个定义的对象属性,那么,在主线程在通过获取FutureTask里被赋值的X对象属性值,不就可以拿到返回字符串值 “测试返回值”了吗?

实现上,主体流程确实是这样,只不过忽略了一些细节而已。

ExecutorService Callable Future多线程返回结果的原理是什么

FutureTask的run方法

publicvoidrun(){
//如果状态不是NEW或者设置runner为当前线程时,说明FutureTask任务已经取消,无法继续执行
if(state!=NEW||
!UNSAFE.compareAndSwapObject(this,runnerOffset,
null,Thread.currentThread()))
return;
try{
//在该文中,callable被赋值为指向我们定义的newMyCallable()对象引用
Callable<V>c=callable;
if(c!=null&&state==NEW){
Vresult;
booleanran;
try{
//c.call最后会调用newMyCallable()的call()方法,得到字符串返回值“测试返回值”给result
result=c.call();
ran=true;
}catch(Throwableex){
result=null;
ran=false;
setException(ex);
}
//正常执行完c.call()方法时,ran值为true,说明会执行set(result)方法。
if(ran)
set(result);
}
}finally{
//runnermustbenon-nulluntilstateissettledto
//preventconcurrentcallstorun()
runner=null;
//statemustbere-readafternullingrunnertoprevent
//leakedinterrupts
ints=state;
if(s>=INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}

根据以上源码简单分析,可以看到run方法当中,最终确实会执行new MyCallable()的call()方法,得到字符串返回值“测试返回值”给result,然后执行set(result)方法,根据set方法名就不难猜出,这是一个会赋值给某个字段的方法。

这里分析会忽略一些状态值的讲解,这块会包括线程的取消、终止等内容,后面我会出一片专门针对FutureTask源码分析的文章再介绍,本文主要还是介绍异步线程返回结果的主要原理。

set(result)方法

沿着以上分析,追踪至set(result)方法里&mdash;&mdash;

protectedvoidset(Vv){
//通过CAS原子操作,将运行的线程设置为COMPLETING,说明线程已经执行完成中
if(UNSAFE.compareAndSwapInt(this,stateOffset,NEW,COMPLETING)){
//若CAS原子比较赋值成功,说明线程可以被正常执行完成的话,然后将result结果值赋值给outcome
outcome=v;
//线程正常完成结束
UNSAFE.putOrderedInt(this,stateOffset,NORMAL);//finalstate
finishCompletion();
}
}

这个方法的主要是,若该线程执行能够正常完成话,就将得到的返回值赋值给outcome,这个outcome是FutureTask的一个Object变量&mdash;&mdash;

privateObjectoutcome;

至此,就完成了流程的这一步&mdash;&mdash;

ExecutorService Callable Future多线程返回结果的原理是什么

最后,就是执行主线程的根据ftask.get()获取执行完成的值,这个get可以设置超时时间,例如 ftask.get(2,TimeUnit.SECONDS)表示超过2秒还没有获取到线程返回值的话,就直接结束该get方法,继续主线程往下执行。

System.out.println("打印线程池返回值:"+ftask.get(2,TimeUnit.SECONDS));

进入到get方法,可以看到当状态在s <= COMPLETING时,表示任务还没有执行完,就会去执行awaitDone(false, 0L)方法,这个方法表示,将一直做死循环等待线程执行完成,才会跳出等待循环继续往下走。若设置了超时时间,例如ftask.get(2,TimeUnit.SECONDS)),就会在awaitDone方法循环至2秒,在2秒内发现线程状态被设置为正常完成时,就会跳出循环,若2秒后线程没有执行完成,也会强制跳出循环了,但这种情况将无法获取到线程结果值。

publicVget()throwsInterruptedException,ExecutionException{
ints=state;
if(s&lt;=COMPLETING)
//循环等待线程执行状态
s=awaitDone(false,0L);
returnreport(s);
}

最后就是report(s)方法,可以看到outcome值最终赋值给Object x,若s==NORMAL表示线程任务已经正常完成结束,就可以根据我们定义的类型进行泛型转换返回,我们定义的是String字符串类型,故而会返回字符串值,也就是 “测试返回值”。

privateVreport(ints)throwsExecutionException{
Objectx=outcome;
if(s==NORMAL)
//返回线程任务执行结果
return(V)x;
if(s>=CANCELLED)
thrownewCancellationException();
thrownewExecutionException((Throwable)x);
}

你看,最后就能获取到了异步线程执行的结果返回给main主线程&mdash;&mdash;

ExecutorService Callable Future多线程返回结果的原理是什么

以上就是执行线程任务run方法后,如何将线程任务结果返回给主线程,其实,还少一个地方补充,就是如何将FutureTask任务丢给线程执行,我们这里用到了线程池, 但是execute(ftask)底层同样是使用一个了线程通过执行start方法开启一个线程,这个新运行的线程最终会执行FutureTask的run方法。

public<T>Future<T>submit(Callable<T>task){
if(task==null)thrownewNullPointerException();
RunnableFuture<T>ftask=newTaskFor(task);
execute(ftask);
returnftask;
}

可以简单优化下,直接用一个线程演示该案例,这样看着更好理解些,当时,生产上是不会有这样直接用一个线程来执行的,更多是通过原生线程池&mdash;&mdash;

publicstaticvoidmain(String[]args)throwsException{
Callablecallable=newMyCallable();
RunnableFuture<String>ftask=newFutureTask<String>(callable);
newThread(ftask).start();
System.out.println("打印线程池返回值:"+ftask.get());
}

本文:ExecutorService Callable Future多线程返回结果的原理是什么的详细内容,希望对您有所帮助,信息来源于网络。
上一篇:python3怎么解压缩.gz文件下一篇:

14 人围观 / 0 条评论 ↓快速评论↓

(必须)

(必须,保密)

阿狸1 阿狸2 阿狸3 阿狸4 阿狸5 阿狸6 阿狸7 阿狸8 阿狸9 阿狸10 阿狸11 阿狸12 阿狸13 阿狸14 阿狸15 阿狸16 阿狸17 阿狸18