JUC之锁机制原理

JUC之锁机制原理

LockSupport原理

用于支撑JUC包用于操作线程阻塞和唤醒的工具类。可以看到底层都是通过Unsafe类来支撑。

public class LockSupport {    private LockSupport() {}        // 设置造成线程阻塞的对象信息,用于:debug、jstack    private static void setBlocker(Thread t, Object arg) {        UNSAFE.putObject(t, parkBlockerOffset, arg); // 去除CAS的volatile优化   }        // 唤醒当前阻塞的线程对象    public static void unpark(Thread thread) {        if (thread != null)            UNSAFE.unpark(thread);   }        // 带有提示对象blocker的阻塞线程操作    public static void park(Object blocker) {        Thread t = Thread.currentThread();        setBlocker(t, blocker);        UNSAFE.park(false, 0L);        setBlocker(t, null);   } ​    // 带超时时间(绝对时间)和提示对象的阻塞线程操作    public static void parkNanos(Object blocker, long nanos) {        if (nanos > 0) {            Thread t = Thread.currentThread();            setBlocker(t, blocker);            UNSAFE.park(false, nanos);            setBlocker(t, null);       }   }        // 带超时时间(相对时间)和提示对象的阻塞线程操作    public static void parkUntil(Object blocker, long deadline) {        Thread t = Thread.currentThread();        setBlocker(t, blocker);        UNSAFE.park(true, deadline);        setBlocker(t, null);   } ​    // 获取阻塞对象    public static Object getBlocker(Thread t) {        if (t == null)            throw new NullPointerException();        return UNSAFE.getObjectVolatile(t, parkBlockerOffset);   } ​        public static void park() {        UNSAFE.park(false, 0L);   } ​    public static void parkUntil(long deadline) {        UNSAFE.park(true, deadline);   } ​ ​    static final int nextSecondarySeed() {        int r;        Thread t = Thread.currentThread();        if ((r = UNSAFE.getInt(t, SECONDARY)) != 0) {            r ^= r << 13;   // xorshift            r ^= r >>> 17;            r ^= r << 5;       }        else if ((r = java.util.concurrent.ThreadLocalRandom.current().nextInt()) == 0)            r = 1; // avoid zero        UNSAFE.putInt(t, SECONDARY, r);        return r;   } ​    // 获取unsafe需要的地址值    private static final sun.misc.Unsafe UNSAFE;    private static final long parkBlockerOffset;    private static final long SEED;    private static final long PROBE;    private static final long SECONDARY;    static {        try {            UNSAFE = sun.misc.Unsafe.getUnsafe();            Class<?> tk = Thread.class;            parkBlockerOffset = UNSAFE.objectFieldOffset               (tk.getDeclaredField("parkBlocker"));            SEED = UNSAFE.objectFieldOffset               (tk.getDeclaredField("threadLocalRandomSeed"));            PROBE = UNSAFE.objectFieldOffset               (tk.getDeclaredField("threadLocalRandomProbe"));            SECONDARY = UNSAFE.objectFieldOffset               (tk.getDeclaredField("threadLocalRandomSecondarySeed"));       } catch (Exception ex) { throw new Error(ex); }   } ​ }

AQS原理

