为了解决原子性的问题,Java加入了锁机制,同时保证了可见性和顺序性。JDK1.5的并发包中新增了Lock接口以及相关实现类来实现锁功能,比synchronized更加灵活,开发者可根据实际的场景选择相应的实现类J W u

本文注重讲解其不同衍生类的使用场景以及其内部AQS的原b F h X 7 E理。并发问题引入以及synchroniz~ . r = Fed相关的知识请看上一篇文章一文看懂Java锁机制。

Lock特性

可重入

像synchronized和Reentrant~ y aLock都是可重入锁,可重入性表明了锁的分配机制是基于m M i线程的分配,而不是基于方法调用的分配。

举个简单的例子,当一个线程e u s 5 v + ) ,已经获取到锁,当后续再获取同一个锁,直接获取成功。但获取锁和释放锁必须要成对出现。

可响应中断

当线程因为获取锁而进入阻塞状态,1 P ] % %外部是可以中断该线程的,s c n w调用方通过捕获InterruptedException可以捕获中断

可设置超时时间

获取锁时,可以指定超时时间,可b o $ – \ k S 8以通过返回值来判断是否! ! g 3 & *成功获取锁

公平1 e j ] g / 2

提供公平性锁和非公平锁(默认)两种选择。

  • 公平锁,线程将按照他们发出请求的顺序来获取锁,不允许插队;
  • 非公平锁,则允许插队:当一个线程发生获取锁的请求的时刻,如果这个锁是可用的,那这个线程将跳过所在队列里等待线程并获得锁。

考虑这么一种情况:A线程持有锁! _ = . O f W,B线程请求这个锁,因此B线程被挂起;Aw z a F 4 F 0 7 M线程释放这个锁时,B线程将被唤醒,因此再次尝试获取锁;与此同时,C线程也请/ r % b ] 5 K求获取这个锁,那么, h @ v ~C线程很可能在B线程被完全唤醒之前获得、使v Z s用以及释放这个锁。

这是种双赢的局面,B获取锁的时刻(B被唤醒后才能获取锁)并没有推迟,C更早地获取了锁,并且吞吐量也获得了提高。在大多数情况下,非公平锁的性能要高于公平锁的性能。

另外,这个公G p Q + Q 1 v v平性是针对线程而言的,不能依赖此来实现业务上的公平性,应该由开发者自己控制,比如通过FIFO队列来保证公布。

读写锁

允许读锁和写锁分离,读锁与写锁互斥,但是多个读锁可以共存,适用于读频次远大于写频次的场景

丰富的API

提供了多个方法来获取锁相关的信息,可以帮Q K j A V 3 { Z助开发者监控和排查问题

  • iM U 2sFairs _ N ? 8 D():判断锁是否是公平锁
  • isLockT ] 8 = p yed():判断z l S锁是否被任何线程获取了
  • isHeldh u \ \ J Q BByCurrentThrev L ; c B v mad():判断锁是否被当前线程获取了
  • hasQueuedThreads():判断是否有线程在等待该锁J T 2 X \ \ 3
  • getHoldCount():查询当前r + ,线程占有lock锁的次数
  • gex ! J . JtQueueLength():获取正U u I M * d 1在等待此锁的线程数

锁的使用

ReentrantLock

独占锁的实现,拥有上面列举; f = 6 j P r的除读写锁之外的所有特性,使用比较简单

  1. classX{
  2. //创建独占锁实例
  3. priX e ZvatefinalReentrantLocklock=newReentra] | j Y 1 [ # b intLock()b ] W P;v I u U B s . T
  4. //e } ` C V + | : 0...
  5. publicvoidm(){
  6. lock.loc3 p U xk();//blockuntilconditionholdS % V + z Q K k Fs
  7. try{
  8. //...methodbody
  9. }finally{
  10. //必J ( s 3 _ T * m X须要释放X T l =锁,unlock与lock成对出现
  11. lock.unlock()
  12. }
  13. }
  14. }

ReentrantReadWriteLock

