📓 Archive

AQS

FGJ: Create:2022/10/14 Update: [2024-11-21]

AQS 内部结构 #

  • 内部类Node节点代码

    代码示例
    static final class Node {
            /** Marker to indicate a node is waiting in shared mode */
            static final Node SHARED = new Node();
            /** Marker to indicate a node is waiting in exclusive mode */
            static final Node EXCLUSIVE = null;
    
            /** waitStatus value to indicate thread has cancelled */
            static final int CANCELLED =  1;
            /** waitStatus value to indicate successor's thread needs unparking */
            static final int SIGNAL    = -1;
            /** waitStatus value to indicate thread is waiting on condition */
            static final int CONDITION = -2;
            /**
             * waitStatus value to indicate the next acquireShared should
             * unconditionally propagate
             */
            static final int PROPAGATE = -3;
    
            /**
             * Status field, taking on only the values:
             *   SIGNAL:     The successor of this node is (or will soon be)
             *               blocked (via park), so the current node must
             *               unpark its successor when it releases or
             *               cancels. To avoid races, acquire methods must
             *               first indicate they need a signal,
             *               then retry the atomic acquire, and then,
             *               on failure, block.
             *   CANCELLED:  This node is cancelled due to timeout or interrupt.
             *               Nodes never leave this state. In particular,
             *               a thread with cancelled node never again blocks.
             *   CONDITION:  This node is currently on a condition queue.
             *               It will not be used as a sync queue node
             *               until transferred, at which time the status
             *               will be set to 0. (Use of this value here has
             *               nothing to do with the other uses of the
             *               field, but simplifies mechanics.)
             *   PROPAGATE:  A releaseShared should be propagated to other
             *               nodes. This is set (for head node only) in
             *               doReleaseShared to ensure propagation
             *               continues, even if other operations have
             *               since intervened.
             *   0:          None of the above
             *
             * The values are arranged numerically to simplify use.
             * Non-negative values mean that a node doesn't need to
             * signal. So, most code doesn't need to check for particular
             * values, just for sign.
             *
             * The field is initialized to 0 for normal sync nodes, and
             * CONDITION for condition nodes.  It is modified using CAS
             * (or when possible, unconditional volatile writes).
             */
            volatile int waitStatus;
    
            /**
             * Link to predecessor node that current node/thread relies on
             * for checking waitStatus. Assigned during enqueuing, and nulled
             * out (for sake of GC) only upon dequeuing.  Also, upon
             * cancellation of a predecessor, we short-circuit while
             * finding a non-cancelled one, which will always exist
             * because the head node is never cancelled: A node becomes
             * head only as a result of successful acquire. A
             * cancelled thread never succeeds in acquiring, and a thread only
             * cancels itself, not any other node.
             */
            volatile Node prev;
    
            /**
             * Link to the successor node that the current node/thread
             * unparks upon release. Assigned during enqueuing, adjusted
             * when bypassing cancelled predecessors, and nulled out (for
             * sake of GC) when dequeued.  The enq operation does not
             * assign next field of a predecessor until after attachment,
             * so seeing a null next field does not necessarily mean that
             * node is at end of queue. However, if a next field appears
             * to be null, we can scan prev's from the tail to
             * double-check.  The next field of cancelled nodes is set to
             * point to the node itself instead of null, to make life
             * easier for isOnSyncQueue.
             */
            volatile Node next;
    
            /**
             * The thread that enqueued this node.  Initialized on
             * construction and nulled out after use.
             */
            volatile Thread thread;
    
            /**
             * Link to next node waiting on condition, or the special
             * value SHARED.  Because condition queues are accessed only
             * when holding in exclusive mode, we just need a simple
             * linked queue to hold nodes while they are waiting on
             * conditions. They are then transferred to the queue to
             * re-acquire. And because conditions can only be exclusive,
             * we save a field by using special value to indicate shared
             * mode.
             */
            Node nextWaiter;
    
            /**
             * Returns true if node is waiting in shared mode.
             */
            final boolean isShared() {
                return nextWaiter == SHARED;
            }
    
            /**
             * Returns previous node, or throws NullPointerException if null.
             * Use when predecessor cannot be null.  The null check could
             * be elided, but is present to help the VM.
             *
             * @return the predecessor of this node
             */
            final Node predecessor() throws NullPointerException {
                Node p = prev;
                if (p == null)
                    throw new NullPointerException();
                else
                    return p;
            }
    
            Node() {    // Used to establish initial head or SHARED marker
            }
    
            Node(Thread thread, Node mode) {     // Used by addWaiter
                this.nextWaiter = mode;
                this.thread = thread;
            }
    
            Node(Thread thread, int waitStatus) { // Used by Condition
                this.waitStatus = waitStatus;
                this.thread = thread;
            }
    }
    
  • 整体结构图

ReentrantLock #

  • 创建ReentrantLock示例

    
    // 新建 ReentrantLock
    public ReentrantLock() {
        sync = new NonfairSync();
    }
    
    // 调用 lock()方法,内部实质是 调用 非公平锁的lock方法。
    public void lock() {
        sync.lock();
    }
    
  • 非公平锁与公平锁 获取锁图例

  • 代码片段

    final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
    
    /**
     * Convenience method to interrupt current thread.
     */
    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }
    
    protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    
  • 非公平锁与公平锁 释放锁图例

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
    
        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    
    protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
    

Reference #


comments powered by Disqus