怎样深度解读java线程池设计思想及源码实现
导读:本文共29030字符,通常情况下阅读需要97分钟。同时您也可以点击右侧朗读,来听本文内容。按键盘←(左) →(右) 方向键可以翻页。
摘要: 前言线程池是非常重要的工具,如果你要成为一个好的工程师,还是得比较好地掌握这个知识,很多线上问题都是因为没有用好线程池导致的。即使你为了谋生,也要知道,这基本上是面试必问的题目,而且面试官很容易从被面试者的回答中捕捉到被面试者的技术水平。总览开篇来一些废话。下图是 java 线程池几个相关类的继承结构:先简单说说这个继承结构,Executor 位于最顶层,也是最... ...
目录
(为您整理了一些要点),点击可以直达。前言
线程池是非常重要的工具,如果你要成为一个好的工程师,还是得比较好地掌握这个知识,很多线上问题都是因为没有用好线程池导致的。即使你为了谋生,也要知道,这基本上是面试必问的题目,而且面试官很容易从被面试者的回答中捕捉到被面试者的技术水平。
总览
开篇来一些废话。下图是 java 线程池几个相关类的继承结构:
先简单说说这个继承结构,Executor 位于最顶层,也是最简单的,就一个 execute(Runnable runnable) 接口方法定义。
ExecutorService 也是接口,在 Executor 接口的基础上添加了很多的接口方法,所以一般来说我们会使用这个接口。
然后再下来一层是 AbstractExecutorService,从名字我们就知道,这是抽象类,这里实现了非常有用的一些方法供子类直接使用,之后我们再细说。
然后才到我们的重点部分 ThreadPoolExecutor 类,这个类提供了关于线程池所需的非常丰富的功能。
另外,我们还涉及到下图中的这些类:
同在并发包中的 Executors 类,类名中带字母 s,我们猜到这个是工具类,里面的方法都是静态方法,如以下我们最常用的用于生成 ThreadPoolExecutor 的实例的一些方法:
publicstaticExecutorServicenewCachedThreadPool(){returnnewThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.SECONDS,newSynchronousQueue<Runnable>());}publicstaticExecutorServicenewFixedThreadPool(intnThreads){returnnewThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,newLinkedBlockingQueue<Runnable>());}
另外,由于线程池支持获取线程执行的结果,所以,引入了 Future 接口,RunnableFuture 继承自此接口,然后我们最需要关心的就是它的实现类 FutureTask。到这里,记住这个概念,在线程池的使用过程中,我们是往线程池提交任务(task),使用过线程池的都知道,我们提交的每个任务是实现了 Runnable 接口的,其实就是先将 Runnable 的任务包装成 FutureTask,然后再提交到线程池。这样,读者才能比较容易记住 FutureTask 这个类名:它首先是一个任务(Task),然后具有 Future 接口的语义,即可以在将来(Future)得到执行的结果。
当然,线程池中的 BlockingQueue 也是非常重要的概念,如果线程数达到 corePoolSize,我们的每个任务会提交到等待队列中,等待线程池中的线程来取任务并执行。这里的 BlockingQueue 通常我们使用其实现类 LinkedBlockingQueue、ArrayBlockingQueue 和 SynchronousQueue,每个实现类都有不同的特征,使用场景之后会慢慢分析。想要详细了解各个 BlockingQueue 的读者,可以参考我的前面的一篇对 BlockingQueue 的各个实现类进行详细分析的文章。
把事情说完整:除了上面说的这些类外,还有一个很重要的类,就是定时任务实现类 ScheduledThreadPoolExecutor,它继承自本文要重点讲解的 ThreadPoolExecutor,用于实现定时执行。不过本文不会介绍它的实现,我相信读者看完本文后可以比较容易地看懂它的源码。
以上就是本文要介绍的知识,废话不多说,开始进入正文。
Executor 接口
/**@since1.5*@authorDougLea*/publicinterfaceExecutor{voidexecute(Runnablecommand);}
我们可以看到 Executor 接口非常简单,就一个 void execute(Runnable command)
方法,代表提交一个任务。为了让大家理解 java 线程池的整个设计方案,我会按照 Doug Lea 的设计思路来多说一些相关的东西。
我们经常这样启动一个线程:
newThread(newRunnable(){//dosomething}).start();
用了线程池 Executor 后就可以像下面这么使用:
Executorexecutor=anExecutor;executor.execute(newRunnableTask1());executor.execute(newRunnableTask2());
如果我们希望线程池同步执行每一个任务,我们可以这么实现这个接口:
classDirectExecutorimplementsExecutor{publicvoidexecute(Runnabler){r.run();//这里不是用的newThread(r).start(),也就是说没有启动任何一个新的线程。}}
我们希望每个任务提交进来后,直接启动一个新的线程来执行这个任务,我们可以这么实现:
classThreadPerTaskExecutorimplementsExecutor{publicvoidexecute(Runnabler){newThread(r).start();//每个任务都用一个新的线程来执行}}
我们再来看下怎么组合两个 Executor 来使用,下面这个实现是将所有的任务都加到一个 queue 中,然后从 queue 中取任务,交给真正的执行器执行,这里采用 synchronized 进行并发控制:
classSerialExecutorimplementsExecutor{//任务队列finalQueue<Runnable>tasks=newArrayDeque<Runnable>();//这个才是真正的执行器finalExecutorexecutor;//当前正在执行的任务Runnableactive;//初始化的时候,指定执行器SerialExecutor(Executorexecutor){this.executor=executor;}//添加任务到线程池:将任务添加到任务队列,scheduleNext触发执行器去任务队列取任务publicsynchronizedvoidexecute(finalRunnabler){tasks.offer(newRunnable(){publicvoidrun(){try{r.run();}finally{scheduleNext();}}});if(active==null){scheduleNext();}}protectedsynchronizedvoidscheduleNext(){if((active=tasks.poll())!=null){//具体的执行转给真正的执行器executorexecutor.execute(active);}}}
当然了,Executor 这个接口只有提交任务的功能,太简单了,我们想要更丰富的功能,比如我们想知道执行结果、我们想知道当前线程池有多少个线程活着、已经完成了多少任务等等,这些都是这个接口的不足的地方。接下来我们要介绍的是继承自 Executor
接口的 ExecutorService
接口,这个接口提供了比较丰富的功能,也是我们最常使用到的接口。
ExecutorService
一般我们定义一个线程池的时候,往往都是使用这个接口:
ExecutorServiceexecutor=Executors.newFixedThreadPool(args...);ExecutorServiceexecutor=Executors.newCachedThreadPool(args...);
因为这个接口中定义的一系列方法大部分情况下已经可以满足我们的需要了。
那么我们简单初略地来看一下这个接口中都有哪些方法:
publicinterfaceExecutorServiceextendsExecutor{//关闭线程池,已提交的任务继续执行,不接受继续提交新任务voidshutdown();//关闭线程池,尝试停止正在执行的所有任务,不接受继续提交新任务//它和前面的方法相比,加了一个单词“now”,区别在于它会去停止当前正在进行的任务List<Runnable>shutdownNow();//线程池是否已关闭booleanisShutdown();//如果调用了shutdown()或shutdownNow()方法后,所有任务结束了,那么返回true//这个方法必须在调用shutdown或shutdownNow方法之后调用才会返回truebooleanisTerminated();//等待所有任务完成,并设置超时时间//我们这么理解,实际应用中是,先调用shutdown或shutdownNow,//然后再调这个方法等待所有的线程真正地完成,返回值意味着有没有超时booleanawaitTermination(longtimeout,TimeUnitunit)throwsInterruptedException;//提交一个Callable任务<T>Future<T>submit(Callable<T>task);//提交一个Runnable任务,第二个参数将会放到Future中,作为返回值,//因为Runnable的run方法本身并不返回任何东西<T>Future<T>submit(Runnabletask,Tresult);//提交一个Runnable任务Future<?>submit(Runnabletask);//执行所有任务,返回Future类型的一个list<T>List<Future<T>>invokeAll(Collection<?extendsCallable<T>>tasks)throwsInterruptedException;//也是执行所有任务,但是这里设置了超时时间<T>List<Future<T>>invokeAll(Collection<?extendsCallable<T>>tasks,longtimeout,TimeUnitunit)throwsInterruptedException;//只有其中的一个任务结束了,就可以返回,返回执行完的那个任务的结果<T>TinvokeAny(Collection<?extendsCallable<T>>tasks)throwsInterruptedException,ExecutionException;//同上一个方法,只有其中的一个任务结束了,就可以返回,返回执行完的那个任务的结果,//不过这个带超时,超过指定的时间,抛出TimeoutException异常<T>TinvokeAny(Collection<?extendsCallable<T>>tasks,longtimeout,TimeUnitunit)throwsInterruptedException,ExecutionException,TimeoutException;}
这些方法都很好理解,一个简单的线程池主要就是这些功能,能提交任务,能获取结果,能关闭线程池,这也是为什么我们经常用这个接口的原因。
FutureTask
在继续往下层介绍 ExecutorService 的实现类之前,我们先来说说相关的类 FutureTask。
FutureRunnable\/\/RunnableFuture||FutureTaskFutureTask通过RunnableFuture间接实现了Runnable接口,所以每个Runnable通常都先包装成FutureTask,然后调用executor.execute(Runnablecommand)将其提交给线程池
我们知道,Runnable 的 void run() 方法是没有返回值的,所以,通常,如果我们需要的话,会在 submit 中指定第二个参数作为返回值:
<T>Future<T>submit(Runnabletask,Tresult);
其实到时候会通过这两个参数,将其包装成 Callable。它和 Runnable 的区别在于 run() 没有返回值,而 Callable 的 call() 方法有返回值,同时,如果运行出现异常,call() 方法会抛出异常。
publicinterfaceCallable<V>{Vcall()throwsException;}
在这里,就不展开说 FutureTask 类了,因为本文篇幅本来就够大了,这里我们需要知道怎么用就行了。
下面,我们来看看 ExecutorService
的抽象实现 AbstractExecutorService
。
AbstractExecutorService
AbstractExecutorService 抽象类派生自 ExecutorService 接口,然后在其基础上实现了几个实用的方法,这些方法提供给子类进行调用。
这个抽象类实现了 invokeAny 方法和 invokeAll 方法,这里的两个 newTaskFor 方法也比较有用,用于将任务包装成 FutureTask。定义于最上层接口 Executor中的 void execute(Runnable command)
由于不需要获取结果,不会进行 FutureTask 的包装。
需要获取结果(FutureTask),用 submit 方法,不需要获取结果,可以用 execute 方法。
下面,我将一行一行源码地来分析这个类,跟着源码来看看其实现吧:
Tips: invokeAny 和 invokeAll 方法占了这整个类的绝大多数篇幅,读者可以选择适当跳过,因为它们可能在你的实践中使用的频次比较低,而且它们不带有承前启后的作用,不用担心会漏掉什么导致看不懂后面的代码。
publicabstractclassAbstractExecutorServiceimplementsExecutorService{//RunnableFuture是用于获取执行结果的,我们常用它的子类FutureTask//下面两个newTaskFor方法用于将我们的任务包装成FutureTask提交到线程池中执行protected<T>RunnableFuture<T>newTaskFor(Runnablerunnable,Tvalue){returnnewFutureTask<T>(runnable,value);}protected<T>RunnableFuture<T>newTaskFor(Callable<T>callable){returnnewFutureTask<T>(callable);}//提交任务publicFuture<?>submit(Runnabletask){if(task==null)thrownewNullPointerException();//1\.将任务包装成FutureTaskRunnableFuture<Void>ftask=newTaskFor(task,null);//2\.交给执行器执行,execute方法由具体的子类来实现//前面也说了,FutureTask间接实现了Runnable接口。execute(ftask);returnftask;}public<T>Future<T>submit(Runnabletask,Tresult){if(task==null)thrownewNullPointerException();//1\.将任务包装成FutureTaskRunnableFuture<T>ftask=newTaskFor(task,result);//2\.交给执行器执行execute(ftask);returnftask;}public<T>Future<T>submit(Callable<T>task){if(task==null)thrownewNullPointerException();//1\.将任务包装成FutureTaskRunnableFuture<T>ftask=newTaskFor(task);//2\.交给执行器执行execute(ftask);returnftask;}//此方法目的:将tasks集合中的任务提交到线程池执行,任意一个线程执行完后就可以结束了//第二个参数timed代表是否设置超时机制,超时时间为第三个参数,//如果timed为true,同时超时了还没有一个线程返回结果,那么抛出TimeoutException异常private<T>TdoInvokeAny(Collection<?extendsCallable<T>>tasks,booleantimed,longnanos)throwsInterruptedException,ExecutionException,TimeoutException{if(tasks==null)thrownewNullPointerException();//任务数intntasks=tasks.size();if(ntasks==0)thrownewIllegalArgumentException();//List<Future<T>>futures=newArrayList<Future<T>>(ntasks);//ExecutorCompletionService不是一个真正的执行器,参数this才是真正的执行器//它对执行器进行了包装,每个任务结束后,将结果保存到内部的一个completionQueue队列中//这也是为什么这个类的名字里面有个Completion的原因吧。ExecutorCompletionService<T>ecs=newExecutorCompletionService<T>(this);try{//用于保存异常信息,此方法如果没有得到任何有效的结果,那么我们可以抛出最后得到的一个异常ExecutionExceptionee=null;longlastTime=timed?System.nanoTime():0;Iterator<?extendsCallable<T>>it=tasks.iterator();//首先先提交一个任务,后面的任务到下面的for循环一个个提交futures.add(ecs.submit(it.next()));//提交了一个任务,所以任务数量减1--ntasks;//正在执行的任务数(提交的时候+1,任务结束的时候-1)intactive=1;for(;;){//ecs上面说了,其内部有一个completionQueue用于保存执行完成的结果//BlockingQueue的poll方法不阻塞,返回null代表队列为空Future<T>f=ecs.poll();//为null,说明刚刚提交的第一个线程还没有执行完成//在前面先提交一个任务,加上这里做一次检查,也是为了提高性能if(f==null){if(ntasks>0){--ntasks;futures.add(ecs.submit(it.next()));++active;}//这里是elseif,不是if。这里说明,没有任务了,同时active为0说明//任务都执行完成了。其实我也没理解为什么这里做一次break?//因为我认为active为0的情况,必然从下面的f.get()返回了//2018-02-23感谢读者newmicro的comment,//这里的active==0,说明所有的任务都执行失败,那么这里是for循环出口elseif(active==0)break;//这里也是elseif。这里说的是,没有任务了,但是设置了超时时间,这里检测是否超时elseif(timed){//带等待的poll方法f=ecs.poll(nanos,TimeUnit.NANOSECONDS);//如果已经超时,抛出TimeoutException异常,这整个方法就结束了if(f==null)thrownewTimeoutException();longnow=System.nanoTime();nanos-=now-lastTime;lastTime=now;}//这里是else。说明,没有任务需要提交,但是池中的任务没有完成,还没有超时(如果设置了超时)//take()方法会阻塞,直到有元素返回,说明有任务结束了elsef=ecs.take();}/**我感觉上面这一段并不是很好理解,这里简单说下。*1\.首先,这在一个for循环中,我们设想每一个任务都没那么快结束,*那么,每一次都会进到第一个分支,进行提交任务,直到将所有的任务都提交了*2\.任务都提交完成后,如果设置了超时,那么for循环其实进入了“一直检测是否超时”这件事情上*3\.如果没有设置超时机制,那么不必要检测超时,那就会阻塞在ecs.take()方法上,等待获取第一个执行结果*4\.如果所有的任务都执行失败,也就是说future都返回了,但是f.get()抛出异常,那么从active==0分支出去(感谢newmicro提出)//当然,这个需要看下面的if分支。*///有任务结束了if(f!=null){--active;try{//返回执行结果,如果有异常,都包装成ExecutionExceptionreturnf.get();}catch(ExecutionExceptioneex){ee=eex;}catch(RuntimeExceptionrex){ee=newExecutionException(rex);}}}//注意看for循环的范围,一直到这里if(ee==null)ee=newExecutionException();throwee;}finally{//方法退出之前,取消其他的任务for(Future<T>f:futures)f.cancel(true);}}public<T>TinvokeAny(Collection<?extendsCallable<T>>tasks)throwsInterruptedException,ExecutionException{try{returndoInvokeAny(tasks,false,0);}catch(TimeoutExceptioncannotHappen){assertfalse;returnnull;}}public<T>TinvokeAny(Collection<?extendsCallable<T>>tasks,longtimeout,TimeUnitunit)throwsInterruptedException,ExecutionException,TimeoutException{returndoInvokeAny(tasks,true,unit.toNanos(timeout));}//执行所有的任务,返回任务结果。//先不要看这个方法,我们先想想,其实我们自己提交任务到线程池,也是想要线程池执行所有的任务//只不过,我们是每次submit一个任务,这里以一个集合作为参数提交public<T>List<Future<T>>invokeAll(Collection<?extendsCallable<T>>tasks)throwsInterruptedException{if(tasks==null)thrownewNullPointerException();List<Future<T>>futures=newArrayList<Future<T>>(tasks.size());booleandone=false;try{//这个很简单for(Callable<T>t:tasks){//包装成FutureTaskRunnableFuture<T>f=newTaskFor(t);futures.add(f);//提交任务execute(f);}for(Future<T>f:futures){if(!f.isDone()){try{//这是一个阻塞方法,直到获取到值,或抛出了异常//这里有个小细节,其实get方法签名上是会抛出InterruptedException的//可是这里没有进行处理,而是抛给外层去了。此异常发生于还没执行完的任务被取消了f.get();}catch(CancellationExceptionignore){}catch(ExecutionExceptionignore){}}}done=true;//这个方法返回,不像其他的场景,返回List<Future>,其实执行结果还没出来//这个方法返回是真正的返回,任务都结束了returnfutures;}finally{//为什么要这个?就是上面说的有异常的情况if(!done)for(Future<T>f:futures)f.cancel(true);}}//带超时的invokeAll,我们找不同吧public<T>List<Future<T>>invokeAll(Collection<?extendsCallable<T>>tasks,longtimeout,TimeUnitunit)throwsInterruptedException{if(tasks==null||unit==null)thrownewNullPointerException();longnanos=unit.toNanos(timeout);List<Future<T>>futures=newArrayList<Future<T>>(tasks.size());booleandone=false;try{for(Callable<T>t:tasks)futures.add(newTaskFor(t));longlastTime=System.nanoTime();Iterator<Future<T>>it=futures.iterator();//每提交一个任务,检测一次是否超时while(it.hasNext()){execute((Runnable)(it.next()));longnow=System.nanoTime();nanos-=now-lastTime;lastTime=now;//超时if(nanos<=0)returnfutures;}for(Future<T>f:futures){if(!f.isDone()){if(nanos<=0)returnfutures;try{//调用带超时的get方法,这里的参数nanos是剩余的时间,//因为上面其实已经用掉了一些时间了f.get(nanos,TimeUnit.NANOSECONDS);}catch(CancellationExceptionignore){}catch(ExecutionExceptionignore){}catch(TimeoutExceptiontoe){returnfutures;}longnow=System.nanoTime();nanos-=now-lastTime;lastTime=now;}}done=true;returnfutures;}finally{if(!done)for(Future<T>f:futures)f.cancel(true);}}}
到这里,我们发现,这个抽象类包装了一些基本的方法,可是像 submit、invokeAny、invokeAll 等方法,它们都没有真正开启线程来执行任务,它们都只是在方法内部调用了 execute 方法,所以最重要的 execute(Runnable runnable) 方法还没出现,需要等具体执行器来实现这个最重要的部分,这里我们要说的就是 ThreadPoolExecutor 类了。
鉴于本文的篇幅,我觉得看到这里的读者应该已经不多了,大家都习惯了快餐文化。我写的每篇文章都力求让读者可以通过我的一篇文章而对相关内容有全面的了解,所以篇幅不免长了些。
ThreadPoolExecutor
ThreadPoolExecutor 是 JDK 中的线程池实现,这个类实现了一个线程池需要的各个方法,它实现了任务提交、线程管理、监控等等方法。
我们可以基于它来进行业务上的扩展,以实现我们需要的其他功能,比如实现定时任务的类 ScheduledThreadPoolExecutor 就继承自 ThreadPoolExecutor。当然,这不是本文关注的重点,下面,还是赶紧进行源码分析吧。
首先,我们来看看线程池实现中的几个概念和处理流程。
我们先回顾下提交任务的几个方法:
publicFuture<?>submit(Runnabletask){if(task==null)thrownewNullPointerException();RunnableFuture<Void>ftask=newTaskFor(task,null);execute(ftask);returnftask;}public<T>Future<T>submit(Runnabletask,Tresult){if(task==null)thrownewNullPointerException();RunnableFuture<T>ftask=newTaskFor(task,result);execute(ftask);returnftask;}public<T>Future<T>submit(Callable<T>task){if(task==null)thrownewNullPointerException();RunnableFuture<T>ftask=newTaskFor(task);execute(ftask);returnftask;}
一个最基本的概念是,submit 方法中,参数是 Runnable 类型(也有Callable 类型),这个参数不是用于 new Thread(runnable).start() 中的,此处的这个参数不是用于启动线程的,这里指的是任务,任务要做的事情是 run() 方法里面定义的或 Callable 中的 call() 方法里面定义的。
初学者往往会搞混这个,因为 Runnable 总是在各个地方出现,经常把一个 Runnable 包到另一个 Runnable 中。请把它想象成有个 Task 接口,这个接口里面有一个 run() 方法。
我们回过神来继续往下看,我画了一个简单的示意图来描述线程池中的一些主要的构件:
当然,上图没有考虑队列是否有界,提交任务时队列满了怎么办?什么情况下会创建新的线程?提交任务时线程池满了怎么办?空闲线程怎么关掉?这些问题下面我们会一一解决。
我们经常会使用 Executors
这个工具类来快速构造一个线程池,对于初学者而言,这种工具类是很有用的,开发者不需要关注太多的细节,只要知道自己需要一个线程池,仅仅提供必需的参数就可以了,其他参数都采用作者提供的默认值。
publicstaticExecutorServicenewFixedThreadPool(intnThreads){returnnewThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,newLinkedBlockingQueue<Runnable>());}publicstaticExecutorServicenewCachedThreadPool(){returnnewThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.SECONDS,newSynchronousQueue<Runnable>());}
这里先不说有什么区别,它们最终都会导向这个构造方法:
publicThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,TimeUnitunit,BlockingQueue<Runnable>workQueue,ThreadFactorythreadFactory,RejectedExecutionHandlerhandler){if(corePoolSize<0||maximumPoolSize<=0||maximumPoolSize<corePoolSize||keepAliveTime<0)thrownewIllegalArgumentException();//这几个参数都是必须要有的if(workQueue==null||threadFactory==null||handler==null)thrownewNullPointerException();this.corePoolSize=corePoolSize;this.maximumPoolSize=maximumPoolSize;this.workQueue=workQueue;this.keepAliveTime=unit.toNanos(keepAliveTime);this.threadFactory=threadFactory;this.handler=handler;}
基本上,上面的构造方法中列出了我们最需要关心的几个属性了,下面逐个介绍下构造方法中出现的这几个属性:
corePoolSize
核心线程数,不要抠字眼,反正先记着有这么个属性就可以了。
maximumPoolSize
最大线程数,线程池允许创建的最大线程数。
workQueue
任务队列,BlockingQueue 接口的某个实现(常使用 ArrayBlockingQueue 和 LinkedBlockingQueue)。
keepAliveTime
空闲线程的保活时间,如果某线程的空闲时间超过这个值都没有任务给它做,那么可以被关闭了。注意这个值并不会对所有线程起作用,如果线程池中的线程数少于等于核心线程数 corePoolSize,那么这些线程不会因为空闲太长时间而被关闭,当然,也可以通过调用
allowCoreThreadTimeOut(true)
使核心线程数内的线程也可以被回收。threadFactory
用于生成线程,一般我们可以用默认的就可以了。通常,我们可以通过它将我们的线程的名字设置得比较可读一些,如 Message-Thread-1, Message-Thread-2 类似这样。
handler:
当线程池已经满了,但是又有新的任务提交的时候,该采取什么策略由这个来指定。有几种方式可供选择,像抛出异常、直接拒绝然后返回等,也可以自己实现相应的接口实现自己的逻辑,这个之后再说。
除了上面几个属性外,我们再看看其他重要的属性。
Doug Lea 采用一个 32 位的整数来存放线程池的状态和当前池中的线程数,其中高 3 位用于存放线程池状态,低 29 位表示线程数(即使只有 29 位,也已经不小了,大概 5 亿多,现在还没有哪个机器能起这么多线程的吧)。我们知道,java 语言在整数编码上是统一的,都是采用补码的形式,下面是简单的移位操作和布尔操作,都是挺简单的。
privatefinalAtomicIntegerctl=newAtomicInteger(ctlOf(RUNNING,0));//这里COUNT_BITS设置为29(32-3),意味着前三位用于存放线程状态,后29位用于存放线程数//很多初学者很喜欢在自己的代码中写很多29这种数字,或者某个特殊的字符串,然后分布在各个地方,这是非常糟糕的privatestaticfinalintCOUNT_BITS=Integer.SIZE-3;//00011111111111111111111111111111//这里得到的是29个1,也就是说线程池的最大线程数是2^29-1=536870911//以我们现在计算机的实际情况,这个数量还是够用的privatestaticfinalintCAPACITY=(1<<COUNT_BITS)-1;//我们说了,线程池的状态存放在高3位中//运算结果为111跟29个0:11100000000000000000000000000000privatestaticfinalintRUNNING=-1<<COUNT_BITS;//00000000000000000000000000000000privatestaticfinalintSHUTDOWN=0<<COUNT_BITS;//00100000000000000000000000000000privatestaticfinalintSTOP=1<<COUNT_BITS;//01000000000000000000000000000000privatestaticfinalintTIDYING=2<<COUNT_BITS;//01100000000000000000000000000000privatestaticfinalintTERMINATED=3<<COUNT_BITS;//将整数c的低29位修改为0,就得到了线程池的状态privatestaticintrunStateOf(intc){returnc&~CAPACITY;}//将整数c的高3为修改为0,就得到了线程池中的线程数privatestaticintworkerCountOf(intc){returnc&CAPACITY;}privatestaticintctlOf(intrs,intwc){returnrs|wc;}/**Bitfieldaccessorsthatdon'trequireunpackingctl.*ThesedependonthebitlayoutandonworkerCountbeingnevernegative.*/privatestaticbooleanrunStateLessThan(intc,ints){returnc<s;}privatestaticbooleanrunStateAtLeast(intc,ints){returnc>=s;}privatestaticbooleanisRunning(intc){returnc<SHUTDOWN;}
上面就是对一个整数的简单的位操作,几个操作方法将会在后面的源码中一直出现,所以读者最好把方法名字和其代表的功能记住,看源码的时候也就不需要来来回回翻了。
在这里,介绍下线程池中的各个状态和状态变化的转换过程:
RUNNING:这个没什么好说的,这是最正常的状态:接受新的任务,处理等待队列中的任务
SHUTDOWN:不接受新的任务提交,但是会继续处理等待队列中的任务
STOP:不接受新的任务提交,不再处理等待队列中的任务,中断正在执行任务的线程
TIDYING:所有的任务都销毁了,workCount 为 0。线程池的状态在转换为 TIDYING 状态时,会执行钩子方法 terminated()
TERMINATED:terminated() 方法结束后,线程池的状态就会变成这个
RUNNING 定义为 -1,SHUTDOWN 定义为 0,其他的都比 0 大,所以等于 0 的时候不能提交任务,大于 0 的话,连正在执行的任务也需要中断。
看了这几种状态的介绍,读者大体也可以猜到十之八九的状态转换了,各个状态的转换过程有以下几种:
RUNNING -> SHUTDOWN:当调用了 shutdown() 后,会发生这个状态转换,这也是最重要的
(RUNNING or SHUTDOWN) -> STOP:当调用 shutdownNow() 后,会发生这个状态转换,这下要清楚 shutDown() 和 shutDownNow() 的区别了
SHUTDOWN -> TIDYING:当任务队列和线程池都清空后,会由 SHUTDOWN 转换为 TIDYING
STOP -> TIDYING:当任务队列清空后,发生这个转换
TIDYING -> TERMINATED:这个前面说了,当 terminated() 方法结束后
上面的几个记住核心的就可以了,尤其第一个和第二个。
另外,我们还要看看一个内部类 Worker,因为 Doug Lea 把线程池中的线程包装成了一个个 Worker,翻译成工人,就是线程池中做任务的线程。所以到这里,我们知道任务是 Runnable(内部变量名叫 task 或 command),线程是 Worker。
Worker 这里又用到了抽象类 AbstractQueuedSynchronizer。题外话,AQS 在并发中真的是到处出现,而且非常容易使用,写少量的代码就能实现自己需要的同步方式(对 AQS 源码感兴趣的读者请参看我之前写的几篇文章)。
privatefinalclassWorkerextendsAbstractQueuedSynchronizerimplementsRunnable{privatestaticfinallongserialVersionUID=6138294804551838833L;//这个是真正的线程,任务靠你啦finalThreadthread;//前面说了,这里的Runnable是任务。为什么叫firstTask?因为在创建线程的时候,如果同时指定了//这个线程起来以后需要执行的第一个任务,那么第一个任务就是存放在这里的(线程可不止执行这一个任务)//当然了,也可以为null,这样线程起来了,自己到任务队列(BlockingQueue)中取任务(getTask方法)就行了RunnablefirstTask;//用于存放此线程完成的任务数,注意了,这里用了volatile,保证可见性volatilelongcompletedTasks;//Worker只有这一个构造方法,传入firstTask,也可以传nullWorker(RunnablefirstTask){setState(-1);//inhibitinterruptsuntilrunWorkerthis.firstTask=firstTask;//调用ThreadFactory来创建一个新的线程this.thread=getThreadFactory().newThread(this);}//这里调用了外部类的runWorker方法publicvoidrun(){runWorker(this);}...//其他几个方法没什么好看的,就是用AQS操作,来获取这个线程的执行权,用了独占锁}
前面虽然啰嗦,但是简单。有了上面的这些基础后,我们终于可以看看 ThreadPoolExecutor 的 execute 方法了,前面源码分析的时候也说了,各种方法都最终依赖于 execute 方法:
publicvoidexecute(Runnablecommand){if(command==null)thrownewNullPointerException();//前面说的那个表示“线程池状态”和“线程数”的整数intc=ctl.get();//如果当前线程数少于核心线程数,那么直接添加一个worker来执行任务,//创建一个新的线程,并把当前任务command作为这个线程的第一个任务(firstTask)if(workerCountOf(c)<corePoolSize){//添加任务成功,那么就结束了。提交任务嘛,线程池已经接受了这个任务,这个方法也就可以返回了//至于执行的结果,到时候会包装到FutureTask中。//返回false代表线程池不允许提交任务if(addWorker(command,true))return;c=ctl.get();}//到这里说明,要么当前线程数大于等于核心线程数,要么刚刚addWorker失败了//如果线程池处于RUNNING状态,把这个任务添加到任务队列workQueue中if(isRunning(c)&&workQueue.offer(command)){/*这里面说的是,如果任务进入了workQueue,我们是否需要开启新的线程*因为线程数在[0,corePoolSize)是无条件开启新的线程*如果线程数已经大于等于corePoolSize,那么将任务添加到队列中,然后进到这里*/intrecheck=ctl.get();//如果线程池已不处于RUNNING状态,那么移除已经入队的这个任务,并且执行拒绝策略if(!isRunning(recheck)&&remove(command))reject(command);//如果线程池还是RUNNING的,并且线程数为0,那么开启新的线程//到这里,我们知道了,这块代码的真正意图是:担心任务提交到队列中了,但是线程都关闭了elseif(workerCountOf(recheck)==0)addWorker(null,false);}//如果workQueue队列满了,那么进入到这个分支//以maximumPoolSize为界创建新的worker,//如果失败,说明当前线程数已经达到maximumPoolSize,执行拒绝策略elseif(!addWorker(command,false))reject(command);}
对创建线程的错误理解:如果线程数少于 corePoolSize,创建一个线程,如果线程数在 [corePoolSize, maximumPoolSize] 之间那么可以创建线程或复用空闲线程,keepAliveTime 对这个区间的线程有效。
从上面的几个分支,我们就可以看出,上面的这段话是错误的。
上面这些一时半会也不可能全部消化搞定,我们先继续往下吧,到时候再回头看几遍。
这个方法非常重要 addWorker(Runnable firstTask, boolean core) 方法,我们看看它是怎么创建新的线程的:
//第一个参数是准备提交给这个线程执行的任务,之前说了,可以为null//第二个参数为true代表使用核心线程数corePoolSize作为创建线程的界限,也就说创建这个线程的时候,//如果线程池中的线程总数已经达到corePoolSize,那么不能响应这次创建线程的请求//如果是false,代表使用最大线程数maximumPoolSize作为界限privatebooleanaddWorker(RunnablefirstTask,booleancore){retry:for(;;){intc=ctl.get();intrs=runStateOf(c);//这个非常不好理解//如果线程池已关闭,并满足以下条件之一,那么不创建新的worker://1\.线程池状态大于SHUTDOWN,其实也就是STOP,TIDYING,或TERMINATED//2\.firstTask!=null//3\.workQueue.isEmpty()//简单分析下://还是状态控制的问题,当线程池处于SHUTDOWN的时候,不允许提交任务,但是已有的任务继续执行//当状态大于SHUTDOWN时,不允许提交任务,且中断正在执行的任务//多说一句:如果线程池处于SHUTDOWN,但是firstTask为null,且workQueue非空,那么是允许创建worker的//这是因为SHUTDOWN的语义:不允许提交新的任务,但是要把已经进入到workQueue的任务执行完,所以在满足条件的基础上,是允许创建新的Worker的if(rs>=SHUTDOWN&&!(rs==SHUTDOWN&&firstTask==null&&!workQueue.isEmpty()))returnfalse;for(;;){intwc=workerCountOf(c);if(wc>=CAPACITY||wc>=(core?corePoolSize:maximumPoolSize))returnfalse;//如果成功,那么就是所有创建线程前的条件校验都满足了,准备创建线程执行任务了//这里失败的话,说明有其他线程也在尝试往线程池中创建线程if(compareAndIncrementWorkerCount(c))breakretry;//由于有并发,重新再读取一下ctlc=ctl.get();//正常如果是CAS失败的话,进到下一个里层的for循环就可以了//可是如果是因为其他线程的操作,导致线程池的状态发生了变更,如有其他线程关闭了这个线程池//那么需要回到外层的for循环if(runStateOf(c)!=rs)continueretry;//elseCASfailedduetoworkerCountchange;retryinnerloop}}/**到这里,我们认为在当前这个时刻,可以开始创建线程来执行任务了,*因为该校验的都校验了,至于以后会发生什么,那是以后的事,至少当前是满足条件的*///worker是否已经启动booleanworkerStarted=false;//是否已将这个worker添加到workers这个HashSet中booleanworkerAdded=false;Workerw=null;try{finalReentrantLockmainLock=this.mainLock;//把firstTask传给worker的构造方法w=newWorker(firstTask);//取worker中的线程对象,之前说了,Worker的构造方法会调用ThreadFactory来创建一个新的线程finalThreadt=w.thread;if(t!=null){//这个是整个线程池的全局锁,持有这个锁才能让下面的操作“顺理成章”,//因为关闭一个线程池需要这个锁,至少我持有锁的期间,线程池不会被关闭mainLock.lock();try{intc=ctl.get();intrs=runStateOf(c);//小于SHUTTDOWN那就是RUNNING,这个自不必说,是最正常的情况//如果等于SHUTDOWN,前面说了,不接受新的任务,但是会继续执行等待队列中的任务if(rs<SHUTDOWN||(rs==SHUTDOWN&&firstTask==null)){//worker里面的thread可不能是已经启动的if(t.isAlive())thrownewIllegalThreadStateException();//加到workers这个HashSet中workers.add(w);ints=workers.size();//largestPoolSize用于记录workers中的个数的最大值//因为workers是不断增加减少的,通过这个值可以知道线程池的大小曾经达到的最大值if(s>largestPoolSize)largestPoolSize=s;workerAdded=true;}}finally{mainLock.unlock();}//添加成功的话,启动这个线程if(workerAdded){//启动线程t.start();workerStarted=true;}}}finally{//如果线程没有启动,需要做一些清理工作,如前面workCount加了1,将其减掉if(!workerStarted)addWorkerFailed(w);}//返回线程是否启动成功returnworkerStarted;}
简单看下 addWorkFailed 的处理:
//workers中删除掉相应的worker//workCount减1privatevoidaddWorkerFailed(Workerw){finalReentrantLockmainLock=this.mainLock;mainLock.lock();try{if(w!=null)workers.remove(w);decrementWorkerCount();//rechecksfortermination,incasetheexistenceofthisworkerwasholdingupterminationtryTerminate();}finally{mainLock.unlock();}}
回过头来,继续往下走。我们知道,worker 中的线程 start 后,其 run 方法会调用 runWorker 方法:
//Worker类的run()方法publicvoidrun(){runWorker(this);}
继续往下看 runWorker 方法:
//此方法由worker线程启动后调用,这里用一个while循环来不断地从等待队列中获取任务并执行//前面说了,worker在初始化的时候,可以指定firstTask,那么第一个任务也就可以不需要从队列中获取finalvoidrunWorker(Workerw){//Threadwt=Thread.currentThread();//该线程的第一个任务(如果有的话)Runnabletask=w.firstTask;w.firstTask=null;w.unlock();//allowinterruptsbooleancompletedAbruptly=true;try{//循环调用getTask获取任务while(task!=null||(task=getTask())!=null){w.lock();//如果线程池状态大于等于STOP,那么意味着该线程也要中断if((runStateAtLeast(ctl.get(),STOP)||(Thread.interrupted()&&runStateAtLeast(ctl.get(),STOP)))&&!wt.isInterrupted())wt.interrupt();try{//这是一个钩子方法,留给需要的子类实现beforeExecute(wt,task);Throwablethrown=null;try{//到这里终于可以执行任务了task.run();}catch(RuntimeExceptionx){thrown=x;throwx;}catch(Errorx){thrown=x;throwx;}catch(Throwablex){//这里不允许抛出Throwable,所以转换为Errorthrown=x;thrownewError(x);}finally{//也是一个钩子方法,将task和异常作为参数,留给需要的子类实现afterExecute(task,thrown);}}finally{//置空task,准备getTask获取下一个任务task=null;//累加完成的任务数w.completedTasks++;//释放掉worker的独占锁w.unlock();}}completedAbruptly=false;}finally{//如果到这里,需要执行线程关闭://1\.说明getTask返回null,也就是说,队列中已经没有任务需要执行了,执行关闭//2\.任务执行过程中发生了异常//第一种情况,已经在代码处理了将workCount减1,这个在getTask方法分析中会说//第二种情况,workCount没有进行处理,所以需要在processWorkerExit中处理//限于篇幅,我不准备分析这个方法了,感兴趣的读者请自行分析源码processWorkerExit(w,completedAbruptly);}}
我们看看 getTask() 是怎么获取任务的,这个方法写得真的很好,每一行都很简单,组合起来却所有的情况都想好了:
//此方法有三种可能://1\.阻塞直到获取到任务返回。我们知道,默认corePoolSize之内的线程是不会被回收的,//它们会一直等待任务//2\.超时退出。keepAliveTime起作用的时候,也就是如果这么多时间内都没有任务,那么应该执行关闭//3\.如果发生了以下条件,此方法必须返回null://-池中有大于maximumPoolSize个workers存在(通过调用setMaximumPoolSize进行设置)//-线程池处于SHUTDOWN,而且workQueue是空的,前面说了,这种不再接受新的任务//-线程池处于STOP,不仅不接受新的线程,连workQueue中的线程也不再执行privateRunnablegetTask(){booleantimedOut=false;//Didthelastpoll()timeout?retry:for(;;){intc=ctl.get();intrs=runStateOf(c);//两种可能//1\.rs==SHUTDOWN&&workQueue.isEmpty()//2\.rs>=STOPif(rs>=SHUTDOWN&&(rs>=STOP||workQueue.isEmpty())){//CAS操作,减少工作线程数decrementWorkerCount();returnnull;}booleantimed;//Areworkerssubjecttoculling?for(;;){intwc=workerCountOf(c);//允许核心线程数内的线程回收,或当前线程数超过了核心线程数,那么有可能发生超时关闭timed=allowCoreThreadTimeOut||wc>corePoolSize;//这里break,是为了不往下执行后一个if(compareAndDecrementWorkerCount(c))//两个if一起看:如果当前线程数wc>maximumPoolSize,或者超时,都返回null//那这里的问题来了,wc>maximumPoolSize的情况,为什么要返回null?//换句话说,返回null意味着关闭线程。//那是因为有可能开发者调用了setMaximumPoolSize()将线程池的maximumPoolSize调小了,那么多余的Worker就需要被关闭if(wc<=maximumPoolSize&&!(timedOut&&timed))break;if(compareAndDecrementWorkerCount(c))returnnull;c=ctl.get();//Re-readctl//compareAndDecrementWorkerCount(c)失败,线程池中的线程数发生了改变if(runStateOf(c)!=rs)continueretry;//elseCASfailedduetoworkerCountchange;retryinnerloop}//wc<=maximumPoolSize同时没有超时try{//到workQueue中获取任务Runnabler=timed?workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS):workQueue.take();if(r!=null)returnr;timedOut=true;}catch(InterruptedExceptionretry){//如果此worker发生了中断,采取的方案是重试//解释下为什么会发生中断,这个读者要去看setMaximumPoolSize方法。//如果开发者将maximumPoolSize调小了,导致其小于当前的workers数量,//那么意味着超出的部分线程要被关闭。重新进入for循环,自然会有部分线程会返回nulltimedOut=false;}}}
到这里,基本上也说完了整个流程,读者这个时候应该回到 execute(Runnable command) 方法,看看各个分支,我把代码贴过来一下:
publicvoidexecute(Runnablecommand){if(command==null)thrownewNullPointerException();//前面说的那个表示“线程池状态”和“线程数”的整数intc=ctl.get();//如果当前线程数少于核心线程数,那么直接添加一个worker来执行任务,//创建一个新的线程,并把当前任务command作为这个线程的第一个任务(firstTask)if(workerCountOf(c)<corePoolSize){//添加任务成功,那么就结束了。提交任务嘛,线程池已经接受了这个任务,这个方法也就可以返回了//至于执行的结果,到时候会包装到FutureTask中。//返回false代表线程池不允许提交任务if(addWorker(command,true))return;c=ctl.get();}//到这里说明,要么当前线程数大于等于核心线程数,要么刚刚addWorker失败了//如果线程池处于RUNNING状态,把这个任务添加到任务队列workQueue中if(isRunning(c)&&workQueue.offer(command)){/*这里面说的是,如果任务进入了workQueue,我们是否需要开启新的线程*因为线程数在[0,corePoolSize)是无条件开启新的线程*如果线程数已经大于等于corePoolSize,那么将任务添加到队列中,然后进到这里*/intrecheck=ctl.get();//如果线程池已不处于RUNNING状态,那么移除已经入队的这个任务,并且执行拒绝策略if(!isRunning(recheck)&&remove(command))reject(command);//如果线程池还是RUNNING的,并且线程数为0,那么开启新的线程//到这里,我们知道了,这块代码的真正意图是:担心任务提交到队列中了,但是线程都关闭了elseif(workerCountOf(recheck)==0)addWorker(null,false);}//如果workQueue队列满了,那么进入到这个分支//以maximumPoolSize为界创建新的worker,//如果失败,说明当前线程数已经达到maximumPoolSize,执行拒绝策略elseif(!addWorker(command,false))reject(command);}
上面各个分支中,有两种情况会调用 reject(command) 来处理任务,因为按照正常的流程,线程池此时不能接受这个任务,所以需要执行我们的拒绝策略。接下来,我们说一说 ThreadPoolExecutor 中的拒绝策略。
finalvoidreject(Runnablecommand){//执行拒绝策略handler.rejectedExecution(command,this);}
此处的 handler 我们需要在构造线程池的时候就传入这个参数,它是 RejectedExecutionHandler 的实例。
RejectedExecutionHandler 在 ThreadPoolExecutor 中有四个已经定义好的实现类可供我们直接使用,当然,我们也可以实现自己的策略,不过一般也没有必要。
//只要线程池没有被关闭,那么由提交任务的线程自己来执行这个任务。publicstaticclassCallerRunsPolicyimplementsRejectedExecutionHandler{publicCallerRunsPolicy(){}publicvoidrejectedExecution(Runnabler,ThreadPoolExecutore){if(!e.isShutdown()){r.run();}}}//不管怎样,直接抛出RejectedExecutionException异常//这个是默认的策略,如果我们构造线程池的时候不传相应的handler的话,那就会指定使用这个publicstaticclassAbortPolicyimplementsRejectedExecutionHandler{publicAbortPolicy(){}publicvoidrejectedExecution(Runnabler,ThreadPoolExecutore){thrownewRejectedExecutionException("Task"+r.toString()+"rejectedfrom"+e.toString());}}//不做任何处理,直接忽略掉这个任务publicstaticclassDiscardPolicyimplementsRejectedExecutionHandler{publicDiscardPolicy(){}publicvoidrejectedExecution(Runnabler,ThreadPoolExecutore){}}//这个相对霸道一点,如果线程池没有被关闭的话,//把队列队头的任务(也就是等待了最长时间的)直接扔掉,然后提交这个任务到等待队列中publicstaticclassDiscardOldestPolicyimplementsRejectedExecutionHandler{publicDiscardOldestPolicy(){}publicvoidrejectedExecution(Runnabler,ThreadPoolExecutore){if(!e.isShutdown()){e.getQueue().poll();e.execute(r);}}}
到这里,ThreadPoolExecutor 的源码算是分析结束了。单纯从源码的难易程度来说,ThreadPoolExecutor 的源码还算是比较简单的,只是需要我们静下心来好好看看罢了。
Executors
这节其实也不是分析 Executors 这个类,因为它仅仅是工具类,它的所有方法都是 static 的。
生成一个固定大小的线程池:
publicstaticExecutorServicenewFixedThreadPool(intnThreads){returnnewThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,newLinkedBlockingQueue<Runnable>());}
最大线程数设置为与核心线程数相等,此时 keepAliveTime 设置为 0(因为这里它是没用的,即使不为 0,线程池默认也不会回收 corePoolSize 内的线程),任务队列采用 LinkedBlockingQueue,无界队列。
过程分析:刚开始,每提交一个任务都创建一个 worker,当 worker 的数量达到 nThreads 后,不再创建新的线程,而是把任务提交到 LinkedBlockingQueue 中,而且之后线程数始终为 nThreads。
生成只有一个线程的固定线程池,这个更简单,和上面的一样,只要设置线程数为 1 就可以了:
publicstaticExecutorServicenewSingleThreadExecutor(){returnnewFinalizableDelegatedExecutorService(newThreadPoolExecutor(1,1,0L,TimeUnit.MILLISECONDS,newLinkedBlockingQueue<Runnable>()));}
生成一个需要的时候就创建新的线程,同时可以复用之前创建的线程(如果这个线程当前没有任务)的线程池:
publicstaticExecutorServicenewCachedThreadPool(){returnnewThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.SECONDS,newSynchronousQueue<Runnable>());}
核心线程数为 0,最大线程数为 Integer.MAX_VALUE,keepAliveTime 为 60 秒,任务队列采用 SynchronousQueue。
这种线程池对于任务可以比较快速地完成的情况有比较好的性能。如果线程空闲了 60 秒都没有任务,那么将关闭此线程并从线程池中移除。所以如果线程池空闲了很长时间也不会有问题,因为随着所有的线程都会被关闭,整个线程池不会占用任何的系统资源。
过程分析:我把 execute 方法的主体黏贴过来,让大家看得明白些。鉴于 corePoolSize 是 0,那么提交任务的时候,直接将任务提交到队列中,由于采用了 SynchronousQueue,所以如果是第一个任务提交的时候,offer 方法肯定会返回 false,因为此时没有任何 worker 对这个任务进行接收,那么将进入到最后一个分支来创建第一个 worker。之后再提交任务的话,取决于是否有空闲下来的线程对任务进行接收,如果有,会进入到第二个 if 语句块中,否则就是和第一个任务一样,进到最后的 else if 分支创建新线程。
intc=ctl.get();//corePoolSize为0,所以不会进到这个if分支if(workerCountOf(c)<corePoolSize){if(addWorker(command,true))return;c=ctl.get();}//offer如果有空闲线程刚好可以接收此任务,那么返回true,否则返回falseif(isRunning(c)&&workQueue.offer(command)){intrecheck=ctl.get();if(!isRunning(recheck)&&remove(command))reject(command);elseif(workerCountOf(recheck)==0)addWorker(null,false);}elseif(!addWorker(command,false))reject(command);
SynchronousQueue 是一个比较特殊的 BlockingQueue,其本身不储存任何元素,它有一个虚拟队列(或虚拟栈),不管读操作还是写操作,如果当前队列中存储的是与当前操作相同模式的线程,那么当前操作也进入队列中等待;如果是相反模式,则配对成功,从当前队列中取队头节点。具体的信息,可以看我的另一篇关于 BlockingQueue 的文章。
总结
我一向不喜欢写总结,因为我把所有需要表达的都写在正文中了,写小篇幅的总结并不能真正将话说清楚,本文的总结部分为准备面试的读者而写,希望能帮到面试者或者没有足够的时间看完全文的读者。
java 线程池有哪些关键属性?
corePoolSize,maximumPoolSize,workQueue,keepAliveTime,rejectedExecutionHandler
corePoolSize 到 maximumPoolSize 之间的线程会被回收,当然 corePoolSize 的线程也可以通过设置而得到回收(allowCoreThreadTimeOut(true))。
workQueue 用于存放任务,添加任务的时候,如果当前线程数超过了 corePoolSize,那么往该队列中插入任务,线程池中的线程会负责到队列中拉取任务。
keepAliveTime 用于设置空闲时间,如果线程数超出了 corePoolSize,并且有些线程的空闲时间超过了这个值,会执行关闭这些线程的操作
rejectedExecutionHandler 用于处理当线程池不能执行此任务时的情况,默认有抛出 RejectedExecutionException 异常、忽略任务、使用提交任务的线程来执行此任务和将队列中等待最久的任务删除,然后提交此任务这四种策略,默认为抛出异常。
说说线程池中的线程创建时机?
注意:如果将队列设置为无界队列,那么线程数达到 corePoolSize 后,其实线程数就不会再增长了。因为后面的任务直接往队列塞就行了,此时 maximumPoolSize 参数就没有什么意义。
如果当前线程数少于 corePoolSize,那么提交任务的时候创建一个新的线程,并由这个线程执行这个任务;
如果当前线程数已经达到 corePoolSize,那么将提交的任务添加到队列中,等待线程池中的线程去队列中取任务;
如果队列已满,那么创建新的线程来执行任务,需要保证池中的线程数不会超过 maximumPoolSize,如果此时线程数超过了 maximumPoolSize,那么执行拒绝策略。
Executors.newFixedThreadPool(…) 和 Executors.newCachedThreadPool() 构造出来的线程池有什么差别?
细说太长,往上滑一点点,在 Executors 的小节进行了详尽的描述。
任务执行过程中发生异常怎么处理?
如果某个任务执行出现异常,那么执行任务的线程会被关闭,而不是继续接收其他任务。然后会启动一个新的线程来代替它。
什么时候会执行拒绝策略?
workers 的数量达到了 corePoolSize(任务此时需要进入任务队列),任务入队成功,与此同时线程池被关闭了,而且关闭线程池并没有将这个任务出队,那么执行拒绝策略。这里说的是非常边界的问题,入队和关闭线程池并发执行,读者仔细看看 execute 方法是怎么进到第一个 reject(command) 里面的。
workers 的数量大于等于 corePoolSize,将任务加入到任务队列,可是队列满了,任务入队失败,那么准备开启新的线程,可是线程数已经达到 maximumPoolSize,那么执行拒绝策略。
</div> <div class="zixun-tj-product adv-bottom"></div> </div> </div> <div class="prve-next-news">
怎样深度解读java线程池设计思想及源码实现的详细内容,希望对您有所帮助,信息来源于网络。