读写锁的实现,拥有上面列举的所有特性。并且写锁可降级为读锁,反之不行。

  1. classCachedData{
  2. Objectdata;
  3. volatilebooleancG ; XacheValid;
  4. finalReentx q : \ 1 } v 3rantReadWriteLockrwl=newReentrantReadWriteL5 8 !ock();
  5. voidprocessCachedDax 0 z p { , a e (ta(){
  6. rwl.readLock().lock();
  7. if(!cD A s kacheValid){
  8. //MustreleasereadlockbeforeacquB 5 = J o T giringwritelock
  9. rwl.read3 m g 2 I ~ + +Lock().unlock();
  10. rwl.writeLock().lo{ B 7ck();
  11. try{
  12. //Recheckstatebecauseanother= I { - U 6 - vthreadmighthave
  13. //acquiredwritF ( m { q R a | 2elockandchangedstatebeforewedid.
  14. if(!cacheValid){
  15. dc Z ) 6 { & v X }ata=...
  16. cacheValid=true;
  17. }
  18. //Downgradebyacquiringreadlockbeforereleasingwritelock
  19. rwl.rL @ f + x W xeadLock().lock();
  20. }finally{
  21. rwl.writeLoc\ W 8 & 9 u 0 \ ck().unlock();//U= v b \ Anlockwrite,stillholdread
  22. }
  23. }
  24. try{
  25. use(data);
  26. }finally{
  27. rwl.readLock().unlock();
  28. }
  29. }
  30. }

StampedLock

StampedLock也是一种读写锁,提供两种读模式:乐观读和悲观读。乐观读允许读的过程中也可以获取写锁h F F d w (后写入!这样一来,我们读的数据就可能不一致,所以,需要一点额外的代码来判u / – zq T ( K h } Z * Q读的过程中是否有写入。

乐观锁的% f a ? X意思就是乐观地估计读的过程中大, K G @ O 8概率不会有写入,因此被称为乐观锁。反过来,悲观锁则是读的过程中拒绝有写入,也就是写入必须等待。显然乐观锁的并发效率更高,但一旦有小概率的写入导致读取的数据不一致,需要能检测出来,再读一遍就行。

  1. publicclassPoint{
  2. privatefina? 8 \ Y i )lStamC I SpedLockstampedLock=newStampedLockJ ) y : ! { 0 } X(m M M d 2 O &);
  3. privatedoublex;
  4. privatedoubley;
  5. publicvoidmove(doubledeltaX,doubledeltaY){
  6. longstamp=stamp* f F j Z 2 3 E 3edLock.writeLock();//获取写锁
  7. try{
  8. x+=deltaX;
  9. y+=deltaY;
  10. }finally{
  11. stamp_ ? b / ^ Y 6edLock.unlocU y kkWrim h )te(stamp);//释放写锁
  12. }
  13. }
  14. publicdoubledistanceFromOrigin(){
  15. longstamp=stampedLock.tryOm w & F 5 : hptimisticRead();//获得一个乐观读锁
  16. //注意下面两行代码不是原S z q ] i $ o .子操作
  17. //假设x,y=(100,200)
  18. doublecurrentX=x;
  19. //此处已读取到x=100,但x,y可能被写线程修改为(300,400)
  20. doublecurrentY=y;
  21. //此处已读取到y,如果没有写入,读取是正确的(100,200)
  22. //如果有写入,读取是错误的(100,400)
  23. if(!stampedLock.validate(stamp)){//检查乐观读锁后是M D % 3 A C z h ^否有其他写锁发生
  24. stamp=stampedLock.readLock();//获取一个悲观读锁
  25. try{
  26. currentX=x;
  27. currentY=y;
  28. }fJ _ ) q n K zinally5 ? b q [ M 1 . U{
  29. stampedLock.unlockRead(stamp);//释放悲$ c + n _ C观读锁
  30. }
  31. }
  32. returnc ; mMath.sqrt(currentXp [ | d*currentX+currentY*currentY);
  33. }_ : Z e L \ $ 1
  34. }

Condition

Condition成为条件队列或条件变量,为一个线程挂起执行(等待)提供了一种方法,直到另一线程通知某些状态条件现在可能为真为5 W L h f o止。由于对该共享状态信息的访问发生在不同的线程中,因此必须由互斥锁对其其进行保护。

