Loading...
墨滴

air丶

2021/07/08  阅读:109  主题:默认主题

Lock锁原理分析

思考

  • ReentrantLock与Synchronized的区别?面对增加属性如:公平锁是如何实现的呢?
  • 读写锁是共享-互斥锁,读锁是如何多线程共享,写锁是如何做到互斥?
  • wait/nofity会导致死锁吗,使用它们先唤醒再挂起而导致死锁,park为何不会呢?

AQS

  • 如果需要了解lock锁,绕不开AQS(abstract Queue Synchronized 抽象队列同步器)其定义了一套多线程访问共享资源的同步器框架
  • AQS维护了维护了一个 volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)
Node
  • 队列中节点由内部类Node位组成的队列“CLH”(三个人名:Craig, Landin 和 Hagersten),CLH队列本质上就是一个双向链表Node就是该链表的节点,数据如下
  1. volatile int waitStatus :当前节点Node的等待状态标志位,标记该节点当前情况下处于何种状态

    waitStatus状态 含义
    默认值= 0 默认初始值
    CANCELLED = 1 节点从同步队列中取消,该状态不会在改变了
    SIGNAL = -1 标记后继节点的线程处于等待状态,当前节点释放同步状态则会通知后继节点,
    使得后继节点线程能够运行
    CONDITION = -2 与Condition相关,该标识的结点处于等待队列中,结点的线程等待在Condition上,
    当其他线程调用了Condition的signal()方法后,
    CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁
    PROPAGATE = -3 在共享模式中,该状态标识结点的线程处于可运行状态
  • AQS源码中大量运用waitStatus判断节点状态:

    状态 判断结果 说明
    waitStatus=0 初始化状态 该节点尚未被初始化完成
    waitStatus>0 取消状态 说明该线程中断或者等待超时,需要移除该线程
    waitStatus<0 有效状态 该线程处于可以被唤醒的状态
  1. nextWaiter:保存条件队列

    • AQS中阻塞队列采用的是用双向链表保存,用prve和next相互链接。而AQS中条件队列是使用单向列表保存的,用 nextWaiter来连接。阻塞队列和条件队列并不是使用的相同的数据结构。

      // 共享模式
      static final Node SHARED = new Node();
      // 独占模式
      static final Node EXCLUSIVE = null;
      // 其他模式
      // 其他非空值:条件等待节点(调用Condition的await方法的时候)
    • nextWaiter实际上标记的就是在该节点唤醒后依据该节点的状态判断是否依据条件唤醒下一个节点

      nextWaiter状态标志 说明
      SHARED(共享模式) 直接唤醒下一个节点
      EXCLUSIVE(独占模式) 等待当前线程执行完成后再唤醒
      其他非空值 依据条件决定怎么唤醒下一个线程。类似semaphore中控制几个线程通过

ReentrantLock

  • RentrantLock比较熟悉,这里分成 加锁,入队,阻塞,唤醒,出队,解锁六个步骤来分析
加锁
  1. 使用公平锁及非公平锁
public ReentrantLock(boolean fair) {
    this.sync = (ReentrantLock.Sync)(fair ? new ReentrantLock.FairSync() : new ReentrantLock.NonfairSync());
}
  1. 使用lock 调用 AQS中的acquire(1)
public final void acquire(int arg) {
    //AQS实现类
    if (!this.tryAcquire(arg) && this.acquireQueued(this.addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), arg)) {
        selfInterrupt();
    }

}

//tryAcquire(1) 中分成公平锁和非公平锁实现
final boolean nonfairTryAcquire(int acquires) {

    Thread current = Thread.currentThread();
    //获取当前state状态
    int c = this.getState();
    if (c == 0) { //当前状态为0表示可加锁用
        //如果state  CAS操作成功,将state替换为1并标记当前线程到AQS队列中正在运行的
        if (this.compareAndSetState(0, acquires)) {  
        /**
        * 公平锁与非公平锁唯一的区别即为先判断 !this.hasQueuedPredecessors() 阻塞队列中是否存在数据
        */
        //if (!this.hasQueuedPredecessors() && this.compareAndSetState(0, acquires)) {
            this.setExclusiveOwnerThread(current);
            return true;
        }
    } else if (current == this.getExclusiveOwnerThread()) {
    //可重入锁设置
        int nextc = c + acquires;
        if (nextc < 0) {
            throw new Error("Maximum lock count exceeded");
        }

        this.setState(nextc); //state++ ; 
        return true;
    }

    return false;
}

