Java基于quasar如何实现协程池
导读:本文共2857字符,通常情况下阅读需要10分钟。同时您也可以点击右侧朗读,来听本文内容。按键盘←(左) →(右) 方向键可以翻页。
摘要: 业务场景:golang与swoole都拥抱了协程,在同任务并发数量下,协程可比线程多几倍。所以最近在查询java时了解java本身是没有协程的,但是某牛自行实现了协程,也就是本文的主角quasar(纤程)!不过没看到谁公开一下手写协程池的骚操作(谁会直接new它用?那是没挨过社会的毒打呀~)一个线程可以多个协程,一个进程也可以单独拥有多个协程。线程进程都是同步机... ...
目录
(为您整理了一些要点),点击可以直达。业务场景:golang与swoole都拥抱了协程,在同任务并发数量下,协程可比线程多几倍。所以最近在查询java时了解java本身是没有协程的,但是某牛自行实现了协程,也就是本文的主角quasar(纤程)!不过没看到谁公开一下手写协程池的骚操作(谁会直接new它用?那是没挨过社会的毒打呀~)
一个线程可以多个协程,一个进程也可以单独拥有多个协程。
线程进程都是同步机制,而协程则是异步。
协程能保留上一次调用时的状态,每次过程重入时,就相当于进入上一次调用的状态。
线程是抢占式,而协程是非抢占式的,所以需要用户自己释放使用权来切换到其他协程,因此同一时间其实只有一个协程拥有运行权,相当于单线程的能力。
协程并不是取代线程, 而且抽象于线程之上, 线程是被分割的CPU资源, 协程是组织好的代码流程, 协程需要线程来承载运行, 线程是协程的资源, 但协程不会直接使用线程, 协程直接利用的是执行器(Interceptor), 执行器可以关联任意线程或线程池, 可以使当前线程, UI线程, 或新建新程.。
线程是协程的资源。协程通过Interceptor来间接使用线程这个资源。
废话不多说,直接上代码:
导入包:
<dependency><groupId>co.paralleluniverse</groupId><artifactId>quasar-core</artifactId><version>0.7.9</version><classifier>jdk8</classifier></dependency>
WorkTools工具类:
packagecom.example.ai;importco.paralleluniverse.fibers.Fiber;importco.paralleluniverse.fibers.SuspendExecution;importco.paralleluniverse.strands.SuspendableRunnable;importjava.util.concurrent.ArrayBlockingQueue;publicclassWorkTools{//协程池中默认协程的个数为5privatestaticintWORK_NUM=5;//队列默认任务为100privatestaticintTASK_COUNT=100;//工做协程数组privateFiber[]workThreads;//等待队列privatefinalArrayBlockingQueue<SuspendableRunnable>taskQueue;//用户在构造这个协程池时,但愿启动的协程数privatefinalintworkerNum;//构造方法:建立具备默认协程个数的协程池publicWorkTools(){this(WORK_NUM,TASK_COUNT);}//建立协程池,workNum为协程池中工做协程的个数publicWorkTools(intworkerNum,inttaskCount){if(workerNum<=0){workerNum=WORK_NUM;}if(taskCount<=0){taskCount=TASK_COUNT;}this.workerNum=workerNum;taskQueue=newArrayBlockingQueue(taskCount);workThreads=newFiber[workerNum];for(inti=0;i<workerNum;i++){intfinalI=i;workThreads[i]=newFiber<>(newSuspendableRunnable(){@Overridepublicvoidrun()throwsSuspendExecution,InterruptedException{SuspendableRunnablerunnable=null;while(true){try{//取任务,没有则阻塞。runnable=taskQueue.take();}catch(Exceptione){System.out.println(e.getMessage());}//存在任务则运行。if(runnable!=null){runnable.run();}runnable=null;}}});//new一个工做协程workThreads[i].start();//启动工做协程}Runtime.getRuntime().availableProcessors();}//执行任务,其实就是把任务加入任务队列,何时执行由协程池管理器决定publicvoidexecute(SuspendableRunnabletask){try{taskQueue.put(task);//put:阻塞接口的插入}catch(Exceptione){//TODO:handleexceptionSystem.out.println("阻塞");}}//销毁协程池,该方法保证全部任务都完成的状况下才销毁全部协程,不然等待任务完成再销毁publicvoiddestory(){//工做协程中止工做,且置为nullSystem.out.println("readyclosethread...");for(inti=0;i<workerNum;i++){workThreads[i]=null;//helpgc}taskQueue.clear();//清空等待队列}//覆盖toString方法,返回协程信息:工做协程个数和已完成任务个数@OverridepublicStringtoString(){return"WorkThreadnumber:"+workerNum+"==分割线==waittasknumber:"+taskQueue.size();}}
测试代码:
packagecom.example.ai;importco.paralleluniverse.strands.SuspendableRunnable;importlombok.SneakyThrows;importorg.springframework.boot.autoconfigure.SpringBootApplication;importjava.util.concurrent.CountDownLatch;@SpringBootApplicationpublicclassAiApplication{@SneakyThrowspublicstaticvoidmain(String[]args){//等待协程任务完毕后再结束主线程CountDownLatchcdl=newCountDownLatch(50);//开启5个协程,50个任务列队。WorkToolsmyThreadPool=newWorkTools(5,50);for(inti=0;i<50;i++){intfinalI=i;myThreadPool.execute(newSuspendableRunnable(){@Overridepublicvoidrun(){System.out.println(finalI);try{//延迟1秒Thread.sleep(1000);cdl.countDown();}catch(InterruptedExceptione){System.out.println("阻塞中");}}});}//阻塞cdl.await();}}
</div> <div class="zixun-tj-product adv-bottom"></div> </div> </div> <div class="prve-next-news">
Java基于quasar如何实现协程池的详细内容,希望对您有所帮助,信息来源于网络。