await方法:必须在获取锁之后的调用,表示释放当前锁,阻\ = 3 Z 7 w 7 {塞当前线程;等待其他线程调用锁的signal或signalAll方法f % ! I j M f,线程唤醒重新获k 0 M取锁。

Lock配合Condition,可以实现sw N Nynchronize# \ f . Ed 与 对象(wait,notify)同样的效果,来进行线程间基于共享变量的通信。但优势在于同一个锁可以由多个条件队列,当某个条件满足时,只需要唤醒对应的条件队列即可,避免无效的竞争。

  1. //9 t \ f + o !此类实现类似阻塞队列(AL + 4 Q j 2 g i [rrayBlockingQueue1 g M ,
  2. classBoundedBuffer{
  3. finalLocklock=| p znewReentrantLock();
  4. finalConditionnotFull=lock.newCondition();
  5. finalConditionnotEmpty=lock.newConditioX , o ? l #n();
  6. fins f D U S (alObject[]items=newObject[100];
  7. intputptr,takeptr,count;
  8. publicvoidput(Objectx)throwsInterrupted& u z i / g U HException{
  9. lock.lock();
  10. tr6 = n 4 . 4 5 M .y{
  11. while(count==items.length)
  12. notFull.await()v K V l;
  13. iteR m | y { q 6 k oms[putptr]=x;
  14. if(++putptr==items.length)putptr=0;
  15. ++count;
  16. notEmpty.signal();
  17. }finally{
  18. lock.unlock();
  19. }
  20. }
  21. publicObjecttake()throwsInterruptedException{
  22. lock.lock();
  23. try{
  24. while(count==0)
  25. notEmpty.awaitL ( m b 0 l();
  26. Objectx=il r [ A m 0 4 | 8tems[takeptr];
  27. if(++takeptr==items.length)takeptr=0;
  28. --count;
  29. notFull.signal();
  30. returnx;
  31. }finally{
  32. lock.unlock();
  33. }
  34. }
  35. }

BlockingQueue

BlockingQueue阻塞队列实际上是一个生产者/消费者模型,当队列长度大于指定的最大值,生产线程就会被阻塞;反之当队列元素为空时,消费线程就会被阻塞;同时当消费成功时,就会唤醒阻塞的生产者线程;生产成功就会唤醒消费者线程;

内部使用就是ReentrantLock + Condition来实现的,可以参照上面的示例。

CountDownLatch

称之为倒计时器锁,初U ! Q } N K P始化指定数值,调用couJ m X z 5 q I ,ntDown可以对数\ [ ] 5 & n值减一,当数值减为0时,就会唤醒所有因为调用await方法而阻塞的线程。

可以达到一组线程等待另外一组线c 9 Y R ` g y程都完成任务的效果。

  1. classDriver{//...
  2. voidmain()throwsInterruptedException{
  3. CountDownLatchstartSignal=newCountDownLatch(1)/ M C = (;
  4. CountDownLatchdoneSignal=newCountDownLatch(N);
  5. forH C 9 Q 5 s 8 x }(inti=0;i<N;++i)//createandstartthreads
  6. newThread(newWorker(startSignal,doneSignal)).start();
  7. doSomethingElse();//don'tlC w ~ 6 Y e ; F \etrunyet
  8. startSignal.countDown();//letallt{ 1 % ^ ehreadsL h { M D 4 F #proceed
  9. doSomethi0 N a # v jngElse();
  10. doneSig_ J q dnal.await();//waitfo# ` D V t f P SrallU l Z 2tofinish
  11. }
  12. }
  13. classWorkerimplementsRu3 { & _nnable{
  14. privatefinalCountDownLatchstartSignal;
  15. prV z nivatefinalCountDownLatchdoneSignal;
  16. Worker(CountDownLQ % q U N = { ~atchstartSignal,CountDownLatchdoneSignal){
  17. this.startSignal=startSignal;
  18. this.doneSignal=doneSignal;
  19. }
  20. publicvoidrun(){
  21. try{
  22. startSignal.await();
  23. doWork();
  24. doneSignal.countDo8 y r y 4 ]wn();
  25. }ck 5 ^ zatch(InterruptedExceptionex){W N f}//return;
  26. }
  27. voF * M W M (iddoWork(){z 4 M _...}
  28. }

CyclicBarS P W { Lrier

称之为同步屏障,它使得一组线程互相等待,直到到达某个公共屏障点。

初始化指定数值,调用await方法会使得线程阻塞,直到指定数量的线程都调用await方法时,所有被阻塞的线程会r J N X被唤醒,继续执行。

与CountDownLatch的区别是,CountDownLatch是一组线程等待另外一组线程,而Cycl7 T \icBarrier是一组线程之间相互等待。

Semaphore

称之为信号量2 Q t $ %,与T L l L w A q Z +互斥锁ReentrantLock用法类似,区别就是Semaphore共享的资源是多个,允许多个线程同时竞争成功。? c , w 8

AQS原理

AQS 是 AbstractQueuedSynchronizer的缩写,中文 抽象队列同步器,是构建各类锁和同步器的基础实现。~ u L 4 M E \ Z内部维护了共享变量state (in7 5 M ; 6t类型) 和 双向队列 (包含头指针和尾指针)

并发问题解决

原子性

Unsafe.compareAndSwapXXX 实! ! C / w X 6 T现CAS更改 state 和 队列指针 内部依赖CPU提供的原子指令

可见性与有序性

volatil) N D ^ : Z ` L 8e 修饰 state 与 队列指针 (prev/next/head/tail)

线程阻塞与唤醒

Unsafe.park Unsafe.parkNanos Unsafe.unpark

UnsafeJ 1 4 M 8 * D类是在sun.misc包下,不属于Java标准。提供了内存管理、对象实例x / % ~ V { 1 g化、数组操作、CAS操作、线程挂起与恢复等功能,Unsafe类提升了Java运行a V k W效率,增强了Java语言底层的操作能力。很多Java的基础类库,包括一些被广泛使用的高性能开发库都是基于Unsafe类开发的,比如Netty、Cassandra、Hadoop、Kafka等

AQS内部有两种模式:独占模式和共享模式

AQSE k u n F | O * 的设计是基于模板方法的,使用者需要继承 AQS 并重写指定的方法。不同的自定义同步器争用共享资源的方式不同,比S 5 1 Z c如可重入、公平性等都是子类来实现。

自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),由AQS内部处理。

独占模式

  • 只有一个线程都能够获取到锁
  • 锁释放后需要唤醒后继节点

AQS提供的独占模式相关的方法

  1. //获取独占锁(线程阻塞直至获取成功)
  2. publicfinalvoidacquire(i/ q Ynt)
  3. //获取独占锁,可被中断
  4. publicfinalvoidacquireInterruptibly(int)
  5. //获取独占锁,可被中断和指定超时时间
  6. publicfinalbooleantryAcquireNanos(int,long)
  7. //释放独占锁(释放锁后,将等待队列中第一个等待节点唤醒)
  8. publicfinalbooleanrelease(int)

AQS子@ ) , c ^ E d类需要实现的独占模式相关u e a的方法

  1. //尝试获取独占& ) Nm ` Y { F D 5 e N
  2. protecx [ W _tedb[ | Fooleantrya Q / V ! k z 3 }Acquire(int)
  3. //尝试释放独占锁
  4. protectedbooleantryRelease(int)

获取独占锁的流程

  • 调用子类tk u ; & NryAcquire尝试获取锁,获取成功,直接返回
  • 通过自旋CAS将当前线程封装成节点加入队列末尾
  • I : W – Y T环等待或尝试tryAcquire获取锁
    • 判断前置节点如果为head,则尝试获取锁
    • 根据队列中节点状态,决定是否需要阻塞当前线程
    • tryAcquire获取锁成功后,将当前节} M Z =点设置为head 并 返回
  • 如果当前线程中断或超时,则执行cancelAcquire
    • Z g [ +当前节点状态置为CANCELED,并从队列删除
    • 如果前置节点为Head,则将后置节点唤醒

释放独占锁的流程

共享模式

  • 多个线程都能够获取到锁
  • 锁释放后需要唤醒后继节点
  • 锁获取后如果还有资源需要唤醒后继共享节T $ T

AQS提供的共享模式相关的方法

  1. //获取共享锁(线程阻塞直至获取成功)
  2. publL i C Eicfinalg C c a qvoidacquireShared(int)
  3. //获取共享锁,可被中断
  4. publicfinalacquireSharedInterruptibly(int)
  5. //获取共享锁,可被中断和指定超时时间
  6. publicfinaltryAcquireSharedNanos(int,long)
  7. //获取共享锁
  8. publicfinalbooleanreleaseS/ F j ( t ohared(int)

AQS子类需要实现的共享模式相关的方法

  1. //尝试获取共享锁
  2. protectedinttryAcquireShared(int)
  3. //尝试释放共享锁
  4. protectedbooleanx V S *tryReleaseShared(int)

获取共享锁的流程

1.调用子类tryAcquireShared尝试获取锁,获取成功,直接返回

2.通过l A K V s x自旋CAS将当前线程封装成节点加入队列I O # 8 L / X g V末尾

3.循环等待或尝试tryAcquireShared获取锁

  • 判断前置节点如果为head,则尝试获取锁
  • 根据队列中节点状态,决定是否需要阻塞当前线程
  • tryAcquireShar\ A – I X J Y Zed获取锁成功后,将当前节点设置为head
    • 如果资源有剩余或者原先的head节点状态为SIGNAL/PROPAGATE,则调用doReleaseShared
    • 如果当前head节点状态为SIGNALf 6 B 2,唤醒后继节点
    • 如果当前headC e x o V Q节点状态为ZERO,将head节点状态置为PROPAGATE
  • 如果当前线程中断或超时,则执行cz j fancelAcquire
    • 将当前节点状态置为CANCELED,并从队列删除
    • 如果前置节点为Head,则将后置节点唤醒

释放共8 G r e享锁的流程

等待队列中节点的状态变化

ReentrantLock示例

tryAcquire逻Z $ \

tryRelease逻辑

【责任编辑:庞桂玉 TEL:(010)68476606】

点赞 0