本文共 15552 字,大约阅读时间需要 51 分钟。
我认为AQS是一个同步的辅助工具,当出现对公共资源的竞争时,AQS维持了一个双向队列,用于存储获取资源的线程对象。
AQS封装了许多基础的方法,如等待超时,线程中断处理等。下面的一张图是对AQS主要功能的一个较好诠释。
state代表的是公共资源,位于head的线程表示获得资源所有权,队列后面的线程处于阻塞状态。
AQS 支持两种模式下来的资源获取,独占模式及共享模式,它是一个抽象类,只要我们继承它并实现一些基础方法如
tryAcquire(int args)等try开头的模板方法,就能实现一个具有等待功能的资源同步器。我们熟悉的相关Lock的实现都是依靠于AQS来实现的。
AQS 的主要成员及方法。
先来看看AQS对于节点的封装Node
static final class Node { /** 标识该节点是否处于共享状态的等待状态 */ static final Node SHARED = new Node(); /** 标识该节点是否处于独占模式的等待状态 */ static final Node EXCLUSIVE = null; /** 取消状态 几个状态中唯一一个大于1的 */ static final int CANCELLED = 1; /** 等待被唤醒 */ static final int SIGNAL = -1; /** 处于条件等待状态中 */ static final int CONDITION = -2; /** * 用于共享模式下的传播,至今不太明白有什么作用 * */ static final int PROPAGATE = -3; /** * 节点状态。必须用cas来修改期状态 * */ volatile int waitStatus; /** * 前一个节点 */ volatile Node prev; /** * 下一个节点 */ volatile Node next; /** * 当前节点的持有线程 */ volatile Thread thread; /** * */ Node nextWaiter; /** * 判断是否共享模式 */ final boolean isShared() { return nextWaiter == SHARED; } /** * 获取前一个节点 */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { } Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus; this.thread = thread; } }
接下来看看一个接口,该接口类似于Object的wait(),notify(),notifyAll()主要用于条件等待。
public interface Condition { /** * 同wait()方法,释放锁,阻塞。 * */ void await() throws InterruptedException; /** * 释放锁直到线程中断 * */ void awaitUninterruptibly(); /** * * 阻塞指定时间后醒来,然后重新参与锁的竞争(重新进入等待队列,有可能还会重新阻塞) */ long awaitNanos(long nanosTimeout) throws InterruptedException; /** * 跟上一个方法没什么区别 */ boolean await(long time, TimeUnit unit) throws InterruptedException; /** * 同一类 */ boolean awaitUntil(Date deadline) throws InterruptedException; /** * 唤醒一个等待线程 */ void signal(); /** * 唤醒所有等待的线程,然后回重新参与竞争 */ void signalAll();}
这里一点挺值得关注的,wait() 或者 await() 指定时间的时候,到了指定时间之后,其实并不会立即中断,只是阻塞停止而已,
然后又继续参与锁的竞争。然后参与竞争的过程中有可能是进入了等待队列,又会重新阻塞。
AQS中有一个内部类,ConditionObject,他实现了Condition 接口。
public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; private transient Node firstWaiter; private transient Node lastWaiter; public ConditionObject() { } // Internal methods /** * 往条件等待队列中添加一个等待节点 */ private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } /** * 唤醒首节点 */ private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } /** * 唤醒所有等待节点 * */ private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); } /** * 清除等待状态的节点 */ private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } } /** * 实现Condition的方法 */ public final void signal() { // 条件等待仅在独占模式下课用 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } /** * 实现Condition 的方法 */ public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); } /** * 实现Condition 的方法 */ public final void awaitUninterruptibly() { // 添加等待节点 Node node = addConditionWaiter(); // 释放AQS 中的节点 long savedState = fullyRelease(node); boolean interrupted = false; while (!isOnSyncQueue(node)) { // 阻塞 LockSupport.park(this); // 响应中断 if (Thread.interrupted()) interrupted = true; } // 被唤醒了 重新进去AQS 进行锁竞争 并对中断进行响应 if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); } /* * */ private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } /** * Throws InterruptedException, reinterrupts current thread, or * does nothing, depending on mode. */ private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); } /** * */ public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); long savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } /** * */ public final long awaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); long savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return deadline - System.nanoTime(); } /** * */ public final boolean awaitUntil(Date deadline) throws InterruptedException { long abstime = deadline.getTime(); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); long savedState = fullyRelease(node); boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (System.currentTimeMillis() > abstime) { timedout = transferAfterCancelledWait(node); break; } LockSupport.parkUntil(this, abstime); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; } /** * 等待指定时间 */ public final boolean await(long time, TimeUnit unit) throws InterruptedException { long nanosTimeout = unit.toNanos(time); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); long savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; boolean timedout = false; int interruptMode = 0; //如果再同步队列你说明被唤醒了 while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { timedout = transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; } // support for instrumentation /** * Returns true if this condition was created by the given * synchronization object. * * @return {@code true} if owned */ final boolean isOwnedBy(AbstractQueuedLongSynchronizer sync) { return sync == AbstractQueuedLongSynchronizer.this; } /** * 判断是否有等待节点 */ protected final boolean hasWaiters() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) return true; } return false; } /** * 获取等待队列的长度 */ protected final int getWaitQueueLength() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int n = 0; for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) ++n; } return n; } /** * 获取所有等待线程 */ protected final CollectiongetWaitingThreads() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); ArrayList list = new ArrayList (); for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) { Thread t = w.thread; if (t != null) list.add(t); } } return list; } }
接下来看看AQS一些关键的方法。
/** * 获取独占锁 * tryAcquired(arg)为模板方法,由子类实现 * addWaiter() 为创建Node 节点 acquireQueued()阻塞获取锁 * selfInterrupt 为 中断处理 */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
/** * 创建节点 并加入队列尾部 * * */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 如果没并发,就直接加入尾部即可 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //上面未能直接加入尾部,下面会进行循环加入队列尾部直至成功 enq(node); return node; }
/** * 循环CAS 操作,直至加入尾部成功 * */ private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
/** * 真正滴去获取独占锁 * * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); // 如果当前节点的前节点是头节点且获取锁成功则成功 if (p == head && tryAcquire(arg)) { // 设置当前节点为头节点 setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 判断是否需要阻塞 并且找到一个安全进行阻塞(有人唤醒自己就是安全的) if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) //失败获取了,就要去释放节点 cancelAcquire(node); } }
/** * 就是等待了,并返回是否中断过 * * @return {@code true} if interrupted */ private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
转载地址:http://dhagj.baihongyu.com/