//公平锁判断阻塞队列是否存在元素
 public final boolean hasQueuedPredecessors() {
        AbstractQueuedSynchronizer.Node h;
        if ((h = this.head) != null) { //队头表示当前运行线程
            AbstractQueuedSynchronizer.Node s;
            if ((s = h.next) == null || s.waitStatus > 0) {  //队头的下一个节点失效,对其致空
                s = null;
        //从队尾循环获取对头后面第一个waitStatus等待节点
                for(AbstractQueuedSynchronizer.Node p = this.tail; p != h && p != null; p = p.prev) {
                    if (p.waitStatus <= 0) {
                        s = p;
                    }
                }
            }
      //表示有正常线程在队列中等待数据,且不是重入的,这时tryAcquire()运行完毕=> 失败,运行addWaiter()添加节点到阻塞队列
            if (s != null && s.thread != Thread.currentThread()) {
                return true;
            }
        }

        return false;
    }
  • 通过以上源码分析可知:

    1. 当Lock.lock()时会调用AQS的acquire(1)根据是否公平/非公平调用不同的实现tryAcquire
      • 如果公平锁会首先判断AQS阻塞队列中是否有元素存在,如果有等待线程则直接加入队尾
    2. 如果当前state状态为0,代表锁可用,通过CAS替换为1,成功线程获取锁,设置AQS运行线程为当前线程
    3. 若状态不为0,查看是否线程一致,若一致表示可重入锁,只需增加state++即可
    4. 1~3表示lock加锁成功,正常进入运行,否则尝试锁失败,运行acquireQueued()入队操作
    方法调用图
    方法调用图
    • 调用关系图: 有关于status状态修改,生成节点,入队,出队等操作均有AQS完成
入队
  • 以上加锁失败后在AQS中入队操作addWaiter()

    private AbstractQueuedSynchronizer.Node addWaiter(AbstractQueuedSynchronizer.Node mode) {
            AbstractQueuedSynchronizer.Node node = new AbstractQueuedSynchronizer.Node(mode);

            AbstractQueuedSynchronizer.Node oldTail;
            do {
                while(true) { //首次进入head,tail = null
                    oldTail = this.tail;
                    if (oldTail != null) {
                        node.setPrevRelaxed(oldTail);
                        break;
                    }
            //jdk9之前是enq(node),换汤不换药啦
                    this.initializeSyncQueue();
                }
            } while(!this.compareAndSetTail(oldTail, node));

            oldTail.next = node;
            return node;
        }
        
     //生成两个节点,head节点位Node() ,head.next 才是真正的第一个入队节点数据
     private final void initializeSyncQueue() {
        AbstractQueuedSynchronizer.Node h;
           if (HEAD.compareAndSet(this, (Void)null, h = new AbstractQueuedSynchronizer.Node())) {
              this.tail = h;
          }
    }

    1. 首次进入head,tail进入initializeSyncQueue() 初始化head为一个Node() ==> thread = null
    2. while循环第2次 将node节点添加到tail后面即head.next = node 并CAS交换tail指向node

    阻塞

    • 在AQS的acquireQueued()中实现
    final boolean acquireQueued(AbstractQueuedSynchronizer.Node node, int arg) {
            boolean interrupted = false;

            try {
                while(true) {
                    AbstractQueuedSynchronizer.Node p = node.predecessor(); //当前节点的前驱节点
                    if (p == this.head && this.tryAcquire(arg)) { //如果前驱节点为head,则尝试加锁
                        this.setHead(node); //加锁成功,设置head为当前节点,同时设置node.thread = null, node.pre =null
                        p.next = null; //方便GC回收
                        return interrupted;
                    }
            //获取锁失败
                    if (shouldParkAfterFailedAcquire(p, node)) {
                        interrupted |= this.parkAndCheckInterrupt();
                    }
                }
            } catch (Throwable var5) {
                this.cancelAcquire(node);
                if (interrupted) {
                    selfInterrupt();
                }

                throw var5;
            }
        }
        
        //移动head到当前Node
       private void setHead(AbstractQueuedSynchronizer.Node node) {
            this.head = node;
            node.thread = null;
            node.prev = null;
        }
        
     // 该方法主要靠前驱节点判断当前线程是否应该被阻塞
    private static boolean shouldParkAfterFailedAcquire(AbstractQueuedSynchronizer.Node pred, AbstractQueuedSynchronizer.Node node) {
            int ws = pred.waitStatus; //前驱节点状态
            if (ws == -1) { //SIGNAL
                return true;
            } else {
                if (ws > 0) { //循环判断当前节点的前驱节点,移除cancel节点
                    do {
                      /**
                      * 循环查找取消节点的前任节点,
                        * 直到找到不是取消状态的节点,然后剔除是取消状态的节点,
                        * 关联前任节点的下一个节点为当前节点
                      */
                        node.prev = pred = pred.prev;
                    } while(pred.waitStatus > 0);

                    pred.next = node; //将当前节点添加到前驱的后置节点
                } else {
                  /*
                    * CAS设置前任节点等待状态为SIGNAL,
                    * 设置成功表示当前节点应该被阻塞,下一次循环调用就会
                    *  return  true
                   */
                    pred.compareAndSetWaitStatus(ws, -1); 
                }

                return false;
            }
        }
        
     /**
      * 把当前线程挂起,从而阻塞住线程的调用栈,同时返回当前线程的中断状态。
     * 其内部则是调用LockSupport工具类的park()方法来阻塞该方法
     */
     private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
  1. 判断当前Node的前驱节点是否为head,若是则尝试获取锁,成功则将head 指向Node,并断开于前节点联系,待GC回收前节点
  2. 如果失败,则判断前置节点是否为SIGNAL状态,如果是,可以放心的park了,因为会被叫醒
  3. 否则,循环查找前置节点是否为CANCEL状态,如果是,则移除,直到找到一个为SIGNAL或者默认0,CAS替换为 -1,待下次while循环返回true进入阻塞
  4. 阻塞函数调用LockSupport.park(this)线程被挂起了 , Linux下的park查询是否有许可,unpark函数为线程提供“许可(permit)”,线程调用park函数则等待“许可”,这个“许可”是不能叠加的,“许可”是一次性的。
