博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
AbstractQueuedSynchronizer源码解析
阅读量:3576 次
发布时间:2019-05-20

本文共 15552 字,大约阅读时间需要 51 分钟。

  • AQS是什么呢??

我认为AQS是一个同步的辅助工具,当出现对公共资源的竞争时,AQS维持了一个双向队列,用于存储获取资源的线程对象。

AQS封装了许多基础的方法,如等待超时,线程中断处理等。下面的一张图是对AQS主要功能的一个较好诠释。

state代表的是公共资源,位于head的线程表示获得资源所有权,队列后面的线程处于阻塞状态。

AQS 支持两种模式下来的资源获取,独占模式及共享模式,它是一个抽象类,只要我们继承它并实现一些基础方法如

tryAcquire(int args)等try开头的模板方法,就能实现一个具有等待功能的资源同步器。我们熟悉的相关Lock的实现都是依靠于AQS来实现的。

AQS 的主要成员及方法。

先来看看AQS对于节点的封装Node

static final class Node {        /** 标识该节点是否处于共享状态的等待状态 */        static final Node SHARED = new Node();        /** 标识该节点是否处于独占模式的等待状态 */        static final Node EXCLUSIVE = null;        /** 取消状态 几个状态中唯一一个大于1的 */        static final int CANCELLED =  1;        /** 等待被唤醒 */        static final int SIGNAL    = -1;        /** 处于条件等待状态中 */        static final int CONDITION = -2;        /**         * 用于共享模式下的传播,至今不太明白有什么作用         *          */        static final int PROPAGATE = -3;        /**         * 节点状态。必须用cas来修改期状态         *         */        volatile int waitStatus;        /**          * 前一个节点         */        volatile Node prev;        /**          * 下一个节点         */        volatile Node next;        /**         * 当前节点的持有线程         */        volatile Thread thread;        /**          *          */        Node nextWaiter;        /**         * 判断是否共享模式         */        final boolean isShared() {            return nextWaiter == SHARED;        }        /**         * 获取前一个节点         */        final Node predecessor() throws NullPointerException {            Node p = prev;            if (p == null)                throw new NullPointerException();            else                return p;        }        Node() {            }        Node(Thread thread, Node mode) {                this.nextWaiter = mode;            this.thread = thread;        }        Node(Thread thread, int waitStatus) {             this.waitStatus = waitStatus;            this.thread = thread;        }    }

接下来看看一个接口,该接口类似于Object的wait(),notify(),notifyAll()主要用于条件等待。

public interface Condition {    /**      * 同wait()方法,释放锁,阻塞。     *     */    void await() throws InterruptedException;    /**     * 释放锁直到线程中断     *     */    void awaitUninterruptibly();    /**     *     *  阻塞指定时间后醒来,然后重新参与锁的竞争(重新进入等待队列,有可能还会重新阻塞)     */    long awaitNanos(long nanosTimeout) throws InterruptedException;    /**     * 跟上一个方法没什么区别     */    boolean await(long time, TimeUnit unit) throws InterruptedException;    /**     *    同一类     */    boolean awaitUntil(Date deadline) throws InterruptedException;    /**     * 唤醒一个等待线程     */    void signal();    /**     * 唤醒所有等待的线程,然后回重新参与竞争     */    void signalAll();}

这里一点挺值得关注的,wait() 或者 await() 指定时间的时候,到了指定时间之后,其实并不会立即中断,只是阻塞停止而已,

然后又继续参与锁的竞争。然后参与竞争的过程中有可能是进入了等待队列,又会重新阻塞。

AQS中有一个内部类,ConditionObject,他实现了Condition 接口。

