CountDownLatch和Atomic原子操作类源码分析
导读:本文共5472.5字符,通常情况下阅读需要18分钟。同时您也可以点击右侧朗读,来听本文内容。按键盘←(左) →(右) 方向键可以翻页。
摘要: 引导语本小节和大家一起来看看 CountDownLatch 和 Atomic 打头的原子操作类,CountDownLatch 的源码非常少,看起来比较简单,但 CountDownLatch 的实际应用却不是很容易;Atomic 原子操作类就比较好理解和应用,接下来我们分别来看一下。1、CountDownLatchCountDownLatch 中文有的叫做计数器,... ...
目录
(为您整理了一些要点),点击可以直达。引导语
本小节和大家一起来看看 CountDownLatch 和 Atomic 打头的原子操作类,CountDownLatch 的源码非常少,看起来比较简单,但 CountDownLatch 的实际应用却不是很容易;Atomic 原子操作类就比较好理解和应用,接下来我们分别来看一下。
1、CountDownLatch
CountDownLatch 中文有的叫做计数器,也有翻译为计数锁,其最大的作用不是为了加锁,而是通过计数达到等待的功能,主要有两种形式的等待:
让一组线程在全部启动完成之后,再一起执行(先启动的线程需要阻塞等待后启动的线程,直到一组线程全部都启动完成后,再一起执行);
主线程等待另外一组线程都执行完成之后,再继续执行。
我们会举一个示例来演示这两种情况,但在这之前,我们先来看看 CountDownLatch 的底层源码实现,这样就会清晰一点,不然一开始就来看示例,估计很难理解。
CountDownLatch 有两个比较重要的 API,分别是 await 和 countDown,管理着线程能否获得锁和锁的释放(也可以称为对 state 的计数增加和减少)。
1.1、await
await 我们可以叫做等待,也可以叫做加锁,有两种不同入参的方法,源码如下:
publicvoidawait()throwsInterruptedException{sync.acquireSharedInterruptibly(1);}//带有超时时间的,最终都会转化成毫秒publicbooleanawait(longtimeout,TimeUnitunit)throwsInterruptedException{returnsync.tryAcquireSharedNanos(1,unit.toNanos(timeout));}
两个方法底层使用的都是 sync,sync 是一个同步器,是 CountDownLatch 的内部类实现的,如下:
privatestaticfinalclassSyncextendsAbstractQueuedSynchronizer{}
可以看出来 Sync 继承了 AbstractQueuedSynchronizer,具备了同步器的通用功能。
无参 await 底层使用的是 acquireSharedInterruptibly 方法,有参的使用的是 tryAcquireSharedNanos 方法,这两个方法都是 AQS 的方法,底层实现很相似,主要分成两步:
1.使用子类的 tryAcquireShared 方法尝试获得锁,如果获取了锁直接返回,获取不到锁走 2;
2.获取不到锁,用 Node 封装一下当前线程,追加到同步队列的尾部,等待在合适的时机去获得锁。
第二步是 AQS 已经实现了,第一步 tryAcquireShared 方法是交给 Sync 实现的,源码如下:
//如果当前同步器的状态是0的话,表示可获得锁protectedinttryAcquireShared(intacquires){return(getState()==0)?1:-1;}
获得锁的代码也很简单,直接根据同步器的 state 字段来进行判断,但还是有两点需要注意一下:
获得锁时,state 的值不会发生变化,像 ReentrantLock 在获得锁时,会把 state + 1,但 CountDownLatch 不会;
CountDownLatch 的 state 并不是 AQS 的默认值 0,而是可以赋值的,是在 CountDownLatch 初始化的时候赋值的,
代码如下:
//初始化,count代表state的初始化值publicCountDownLatch(intcount){if(count<0)thrownewIllegalArgumentException("count<0");//newSync底层代码是state=count;this.sync=newSync(count);}
这里的初始化的 count 和一般的锁意义不太一样,count 表示我们希望等待的线程数,在两种不同的等待场景中,count 有不同的含义:
让一组线程在全部启动完成之后,再一起执行的等待场景下, count 代表一组线程的个数;
主线程等待另外一组线程都执行完成之后,再继续执行的等待场景下,count 代表一组线程的个数。
所以我们可以把 count 看做我们希望等待的一组线程的个数,可能我们是等待一组线程全部启动完成,可能我们是等待一组线程全部执行完成。
1.2、countDown
countDown 中文翻译为倒计时,每调用一次,都会使 state 减一,底层调用的方法如下:
publicvoidcountDown(){sync.releaseShared(1);}
releaseShared 是 AQS 定义的方法,方法主要分成两步:
1.尝试释放锁(tryReleaseShared),锁释放失败直接返回,释放成功走2
2.释放当前节点的后置等待节点。
第二步 AQS 已经实现了,第一步是 Sync 实现的,我们一起来看下 tryReleaseShared 方法的实现源码:
//对state进行递减,直到state变成0;//state递减为0时,返回true,其余返回falseprotectedbooleantryReleaseShared(intreleases){//自旋保证CAS一定可以成功for(;;){intc=getState();//state已经是0了,直接返回falseif(c==0)returnfalse;//对state进行递减intnextc=c-1;if(compareAndSetState(c,nextc))returnnextc==0;}}
从源码中可以看到,只有到 count 递减到 0 时,countDown 才会返回 true。
1.3、示例
看完 CountDownLatch 两个重要 API 后,我们来实现文章开头说的两个功能:
让一组线程在全部启动完成之后,再一起执行;
主线程等待另外一组线程都执行完成之后,再继续执行。
代码在 CountDownLatchDemo 类中,大家可以调试看看,源码如下:
publicclassCountDownLatchDemo{//线程任务classWorkerimplementsRunnable{//定义计数锁用来实现功能1privatefinalCountDownLatchstartSignal;//定义计数锁用来实现功能2privatefinalCountDownLatchdoneSignal;Worker(CountDownLatchstartSignal,CountDownLatchdoneSignal){this.startSignal=startSignal;this.doneSignal=doneSignal;} //子线程做的事情publicvoidrun(){try{System.out.println(Thread.currentThread().getName()+"begin");//await时有两点需要注意:await时state不会发生变化,2:startSignal的state初始化是1,所以所有子线程都是获取不到锁的,都需要到同步队列中去等待,达到先启动的子线程等待后面启动的子线程的结果startSignal.await();doWork();//countDown每次会使state减一,doneSignal初始化为9,countDown前8次执行都会返回false(releaseShared方法),执行第9次时,state递减为0,会countDown成功,表示所有子线程都执行完了,会释放await在doneSignal上的主线程doneSignal.countDown();System.out.println(Thread.currentThread().getName()+"end");}catch(InterruptedExceptionex){}//return;}voiddoWork()throwsInterruptedException{System.out.println(Thread.currentThread().getName()+"sleep5s…………");Thread.sleep(5000l);}}@Testpublicvoidtest()throwsInterruptedException{//state初始化为1很关键,子线程是不断的await,await时state是不会变化的,并且发现state都是1,所有线程都获取不到锁//造成所有线程都到同步队列中去等待,当主线程执行countDown时,就会一起把等待的线程给释放掉CountDownLatchstartSignal=newCountDownLatch(1);//state初始化成9,表示有9个子线程执行完成之后,会唤醒主线程CountDownLatchdoneSignal=newCountDownLatch(9);for(inti=0;i<9;++i)//createandstartthreads{newThread(newWorker(startSignal,doneSignal)).start();}System.out.println("mainthreadbegin");//这行代码唤醒9个子线程,开始执行(因为startSignal锁的状态是1,所以调用一次countDown方法就可以释放9个等待的子线程)startSignal.countDown();//这行代码使主线程陷入沉睡,等待9个子线程执行完成之后才会继续执行(就是等待子线程执行doneSignal.countDown())doneSignal.await();System.out.println("mainthreadend");}}执行结果:Thread-0beginThread-1beginThread-2beginThread-3beginThread-4beginThread-5beginThread-6beginThread-7beginThread-8beginmainthreadbeginThread-0sleep5s…………Thread-1sleep5s…………Thread-4sleep5s…………Thread-3sleep5s…………Thread-2sleep5s…………Thread-8sleep5s…………Thread-7sleep5s…………Thread-6sleep5s…………Thread-5sleep5s…………Thread-0endThread-1endThread-4endThread-3endThread-2endThread-8endThread-7endThread-6endThread-5endmainthreadend
从执行结果中,可以看出已经实现了以上两个功能,实现比较绕,大家可以根据注释,debug 看一看。
2、Atomic 原子操作类
Atomic 打头的原子操作类有很多,涉及到 Java 常用的数字类型的,基本都有相应的 Atomic 原子操作类,如下图所示:
Atomic 打头的原子操作类,在高并发场景下,都是线程安全的,我们可以放心使用。
我们以 AtomicInteger 为例子,来看下主要的底层实现:
privatevolatileintvalue;//初始化publicAtomicInteger(intinitialValue){value=initialValue;}//得到当前值publicfinalintget(){returnvalue;}//自增1,并返回自增之前的值publicfinalintgetAndIncrement(){returnunsafe.getAndAddInt(this,valueOffset,1);}//自减1,并返回自增之前的值publicfinalintgetAndDecrement(){returnunsafe.getAndAddInt(this,valueOffset,-1);}
从源码中,我们可以看到,线程安全的操作方法,底层都是使用 unsafe 方法实现,以上几个 unsafe 方法不是使用 Java 实现的,都是线程安全的。
AtomicInteger 是对 int 类型的值进行自增自减,那如果 Atomic 的对象是个自定义类怎么办呢,Java 也提供了自定义对象的原子操作类,叫做 AtomicReference。AtomicReference 类可操作的对象是个泛型,所以支持自定义类,其底层是没有自增方法的,操作的方法可以作为函数入参传递,源码如下:
//对x执行accumulatorFunction操作//accumulatorFunction是个函数,可以自定义想做的事情//返回老值publicfinalVgetAndAccumulate(Vx,BinaryOperator<V>accumulatorFunction){//prev是老值,next是新值Vprev,next;//自旋+CAS保证一定可以替换老值do{prev=get();//执行自定义操作next=accumulatorFunction.apply(prev,x);}while(!compareAndSet(prev,next));returnprev;}
</div> <div class="zixun-tj-product adv-bottom"></div> </div> </div> <div class="prve-next-news">
CountDownLatch和Atomic原子操作类源码分析的详细内容,希望对您有所帮助,信息来源于网络。