java并发ThreadPoolExecutor如何使用(java,threadpoolexecutor,开发技术)

时间:2024-05-08 20:06:15 作者 : 石家庄SEO 分类 : 开发技术
  • TAG :

一. 线程池的简单原理

当一个任务提交到线程池ThreadPoolExecutor时,该任务的执行如下图所示。

java并发ThreadPoolExecutor如何使用

  • 如果当前运行的线程数小于corePoolSzie(核心线程数),则创建新线程来执行任务(需要获取全局锁);

  • 如果当前运行的线程数等于或大于corePoolSzie,则将任务加入BlockingQueue(任务阻塞队列);

  • 如果BlockingQueue已满,则创建新的线程来执行任务(需要获取全局锁);

  • 如果创建新线程会使当前线程数大于maximumPoolSize(最大线程数),则拒绝任务并调用RejectedExecutionHandlerrejectedExecution() 方法。

由于ThreadPoolExecutor存储工作线程使用的集合是HashSet,因此执行上述步骤1和步骤3时需要获取全局锁来保证线程安全,而获取全局锁会导致线程池性能瓶颈,因此通常情况下,线程池完成预热后(当前线程数大于等于corePoolSize),线程池的execute() 方法都是执行步骤2。

二. 线程池的创建

通过ThreadPoolExecutor能够创建一个线程池,ThreadPoolExecutor的构造函数签名如下。

publicThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,TimeUnitunit,BlockingQueue<Runnable>workQueue)publicThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,TimeUnitunit,BlockingQueue<Runnable>workQueue,ThreadFactorythreadFactory)publicThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,TimeUnitunit,BlockingQueue<Runnable>workQueue,RejectedExecutionHandlerhandler)publicThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,TimeUnitunit,BlockingQueue<Runnable>workQueue,ThreadFactorythreadFactory,RejectedExecutionHandlerhandler)

通过ThreadPoolExecutor创建线程池时,需要指定线程池的核心线程数最大线程数线程保活时间线程保活时间单位任务阻塞队列,并按需指定线程工厂饱和拒绝策略,如果不指定线程工厂饱和拒绝策略,则ThreadPoolExecutor会使用默认的线程工厂饱和拒绝策略。下面分别介绍这些参数的含义。

参数含义corePoolSize核心线程数,即线程池的基本大小。当一个任务被提交到线程池时,如果线程池的线程数小于corePoolSize,那么无论其余线程是否空闲,也需创建一个新线程来执行任务。maximumPoolSize最大线程数。当线程池中线程数大于等于corePoolSize时,新提交的任务会加入任务阻塞队列,但是如果任务阻塞队列已满且线程数小于maximumPoolSize,此时会继续创建新的线程来执行任务。该参数规定了线程池允许创建的最大线程数keepAliveTime线程保活时间。当线程池的线程数大于核心线程数时,多余的空闲线程会最大存活keepAliveTime的时间,如果超过这个时间且空闲线程还没有获取到任务来执行,则该空闲线程会被回收掉。unit线程保活时间单位。通过TimeUnit指定线程保活时间的时间单位,可选单位有DAYS(天),HOURS(时),MINUTES(分),SECONDS(秒),MILLISECONDS(毫秒),MICROSECONDS(微秒)和NANOSECONDS(纳秒),但无论指定什么时间单位,ThreadPoolExecutor统一会将其转换为NANOSECONDSworkQueue任务阻塞队列。线程池的线程数大于等于corePoolSize时,新提交的任务会添加到workQueue中,所有线程执行完上一个任务后,会循环从workQueue中获取任务来执行。threadFactory创建线程的工厂。可以通过线程工厂给每个创建出来的线程设置更有意义的名字。handler饱和拒绝策略。如果任务阻塞队列已满且线程池中的线程数等于maximumPoolSize,说明线程池此时处于饱和状态,应该执行一种拒绝策略来处理新提交的任务。

三. 线程池执行任务

1. 执行无返回值任务