public class ConditionObject implements Condition, java.io.Serializable {        private static final long serialVersionUID = 1173984872572414699L;        private transient Node firstWaiter;        private transient Node lastWaiter;                public ConditionObject() { }        // Internal methods        /**         * 往条件等待队列中添加一个等待节点         */        private Node addConditionWaiter() {            Node t = lastWaiter;            // If lastWaiter is cancelled, clean out.            if (t != null && t.waitStatus != Node.CONDITION) {                unlinkCancelledWaiters();                t = lastWaiter;            }            Node node = new Node(Thread.currentThread(), Node.CONDITION);            if (t == null)                firstWaiter = node;            else                t.nextWaiter = node;            lastWaiter = node;            return node;        }        /**         * 唤醒首节点         */        private void doSignal(Node first) {            do {                if ( (firstWaiter = first.nextWaiter) == null)                    lastWaiter = null;                first.nextWaiter = null;            } while (!transferForSignal(first) &&                     (first = firstWaiter) != null);        }        /**         * 唤醒所有等待节点         *          */        private void doSignalAll(Node first) {            lastWaiter = firstWaiter = null;            do {                Node next = first.nextWaiter;                first.nextWaiter = null;                transferForSignal(first);                first = next;            } while (first != null);        }        /**          * 清除等待状态的节点         */        private void unlinkCancelledWaiters() {            Node t = firstWaiter;            Node trail = null;            while (t != null) {                Node next = t.nextWaiter;                if (t.waitStatus != Node.CONDITION) {                    t.nextWaiter = null;                    if (trail == null)                        firstWaiter = next;                    else                        trail.nextWaiter = next;                    if (next == null)                        lastWaiter = trail;                }                else                    trail = t;                t = next;            }        }        /**         * 实现Condition的方法         */        public final void signal() {            // 条件等待仅在独占模式下课用            if (!isHeldExclusively())                throw new IllegalMonitorStateException();            Node first = firstWaiter;            if (first != null)                doSignal(first);        }        /**          * 实现Condition 的方法         */        public final void signalAll() {            if (!isHeldExclusively())                throw new IllegalMonitorStateException();            Node first = firstWaiter;            if (first != null)                doSignalAll(first);        }        /**         * 实现Condition 的方法         */        public final void awaitUninterruptibly() {            // 添加等待节点            Node node = addConditionWaiter();            // 释放AQS 中的节点            long savedState = fullyRelease(node);            boolean interrupted = false;            while (!isOnSyncQueue(node)) {                // 阻塞                LockSupport.park(this);                // 响应中断                if (Thread.interrupted())                    interrupted = true;            }              // 被唤醒了 重新进去AQS 进行锁竞争 并对中断进行响应            if (acquireQueued(node, savedState) || interrupted)                selfInterrupt();        }        /*         *          */        private int checkInterruptWhileWaiting(Node node) {            return Thread.interrupted() ?                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :                0;        }        /**         * Throws InterruptedException, reinterrupts current thread, or         * does nothing, depending on mode.         */        private void reportInterruptAfterWait(int interruptMode)            throws InterruptedException {            if (interruptMode == THROW_IE)                throw new InterruptedException();            else if (interruptMode == REINTERRUPT)                selfInterrupt();        }        /**          *         */        public final void await() throws InterruptedException {            if (Thread.interrupted())                throw new InterruptedException();            Node node = addConditionWaiter();            long savedState = fullyRelease(node);            int interruptMode = 0;            while (!isOnSyncQueue(node)) {                LockSupport.park(this);                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)                    break;            }            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)                interruptMode = REINTERRUPT;            if (node.nextWaiter != null) // clean up if cancelled                unlinkCancelledWaiters();            if (interruptMode != 0)                reportInterruptAfterWait(interruptMode);        }        /**           *         */        public final long awaitNanos(long nanosTimeout)                throws InterruptedException {            if (Thread.interrupted())                throw new InterruptedException();            Node node = addConditionWaiter();            long savedState = fullyRelease(node);            final long deadline = System.nanoTime() + nanosTimeout;            int interruptMode = 0;            while (!isOnSyncQueue(node)) {                if (nanosTimeout <= 0L) {                    transferAfterCancelledWait(node);                    break;                }                if (nanosTimeout >= spinForTimeoutThreshold)                    LockSupport.parkNanos(this, nanosTimeout);                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)                    break;                nanosTimeout = deadline - System.nanoTime();            }            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)                interruptMode = REINTERRUPT;            if (node.nextWaiter != null)                unlinkCancelledWaiters();            if (interruptMode != 0)                reportInterruptAfterWait(interruptMode);            return deadline - System.nanoTime();        }        /**          *         */        public final boolean awaitUntil(Date deadline)                throws InterruptedException {            long abstime = deadline.getTime();            if (Thread.interrupted())                throw new InterruptedException();            Node node = addConditionWaiter();            long savedState = fullyRelease(node);            boolean timedout = false;            int interruptMode = 0;            while (!isOnSyncQueue(node)) {                if (System.currentTimeMillis() > abstime) {                    timedout = transferAfterCancelledWait(node);                    break;                }                LockSupport.parkUntil(this, abstime);                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)                    break;            }            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)                interruptMode = REINTERRUPT;            if (node.nextWaiter != null)                unlinkCancelledWaiters();            if (interruptMode != 0)                reportInterruptAfterWait(interruptMode);            return !timedout;        }        /**          * 等待指定时间         */        public final boolean await(long time, TimeUnit unit)                throws InterruptedException {            long nanosTimeout = unit.toNanos(time);            if (Thread.interrupted())                throw new InterruptedException();            Node node = addConditionWaiter();            long savedState = fullyRelease(node);            final long deadline = System.nanoTime() + nanosTimeout;            boolean timedout = false;            int interruptMode = 0;            //如果再同步队列你说明被唤醒了            while (!isOnSyncQueue(node)) {                if (nanosTimeout <= 0L) {                    timedout = transferAfterCancelledWait(node);                    break;                }                if (nanosTimeout >= spinForTimeoutThreshold)                    LockSupport.parkNanos(this, nanosTimeout);                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)                    break;                nanosTimeout = deadline - System.nanoTime();            }            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)                interruptMode = REINTERRUPT;            if (node.nextWaiter != null)                unlinkCancelledWaiters();            if (interruptMode != 0)                reportInterruptAfterWait(interruptMode);            return !timedout;        }        //  support for instrumentation        /**         * Returns true if this condition was created by the given         * synchronization object.         *         * @return {@code true} if owned         */        final boolean isOwnedBy(AbstractQueuedLongSynchronizer sync) {            return sync == AbstractQueuedLongSynchronizer.this;        }        /**         *  判断是否有等待节点         */        protected final boolean hasWaiters() {            if (!isHeldExclusively())                throw new IllegalMonitorStateException();            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {                if (w.waitStatus == Node.CONDITION)                    return true;            }            return false;        }        /**         * 获取等待队列的长度         */        protected final int getWaitQueueLength() {            if (!isHeldExclusively())                throw new IllegalMonitorStateException();            int n = 0;            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {                if (w.waitStatus == Node.CONDITION)                    ++n;            }            return n;        }        /**         *  获取所有等待线程         */        protected final Collection
getWaitingThreads() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); ArrayList
list = new ArrayList
(); for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) { Thread t = w.thread; if (t != null) list.add(t); } } return list; } }