  1. Abstract : 因为它并不知道怎么上锁。模板方法设计模式即可,暴露出上锁逻辑
  2. Queue:线程阻塞队列
  3. Synchronizer:同步
  4. CAS+state 完成多线程抢锁逻辑
  5. Queue 完成抢不到锁的线程排队

acquire方法

获取写锁(互斥锁)的代码。tryAcquire方法由子类来完成,该方法也称之为模板方法,为何如此设计?这时因为AQS无法知道子类如何定义获取锁的操作。假如子类判断当前线程没有获取到锁,那么如何?排队去。addWaiter(Node.EXCLUSIVE)方法用于排队站位,acquireQueued方法用于站位后去沙发等待,此时不难猜出这里面肯定调用了tryAcquire(arg),可以想想为什么?因为在任何过程中,都有可能别的线程已经释放了锁。

public final void acquire(int arg) {    if (!tryAcquire(arg) && // 子类判定获取锁失败返回false,那么这里取反,表示为 true        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // addWaiter方法为获取失败后添加到阻塞队列。进入阻塞队列后?考虑是否睡眠?        selfInterrupt(); } ​ // 子类实现获取锁的逻辑,AQS并不知道你怎么用这个state来上锁 protected boolean tryAcquire(int arg) {    throw new UnsupportedOperationException(); }

addWaiter方法

将阻塞线程节点放入阻塞队列。采用 全路径 + 优化前置 的技巧,实现快速入队。

private Node addWaiter(Node mode) {    // 创建等待节点:当前线程对象 + 锁模式(互斥锁?共享锁)    Node node = new Node(Thread.currentThread(), mode);    // 快速加入到队列    Node pred = tail;    if (pred != null) {        node.prev = pred;        if (compareAndSetTail(pred, node)) {            // (面试重点,在后面进行操作时,最难理解,还有特重点在这里)特别注意:当上面的CAS成功后,有一瞬间 这里的pred.next并没有关联。会导致什么问题?有一瞬间,你通过head引用遍历的时候,是到达不了最后一个节点的,A(head)   ->     B(旧tail)   <-   C(tail)。如何获取最新的节点呢?通过tail指针往前遍历即可            pred.next = node;            return node;       }   }    enq(node);    return node; } ​ // 全路径入队方法 private Node enq(final Node node) {    for (;;) {        Node t = tail;        if (t == null) { // 懒加载时,head和tail 分别代表aqs的头尾节点            // 通过CAS实现原子初始化操作,直接用一个空节点实现初始化,此时head和tail指向同一个空节点            if (compareAndSetHead(new Node()))                tail = head;       } else {            node.prev = t;            if (compareAndSetTail(t, node)) {                t.next = node;                return t;           }       }   } }

acquireQueued方法

当加入阻塞队列后,调用该方法考虑是否将当前线程进行阻塞。在看该方法时,请考虑一个情况:假如在添加到阻塞队列后,当前锁状态是无锁时, 怎么办?那么一定时尝试获取锁。

// node节点为阻塞队列中的线程节点 final boolean acquireQueued(final Node node, int arg) {    boolean failed = true;    try {        boolean interrupted = false;        for (;;) {            // 当前节点的前驱节点            final Node p = node.predecessor();            // 假如p是头结点,有可能此时头结点释放了锁,那么尝试调用tryAcquire,让子类抢一下锁            if (p == head && tryAcquire(arg)) {                // 获取成功,更新头结点,释放引用                setHead(node);                p.next = null;                failed = false;                return interrupted;           }            // 如果前驱节点不是头结点或者抢锁失败,何如?那就先判断是否应该被阻塞,如果阻塞呢?调用parkAndCheckInterrupt方法来阻塞当前线程。            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())                interrupted = true;       }   } finally {        if (failed)            cancelAcquire(node);   } } ​ // 阻塞当前线程。响应中断的方式阻塞线程 private final boolean parkAndCheckInterrupt() {    LockSupport.park(this); // this是啥意思?用于指明当前线程阻塞在哪个对象上,后期可以通过jstack命令来看到,用于排除问题    return Thread.interrupted(); // 判断是否是通过中断的方式来唤醒的。1、unpark 2、interrupt }

shouldParkAfterFailedAcquire方法

该方法用于判断当前线程节点是否应该阻塞。无非就是找到一个可靠(活着有效)的节点,然后将当前线程节点作为其后继节点即可。

// pred是前驱节点,node是当前线程节点 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {    int ws = pred.waitStatus;    if (ws == Node.SIGNAL)        // 如果前驱节点是SIGNAL,那么此时可以安全的睡眠(因为SIGNAL状态,代表了上一个线程是活的,它可以通知你,所以当前线程节点可以安全的阻塞了)        return true;    if (ws > 0) {        // 前一个节点是CANCEL无效节点,那咋整?一直往前找,直到找到(有效节点)        do {            // 前驱节点的前驱节点,也即:踩着死亡节点往前跳            Node predPrev = pred.prev;            // 将前驱的前驱当成当前线程节点的前驱节点            pred = predPrev;            // 更新引用            node.prev = pred;       } while (pred.waitStatus > 0);        // 更新找到的前驱有效节点的next引用,指向当前节点        pred.next = node;   } else {        // 正常节点,那么CAS 将其变为SIGNAL        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);   }    return false; }

Node内部类

该内部类用于作用AQS的阻塞队列,封装线程节点。

static final class Node {    // 标志当前线程节点阻塞状态为:共享节点    static final Node SHARED = new Node();    // 标志当前线程节点的阻塞状态为:互斥节点    static final Node EXCLUSIVE = null; ​    // 唯一的大于0的节点,表示当前节点已经被取消,属于无效节点    static final int CANCELLED =  1;    // 表明当前节点是活动节点,可以唤醒后继的等待节点    static final int SIGNAL    = -1;    /** waitStatus value to indicate thread is waiting on condition */    static final int CONDITION = -2;    // 用于共享锁唤醒后继节点标志位用    static final int PROPAGATE = -3; ​    // 代表了当前节点的状态值,简称ws,取值为以上4个    volatile int waitStatus; ​    /**     * Link to predecessor node that current node/thread relies on     * for checking waitStatus. Assigned during enqueuing, and nulled     * out (for sake of GC) only upon dequeuing. Also, upon     * cancellation of a predecessor, we short-circuit while     * finding a non-cancelled one, which will always exist     * because the head node is never cancelled: A node becomes     * head only as a result of successful acquire. A     * cancelled thread never succeeds in acquiring, and a thread only     * cancels itself, not any other node.     */    volatile Node prev; ​    /**     * Link to the successor node that the current node/thread     * unparks upon release. Assigned during enqueuing, adjusted     * when bypassing cancelled predecessors, and nulled out (for     * sake of GC) when dequeued. The enq operation does not     * assign next field of a predecessor until after attachment,     * so seeing a null next field does not necessarily mean that     * node is at end of queue. However, if a next field appears     * to be null, we can scan prev's from the tail to     * double-check. The next field of cancelled nodes is set to     * point to the node itself instead of null, to make life     * easier for isOnSyncQueue.     */    volatile Node next; ​    /**     * The thread that enqueued this node. Initialized on     * construction and nulled out after use.     */    volatile Thread thread; ​    /**     * Link to next node waiting on condition, or the special     * value SHARED. Because condition queues are accessed only     * when holding in exclusive mode, we just need a simple     * linked queue to hold nodes while they are waiting on     * conditions. They are then transferred to the queue to     * re-acquire. And because conditions can only be exclusive,     * we save a field by using special value to indicate shared     * mode.     */    Node nextWaiter; ​    /**     * Returns true if node is waiting in shared mode.     */    final boolean isShared() {        return nextWaiter == SHARED;   } ​    /**     * Returns previous node, or throws NullPointerException if null.     * Use when predecessor cannot be null. The null check could     * be elided, but is present to help the VM.     *     * @return the predecessor of this node     */    final Node predecessor() throws NullPointerException {        Node p = prev;        if (p == null)            throw new NullPointerException();        else            return p;   } ​    Node() {    // Used to establish initial head or SHARED marker   } ​    Node(Thread thread, Node mode) {     // Used by addWaiter        this.nextWaiter = mode;        this.thread = thread;   } ​    Node(Thread thread, int waitStatus) { // Used by Condition        this.waitStatus = waitStatus;        this.thread = thread;   } }

release方法

释放写锁(互斥锁)的代码。

public final boolean release(int arg) {    // 子类判定释放锁成功    if (tryRelease(arg)) {        // 检查阻塞队列唤醒即可        Node h = head;        if (h != null && // AQS队列从来没被使用过            h.waitStatus != 0) // 那就是SIGNAL=-1            // 头结点为有效节点,且标注于需要唤醒后继节点,那么唤醒即可            unparkSuccessor(h);        return true;   }    return false; } ​ // 子类实现获取锁的逻辑,AQS并不知道你怎么用这个state来上锁 protected boolean tryRelease(int arg) {    throw new UnsupportedOperationException(); }

unparkSuccessor方法

该方法用于释放node节点后的有效后继节点。说白了,就是从node节点往后找到有效地节点,唤醒即可。

private void unparkSuccessor(Node node) {    // 开始唤醒后继节点,当前头节点,且ws=SINGAL,CAS将其变为0,代表了我当前已经响应了这一次的唤醒操作    int ws = node.waitStatus;    if (ws < 0)        compareAndSetWaitStatus(node, ws, 0);    // 取当前头结点的后继节点,作为唤醒节点,但是,请注意条件    Node s = node.next;    if (s == null ||  // 为什么这里有可能为空?因为我们首先更新的是tail引用,然后再是旧的tail.next所以有可能一瞬间为空        s.waitStatus > 0) { // 后继节点居然是无效节点???        s = null;        // tail引用一定是最新的引用,那么从后往前找到 第一个(node节点的后继的第一个) 有效节点        for (Node t = tail; t != null && t != node; t = t.prev)            if (t.waitStatus <= 0)                s = t;   }    // 找到了s节点,此时s就是要唤醒的节点    if (s != null)        LockSupport.unpark(s.thread); }

acquireShared方法

该方法用于获取共享锁。

public final void acquireShared(int arg) {    if (tryAcquireShared(arg) < 0) // 由子类判断是否获取锁成功,若不成功?该阻塞阻塞去        doAcquireShared(arg); } ​ protected int tryAcquireShared(int arg) {    throw new UnsupportedOperationException(); } ​ private void doAcquireShared(int arg) {    final Node node = addWaiter(Node.SHARED); // 和互斥锁一样,只不过呢?这里添加的是共享模式的锁节点    boolean failed = true;    try {        boolean interrupted = false;        for (;;) {            final Node p = node.predecessor();            // 刚好这个节点就在头结点后面,可能头结点很快就释放锁了,那么尝试获取锁            if (p == head) {                int r = tryAcquireShared(arg);                // 如果获取锁成功,尝试唤醒后面的共享节点(因为共享锁是可以多线程同时获取,参考下:读写锁实现的读锁)                // A(head 写)->B(读)->C(读)                if (r >= 0) {                    setHeadAndPropagate(node, r); // 将当前获取锁的共享节点更新头部,然后唤醒后继节点                    p.next = null; // help GC                    if (interrupted) // 如果在等待过程中发生了中断,由于中断而被唤醒,那么置位当前线程的中断标志位                        selfInterrupt();                    failed = false;                    return;               }           }            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())                interrupted = true;       }   } finally {        if (failed)            cancelAcquire(node);   } } ​ // 用于更新头结点,并且唤醒后继共享节点(面试重点,难点) private void setHeadAndPropagate(Node node, int propagate) {    Node h = head; // Record old head for check below    setHead(node);    if (propagate > 0 || // 信号量还有多余的,那么直接唤醒。eg:A:2 head->B-C        h == null || // 不可能发生        h.waitStatus < 0 || // SIGNAL 表明必须唤醒后继节点 那么直接唤醒       (h = head) == null || // 不可能发生。A B (获取锁) head -> D        h.waitStatus < 0) { // SIGNAL 表明必须唤醒后继节点 那么直接唤醒        Node s = node.next;        if (s == null || s.isShared())            doReleaseShared();   } } ​ // 释放共享锁节点。(难点,核心点,面试点)。eg:当前状态:A B (获取锁) head -> C -> D private void doReleaseShared() {    for (;;) {        Node h = head; // 保存head临时变量(两个状态:head没有被更新、head被更新)        if (h != null && h != tail) { // 链表中还存在等待节点            int ws = h.waitStatus;            // 理应要唤醒后面的节点,因为SIGNAL的语义就是必须唤醒后面的节点(正常状态)            if (ws == Node.SIGNAL) {                // CAS将该状态由SIGNAL改为0,表示唤醒成功,那么如果失败呢?此时表明多个线程同时释放了共享锁,都需要同时唤醒后继节点,此时state 状态为 2 表明了资源数可用为2,需要唤醒后面两个等待的共享节点                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                    continue;                // 直接唤醒后继节点即可                unparkSuccessor(h);           }            else if (ws == 0 &&                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // PROPAGATE = -3 若CAS成功,表明A释放了一个共享锁时唤醒了C,此时在C还没有更新头节点之前,A马上又释放了一个共享锁,且CAS成功将头结点的状态改为了PROPAGATE,此时完美保证了第一个状态唤醒的正确性                continue;       }        if (h == head) // 头结点至今没被改变过,且 h != null && h != tail 成功,那么直接退出循环,没有节点了            break;   } }

cancelAcquire方法

取消的这个在竞争队列上的节点有几种状态:

  1. 在队列尾部
  2. 在队列中间
  3. 在队列前面,头结点的后面

private void cancelAcquire(Node node) {    if (node == null)        return;    node.thread = null;    // 找到一个合适的前驱节点(未被cancel的节点)    Node pred = node.prev;    while (pred.waitStatus > 0) // 也即ws不大于 cancel的状态 1        node.prev = pred = pred.prev;    // 有效前驱节点的后继节点    Node predNext = pred.next;    // 标识节点是无效节点,此时其他线程将会看到该状态,同时越过该节点处理    node.waitStatus = Node.CANCELLED;    if (node == tail &&    //   当前删除的节点 可能(删除时可以并发插入)   是尾结点        compareAndSetTail(node, pred)) { // 若当前快照看到了是尾结点,那么CAS尝试删除(修改tail指针为pred)        compareAndSetNext(pred, predNext, null); // cas修改next指针   } else {        int ws;        if (pred != head && // 被取消的节点不在头结点的后面           ((ws = pred.waitStatus) == Node.SIGNAL || // pred需要唤醒后继节点             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&            pred.thread != null) { // 线程对象必须不为空,因为只有线程对象存在的情况下该节点才是最终有效节点            Node next = node.next;            // 当前被删除节点的后继节点必须时有效节点            if (next != null && next.waitStatus <= 0)                // A->B(CANCEL)->C   A->C                compareAndSetNext(pred, predNext, next); // 尝试修改pred前驱节点的next指向node的后继节点       } else {            // 唤醒后继节点即可            unparkSuccessor(node);       }        node.next = node; // help GC   } }

ConditionObject原理

核心变量定义与构造器原理

根据变量定义,公用AQS的Node节点。

public interface Condition {    void await() throws InterruptedException;    void awaitUninterruptibly();    boolean await(long time, TimeUnit unit) throws InterruptedException;    boolean awaitUntil(Date deadline) throws InterruptedException;    void signal();    void signalAll(); } ​ public class ConditionObject implements Condition {    /** 条件阻塞队列的头结点 */    private transient Node firstWaiter;    /** 条件阻塞队列的尾结点 */    private transient Node lastWaiter; }

await方法原理

public final void await() throws InterruptedException {    // 检测中断    if (Thread.interrupted())        throw new InterruptedException();    Node node = addConditionWaiter(); // 生成一个Node等待节点将其放入条件阻塞队列    int savedState = fullyRelease(node); // 调用release操作唤醒竞争队列的节点。注意:当前线程的state变量,也即控制可重入的变量需要保存,因为在后面唤醒后要恢复状态    int interruptMode = 0;    // 节点未放入到AQS的竞争队列之前一直阻塞    while (!isOnSyncQueue(node)) {        LockSupport.park(this);        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 响应中断            break;   }    // 开始竞争锁    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)        interruptMode = REINTERRUPT;    if (node.nextWaiter != null) // clean up if cancelled        unlinkCancelledWaiters();    if (interruptMode != 0)        reportInterruptAfterWait(interruptMode); }

addConditionWaiter方法原理

private Node addConditionWaiter() {    Node t = lastWaiter;    // 尾结点被取消了,那么将其从节点中干掉    if (t != null && t.waitStatus != Node.CONDITION) {        unlinkCancelledWaiters();        t = lastWaiter;   }    // 创建等待节点,模式为CONDITION,表明等待在条件变量上    Node node = new Node(Thread.currentThread(), Node.CONDITION);    // 标准链表操作    if (t == null)        firstWaiter = node;    else        t.nextWaiter = node;    lastWaiter = node;    return node; }

fullyRelease方法原理

final int fullyRelease(Node node) {    boolean failed = true;    try {        int savedState = getState(); // 先获取当前状态        if (release(savedState)) { // 释放所有状态,并且唤醒后继节点            failed = false;            return savedState;       } else {            throw new IllegalMonitorStateException();       }   } finally {        // 如果失败,那么将节点状态变为CANCELLED即可        if (failed)            node.waitStatus = Node.CANCELLED;   } }

isOnSyncQueue方法原理

final boolean isOnSyncQueue(Node node) {    // waitStatus等于CONDITION一定在条件阻塞队列上,prev为null,一定不在竞争队列上?因为竞争队列上的节点prev一定不为空,且prev节点的状态可能为SIGNAL。有同学说:头结点的prev为null,那么我问你:头结点在等待队列上吗?头结点代表了什么?代表了假节点,仅仅只是为了保证获取到后继的节点而设立的,如果硬要说有意义,那么一定是代表了当前获取到了锁的节点    if (node.waitStatus == Node.CONDITION || node.prev == null)        return false;    // 若节点的next节点不为空,那么一定在AQS 竞争队列上    if (node.next != null)        return true;    return findNodeFromTail(node); } ​ private boolean findNodeFromTail(Node node) {    Node t = tail;    // 从尾指针开始遍历往前遍历直到找到该节点为止    for (;;) {        if (t == node)            return true;        if (t == null)            return false;        t = t.prev;   } }

checkInterruptWhileWaiting原理

其实特殊处理的就是:线程被中断返回,而不是正常unpark返回。如果正常返回该方法返回0。

private int checkInterruptWhileWaiting(Node node) {    return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } ​ final boolean transferAfterCancelledWait(Node node) {    // CAS将节点状态修改为0,初始状态    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {        enq(node); // 添加到竞争队列        return true;   }    // 等待该节点进入竞争队列    while (!isOnSyncQueue(node))        Thread.yield();    return false; }

unlinkCancelledWaiters

private void unlinkCancelledWaiters() {    Node t = firstWaiter;    Node trail = null;    while (t != null) {        Node next = t.nextWaiter;        if (t.waitStatus != Node.CONDITION) {            t.nextWaiter = null;            if (trail == null)                firstWaiter = next;            else                trail.nextWaiter = next;            if (next == null)                lastWaiter = trail;       }        else            trail = t;        t = next;   } }

signal方法原理

唤醒线程将等待节点从等待节点上移动到竞争队列,尽量给它找一个很好地归宿,如果没有找到,那么唤醒它自己找吧。

public final void signal() {    // 当前线程一定要持有互斥锁    if (!isHeldExclusively())        throw new IllegalMonitorStateException();    Node first = firstWaiter;    if (first != null)        doSignal(first); } ​ private void doSignal(Node first) {    do {        if ( (firstWaiter = first.nextWaiter) == null)            lastWaiter = null;        first.nextWaiter = null;   } while (!transferForSignal(first) &&             (first = firstWaiter) != null); } ​ final boolean transferForSignal(Node node) {    // CAS避免中断唤醒竞争    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))        return false;    Node p = enq(node);    // 当添加到竞争队列之后,节点的状态有可能会改变,但是切记这里返回的前驱节点    int ws = p.waitStatus;    if (ws > 0 ||  // 前驱节点已经被取消,为无效节点(此操作为优化操作)        !compareAndSetWaitStatus(p, ws, Node.SIGNAL))        LockSupport.unpark(node.thread); // 唤醒等待条件变量的线程,让其调用acquireQueued方法将自己调整位置并再次阻塞或者获取锁    return true; }

总结

子类只需要实现自己的获取锁逻辑和释放锁逻辑即可,至于排队阻塞等待、唤醒机制均由AQS来完成。