解锁 And 唤醒
  • 当调用lock.unlock解锁后,如果status为0,则会调用unpark唤醒head后面第一个可用节点:

    public final boolean release(int arg) {
            if (this.tryRelease(arg)) {
                AbstractQueuedSynchronizer.Node h = this.head;
                if (h != null && h.waitStatus != 0) { //全部解锁成功进入,head不为null,且waitStatus 不为默认值
                    this.unparkSuccessor(h);
                }

                return true;
            } else {
                return false;
            }
        }
        
     @ReservedStackAccess
     protected final boolean tryRelease(int releases) {
          int c = this.getState() - releases;
          if (Thread.currentThread() != this.getExclusiveOwnerThread()) {
               throw new IllegalMonitorStateException();
          } else {
               boolean free = false;
               if (c == 0) { //可重入锁全部解锁成功
                  free = true;
                  this.setExclusiveOwnerThread((Thread)null); //设置AQS中Thread = null
               }

               this.setState(c);
               return free;
            }
      }
      
    //
    private void unparkSuccessor(AbstractQueuedSynchronizer.Node node) {
            int ws = node.waitStatus;
            if (ws < 0) {  //ws状态可用,恢复默认0
                node.compareAndSetWaitStatus(ws, 0);
            }

            AbstractQueuedSynchronizer.Node s = node.next; //获取head下一个节点
            if (s == null || s.waitStatus > 0) { //从tail尾部遍历获取head下一个可用的节点s
                s = null;

                for(AbstractQueuedSynchronizer.Node p = this.tail; p != node && p != null; p = p.prev) {
                    if (p.waitStatus <= 0) {
                        s = p;
                    }
                }
            }

            if (s != null) { //存在为signal状态的waitStatus唤醒它
                LockSupport.unpark(s.thread);
            }

    }
  • 注意,此时并没有出队操作哦!那出队是什么时候呢?

    • 就是被唤醒的线程获取锁的操作中出队的;
出队
  • 在acquireQueued的while循环中,即setHead移动head操作,回收原有的oldHead即出队

ReentrantReadWriteLock

  • 读写锁:写锁和读锁是互斥的(这里的互斥是指线程间的互斥,当前线程可以获取到写锁又获取到读锁,但是获取到了读锁不能继续获取写锁),这是因为读写锁要保持写操作的可见性,如果允许读锁在被获取的情况下对写锁的获取,那么正在运行的其他读线程无法感知到当前写线程的操作。因此,只有等待其他线程都释放了读锁,写锁才能被当前线程获取,而一旦写锁被获取,其他读写线程的后续访问都会被阻塞。

  • 注意读写锁中的几个常量值及位运算:

    • 分析可知:读写锁同样适用AQS的int型status标记,高16位表示读锁次数,低16位表示写锁次数,那么为何使用一个值而不是分开使用两个呢?
    static final int SHARED_SHIFT = 16; //移位个数,读锁或者写锁通过移位确定
    static final int SHARED_UNIT = 65536;  //0X10000
    static final int MAX_COUNT = 65535; //0Xffff
    static final int EXCLUSIVE_MASK = 65535; //0Xffff

    //获取分享个数:即读锁次数
    static int sharedCount(int c) {
        return c >>> 16;
    }

    //独占锁获取锁次数: 用于读锁操作 c为AQS中的status
    static int exclusiveCount(int c) {
          return c & '\uffff'
    }