接下来看看AQS一些关键的方法。

/**     *  获取独占锁      * tryAcquired(arg)为模板方法,由子类实现     * addWaiter() 为创建Node 节点 acquireQueued()阻塞获取锁     * selfInterrupt 为 中断处理     */    public final void acquire(int arg) {        if (!tryAcquire(arg) &&            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))            selfInterrupt();    }
/**     *  创建节点 并加入队列尾部     *     *      */    private Node addWaiter(Node mode) {        Node node = new Node(Thread.currentThread(), mode);        //  如果没并发,就直接加入尾部即可        Node pred = tail;        if (pred != null) {            node.prev = pred;            if (compareAndSetTail(pred, node)) {                pred.next = node;                return node;            }        }        //上面未能直接加入尾部,下面会进行循环加入队列尾部直至成功        enq(node);        return node;    }
/**     * 循环CAS 操作,直至加入尾部成功     *      */    private Node enq(final Node node) {        for (;;) {            Node t = tail;            if (t == null) { // Must initialize                if (compareAndSetHead(new Node()))                    tail = head;            } else {                node.prev = t;                if (compareAndSetTail(t, node)) {                    t.next = node;                    return t;                }            }        }    }
/**     * 真正滴去获取独占锁     *      *     * @param node the node     * @param arg the acquire argument     * @return {@code true} if interrupted while waiting     */    final boolean acquireQueued(final Node node, int arg) {        boolean failed = true;        try {            boolean interrupted = false;            for (;;) {                final Node p = node.predecessor();                // 如果当前节点的前节点是头节点且获取锁成功则成功                if (p == head && tryAcquire(arg)) {                    // 设置当前节点为头节点                    setHead(node);                    p.next = null; // help GC                    failed = false;                    return interrupted;                }                // 判断是否需要阻塞 并且找到一个安全进行阻塞(有人唤醒自己就是安全的)                if (shouldParkAfterFailedAcquire(p, node) &&                    parkAndCheckInterrupt())                    interrupted = true;            }        } finally {            if (failed)                //失败获取了,就要去释放节点                cancelAcquire(node);        }    }
/**     * 就是等待了,并返回是否中断过     *     * @return {@code true} if interrupted     */    private final boolean parkAndCheckInterrupt() {        LockSupport.park(this);        return Thread.interrupted();    }

 

转载地址:http://dhagj.baihongyu.com/

你可能感兴趣的文章
vue-cli3.0设置环境变量
查看>>
vue父组件直接操作子组件的方法(不通过$emit和$on)
查看>>
vue上传文件到UCloud
查看>>
获取input选择文件的本地地址
查看>>
React绑定全局方法或变量
查看>>
js监听div标签上面的自定义属性
查看>>
navcat如何重置窗口
查看>>
代码注入
查看>>
off-by-one
查看>>
ctf-pwn的一些小技巧
查看>>
POJ 1915 Knight Moves
查看>>
Git 撤销修改
查看>>
Git 删除文件
查看>>
Git与远程仓库关联以及关联错误解决方法
查看>>
[HDU] 平方和与立方和
查看>>
[HDU 2096] 小明A+B
查看>>
[HDU 2520] 我是菜鸟,我怕谁(不一样的for循环)
查看>>
[HDU 1215] 七夕节(求因子,不超时)
查看>>
[POJ 1915] Knight Moves
查看>>
Memcache技术精华
查看>>