通过ThreadPoolExecutorexecute() 方法,能执行Runnable任务,示例如下。

publicclassThreadPoolExecutorTest{@TestpublicvoidThreadPoolExecutor执行简单无返回值任务()throwsException{//创建一个线程池ThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(2,4,60,TimeUnit.SECONDS,newArrayBlockingQueue&lt;&gt;(300));//创建两个任务RunnablefirstRunnable=newRunnable(){@Overridepublicvoidrun(){System.out.println("第一个任务执行");}};RunnablesecondRunnable=newRunnable(){@Overridepublicvoidrun(){System.out.println("第二个任务执行");}};//让线程池执行任务threadPoolExecutor.execute(firstRunnable);threadPoolExecutor.execute(secondRunnable);//让主线程睡眠1秒,等待线程池中的任务被执行完毕Thread.sleep(1000);}}

运行测试程序,结果如下。

java并发ThreadPoolExecutor如何使用

2. 执行有返回值任务

通过ThreadPoolExecutorsubmit() 方法,能够执行Callable任务,通过submit() 方法返回的RunnableFuture能够拿到异步执行的结果。示例如下。

publicclassThreadPoolExecutorTest{@TestpublicvoidThreadPoolExecutor执行简单有返回值任务()throwsException{//创建一个线程池ThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(2,4,60,TimeUnit.SECONDS,newArrayBlockingQueue&lt;&gt;(300));//创建两个任务,任务执行完有返回值Callable&lt;String&gt;firstCallable=newCallable&lt;String&gt;(){@OverridepublicStringcall()throwsException{return"第一个任务返回值";}};Callable&lt;String&gt;secondCallable=newCallable&lt;String&gt;(){@OverridepublicStringcall()throwsException{return"第二个任务返回值";}};//让线程池执行任务Future&lt;String&gt;firstFuture=threadPoolExecutor.submit(firstCallable);Future&lt;String&gt;secondFuture=threadPoolExecutor.submit(secondCallable);//获取执行结果,拿不到结果会阻塞在get()方法上System.out.println(firstFuture.get());System.out.println(secondFuture.get());}}

运行测试程序,结果如下。

java并发ThreadPoolExecutor如何使用

3. 执行有返回值任务时抛出错误

如果ThreadPoolExecutor在执行Callable任务时,在Callable任务中抛出了异常并且没有捕获,那么这个异常是可以通过Futureget() 方法感知到的。示例如下。

publicclassThreadPoolExecutorTest{@TestpublicvoidThreadPoolExecutor执行简单有返回值任务时抛出错误(){//创建一个线程池ThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(2,4,60,TimeUnit.SECONDS,newArrayBlockingQueue<>(300));//创建一个任务,任务有返回值,但是执行过程中抛出异常Callable<String>exceptionCallable=newCallable<String>(){@OverridepublicStringcall()throwsException{thrownewRuntimeException("发生了异常");}};//让线程池执行任务Future<String>exceptionFuture=threadPoolExecutor.submit(exceptionCallable);try{System.out.println(exceptionFuture.get());}catch(Exceptione){System.out.println(e.getMessage());}}}

运行测试程序,结果如下。

java并发ThreadPoolExecutor如何使用

4. ThreadPoolExecutor通过submit方式执行Runnable

ThreadPoolExecutor可以通过submit() 方法来运行Runnable任务,并且还可以异步获取执行结果。示例如下。

publicclassThreadPoolExecutorTest{@TestpublicvoidThreadPoolExecutor通过submit的方式来提交并执行Runnable()throwsException{//创建一个线程池ThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(2,4,60,TimeUnit.SECONDS,newArrayBlockingQueue&lt;&gt;(300));//创建结果对象MyResultmyResult=newMyResult();//创建Runnable对象Runnablerunnable=newRunnable(){@Overridepublicvoidrun(){myResult.setResult("任务执行了");}};//通过ThreadPoolExecutor的submit()方法提交RunnableFuture&lt;MyResult&gt;resultFuture=threadPoolExecutor.submit(runnable,myResult);//获取执行结果MyResultfinalResult=resultFuture.get();//myResult和finalResult的地址实际相同Assert.assertEquals(myResult,finalResult);//打印执行结果System.out.println(resultFuture.get().getResult());}privatestaticclassMyResult{Stringresult;publicMyResult(){}publicMyResult(Stringresult){this.result=result;}publicStringgetResult(){returnresult;}publicvoidsetResult(Stringresult){this.result=result;}}}

运行测试程序,结果如下。

java并发ThreadPoolExecutor如何使用

实际上ThreadPoolExecutorsubmit() 方法无论是提交Runnable任务还是Callable任务,都是将任务封装成了RunnableFuture接口的子类FutureTask,然后调用ThreadPoolExecutorexecute() 方法来执行FutureTask

四. 关闭线程池

关闭线程池可以通过ThreadPoolExecutorshutdown() 方法,但是shutdown() 方法不会去中断正在执行任务的线程,所以如果线程池里有Worker正在执行一个永远不会结束的任务,那么shutdown() 方法是无法关闭线程池的。示例如下。

publicclassThreadPoolExecutorTest{@Testpublicvoid通过shutdown关闭线程池(){//创建一个线程池ThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(2,4,60,TimeUnit.SECONDS,newArrayBlockingQueue<>(300));//创建Runnable对象Runnablerunnable=newRunnable(){@Overridepublicvoidrun(){while(!Thread.currentThread().isInterrupted()){LockSupport.parkNanos(1000*1000*1000);}System.out.println(Thread.currentThread().getName()+"被中断");}};//让线程池执行任务threadPoolExecutor.execute(runnable);threadPoolExecutor.execute(runnable);//调用shutdown方法关闭线程池threadPoolExecutor.shutdown();//等待3秒观察现象LockSupport.parkNanos(1000*1000*1000*3L);}}

运行测试程序,会发现在主线程中等待3秒后,也没有得到预期的打印结果。如果上述测试程序中使用shutdownNow,则是可以得到预期打印结果的,示例如下。

publicclassThreadPoolExecutorTest{@Testpublicvoid通过shutdownNow关闭线程池(){//创建一个线程池ThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(2,4,60,TimeUnit.SECONDS,newArrayBlockingQueue<>(300));//创建Runnable对象Runnablerunnable=newRunnable(){@Overridepublicvoidrun(){while(!Thread.currentThread().isInterrupted()){LockSupport.parkNanos(1000*1000*1000);}System.out.println(Thread.currentThread().getName()+"被中断");}};//让线程池执行任务threadPoolExecutor.execute(runnable);threadPoolExecutor.execute(runnable);//调用shutdown方法关闭线程池threadPoolExecutor.shutdownNow();//等待3秒观察现象LockSupport.parkNanos(1000*1000*1000*3L);}}

运行测试程序,打印如下。

java并发ThreadPoolExecutor如何使用

因为测试程序中的任务是响应中断的,而ThreadPoolExecutorshutdownNow() 方法会中断所有Worker,所以执行shutdownNow() 方法后,正在运行的任务会响应中断并结束运行,最终线程池关闭。

假如线程池中运行着一个永远不会结束的任务,且这个任务不响应中断,那么无论是shutdown() 方法还是shutdownNow() 方法,都是无法关闭线程池的。

 </div> <div class="zixun-tj-product adv-bottom"></div> </div> </div> <div class="prve-next-news">
本文:java并发ThreadPoolExecutor如何使用的详细内容,希望对您有所帮助,信息来源于网络。
上一篇:webpack模块化的原理是什么下一篇:

4 人围观 / 0 条评论 ↓快速评论↓

(必须)

(必须,保密)

阿狸1 阿狸2 阿狸3 阿狸4 阿狸5 阿狸6 阿狸7 阿狸8 阿狸9 阿狸10 阿狸11 阿狸12 阿狸13 阿狸14 阿狸15 阿狸16 阿狸17 阿狸18