读写锁加锁
  • 以下为读写锁加锁示例图,注意配合源码查看分析,这里篇幅原因就不贴源码了!

读写锁解锁
  • 读锁,写锁的解锁操作示意图如下

  • 知道你可能对流程图一知半解,举个稍微复杂的例子来说明思路:对于整个过程图,当写锁获取后的排队情况为写,读,读,写,读流程图如下;

问题:

  • park函数为何不会导致死锁呢?
//park调用链 LockSupport.park() ->unsafe.park() -> posixThread.park()
void Parker::park(bool isAbsolute, jlong time) {
  if (_counter > 0) {
       //已经有许可了,用掉当前许可
      _counter = 0 ;
     //使用内存屏障,确保 _counter赋值为0(写入操作)能够被内存屏障之后的读操作获取内存屏障事前的结果,也就是能够正确的读到0
      OrderAccess::fence();
     //立即返回
      return ;
  }

//这里表示线程互斥变量锁成功了
  int status ;
  if (_counter > 0)  {
    // 有许可了,返回
    _counter = 0;
    //对互斥变量解锁
    status = pthread_mutex_unlock(_mutex);
    assert (status == 0, "invariant") ;
    OrderAccess::fence();
    return;
  }
  ...省略代码...
  
  //没有许可,调用wait等待
  if (time == 0) {
    //把调用线程放到等待条件的线程列表上,然后对互斥变量解锁,(这两是原子操作),这个时候线程进入等待,当它返回时,互斥变量再次被锁住。
  //成功返回0,否则返回错误编号
    status = pthread_cond_wait (_cond, _mutex) ;
  } else {
  //同pthread_cond_wait,只是多了一个超时,如果超时还没有条件出现,那么重新获取胡吃两然后返回错误码 ETIMEDOUT
    status = os::Linux::safe_cond_timedwait (_cond, _mutex, &absTime) ;
    }
  }

#ifdef ASSERT
  pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);
#endif
 //等待结束后,许可被消耗,改为0  _counter = 0 ;
//释放互斥量的锁
  status = pthread_mutex_unlock(_mutex) ;
  assert_status(status == 0, status, "invariant") ;
  // If externally suspended while waiting, re-suspend 
    if (jt->handle_special_suspend_equivalent_condition()) {
    jt->java_suspend_self();
  }
//加入内存屏障指令
  OrderAccess::fence();
}

void Parker::unpark() {
  int s, status ;
 //给互斥量加锁,如果互斥量已经上锁,则阻塞到互斥量被解锁
//park进入wait时,_mutex会被释放
  status = pthread_mutex_lock(_mutex);
  assert (status == 0, "invariant") ; 
  //存储旧的_counter
  s = _counter; 
//许可改为1,每次调用都设置成发放许可
  _counter = 1;
  if (s < 1) {
     //之前没有许可
      //默认执行 ,释放信号,表明条件已经满足,将唤醒等待的线程
      status = pthread_cond_signal (_cond) ;
      //释放锁
      status = pthread_mutex_unlock(_mutex);
  
  } else {
   //一直有许可,释放掉自己加的锁,有许可park本身就返回了
    pthread_mutex_unlock(_mutex);
  }
}
  • LockSupport类为线程阻塞唤醒提供了基础,同时,在竞争条件问题上具有wait和notify无可比拟的优势。使用wait和notify组合时,某一线程在被另一线程notify之前必须要保证此线程已经执行到wait等待点,错过notify则可能永远都在等待,另外notify也不能保证唤醒指定的某线程。反观LockSupport,由于park与unpark引入了许可机制,许可逻辑为:
  • park将许可在等于0的时候阻塞,等于1的时候返回并将许可减为0。
  • unpark尝试唤醒线程,许可设置成 1。
  • 根据这两个逻辑,对于同一条线程,park与unpark先后操作的顺序并不影响程序正确地执行。假如先执行unpark操作,许可则为1,之后再执行park操作,此时因为许可等于1直接返回往下执行,并不执行阻塞操作。 最后,LockSupport的park与unpark组合真正解耦了线程之间的同步,不再需要另外的对象变量存储状态,并且也不需要考虑同步锁,wait与notify要保证必须有锁才能执行,而且执行notify操作释放锁后还要将当前线程扔进该对象锁的等待队列,LockSupport则完全不用考虑对象、锁、等待队列等问题。

air丶

2021/07/08  阅读:109  主题:默认主题

作者介绍

air丶