JUC之AQS
AbstractQueuedSynchronizer
先大致讲一下工作原理:AQS内部主要维护了一个Node类型的链表,用于储存排队的线程信息,当有新的需要阻塞的线程进来时一般创建一个node对象,加入到链表的尾部,当链表中首节点释放资源时调用LockSupport的unpark去唤醒等待的线程。
配上一个比较经典的图片
接下来是AQS主要的成员变量:
//链表的首节点private transient volatile Node head;//链表的尾节点private transient volatile Node tail;//当前同步器的状态private volatile int state;//一般用于申请资源时判断是否自旋的一个时间阈值,当请求设置的超时时间小于这个时间的时候,就直接自旋而不是等待static final long spinForTimeoutThreshold = 1000L;//Unsafe对象,java中用于直接操作内存地址的工具,授信于jdk代码private static final Unsafe unsafe = Unsafe.getUnsafe();//下面可以理解为上述成员变量在对应对象的内存地址内的偏移量//waitStatus与next是AQS的内部类Node中的成员变量private static final long stateOffset;private static final long headOffset;private static final long tailOffset;private static final long waitStatusOffset;private static final long nextOffset;
接下来介绍一些核心方法
- acquire(int arg)
无视线程interrupt不断的申请资源
/*** acquire用于排他的申请资源,其中tryAcquire是一个抽象方法,需要子类去自己实现。* 当第一次tryAcquire失败后,会添加一个排他的node到node链表中,然后不断的申请资源。* /public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}/*** 添加当前线程到等待的链表中,其中通过unsafe中的方法修改自己为tail节点* 如果失败了则进入enq中循环执行把自己设置为tail的操作*/private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;}private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}/*** 该方法循环执行以下逻辑* 当前线程前一个线程是head时,去执行tryAcquire方法尝试获取资源,同上tryAcquire为子类实现* 执行shouldParkAfterFailedAcquire判断在请求资源失败之后是否需要park(即阻塞自己),如果需要阻塞则执行parkAndCheckInterrupt*/final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}/*** 取链表中前驱节点的waitStatus* 如果是signal的,则需要阻塞自己,因为之前的节点在释放资源之后会唤醒自己* 如果状态>0的,即是cancelled,则往前找,一直找到不是cancelled状态的节点,然后把自己放在它后面* 如果状态是0或者-3,则需要把前驱的状态设置为signal的,此次自己不阻塞等下次自己再进入这个方法时,便会阻塞自己*/private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;if (ws == Node.SIGNAL)/** This node has already set status asking a release* to signal it, so it can safely park.*/return true;if (ws > 0) {/** Predecessor was cancelled. Skip over predecessors and* indicate retry.*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {/** waitStatus must be 0 or PROPAGATE. Indicate that we* need a signal, but don't park yet. Caller will need to* retry to make sure it cannot acquire before parking.*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}/*** 执行LockSupport中的park方法,阻塞自己,等待其他线程释放资源时唤醒自己,执行下一行即return*/private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}
- acquireInterruptibly(int arg)
/** * 与accquire方法不同的是,当线程interrupt的时候,该方法会抛出InterruptedException*/public final void acquireInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (!tryAcquire(arg))doAcquireInterruptibly(arg);}/*** 与之前acquireQueued不同的是,这里当执行到parkAndCheckInterrupt返回true的时候,即线程状态是interrupted状态时,会抛出异常* 这里补充一下,线程一般销毁是在run方法执行完后(stop destry这些不推荐使用),而执行线程的interrupt方法时,* 线程不会结束,只会去修改线程的状态,以下是网上处理interrupt的一些建议* 1)线程处于阻塞状态,如使用了sleep,同步锁的wait,socket中的receiver,accept等方法时,会使线程处于阻塞状态。* 当调用线程的interrupt()方法时,会抛出InterruptException异常。阻塞中的那个方法抛出这个异常,通过代码捕获该异常,* 然后break跳出循环状态,从而让我们有机会结束这个线程的执行。* 一定要先捕获InterruptedException异常之后通过break来跳出循环,才能正常结束run方法* 2)线程未处于阻塞状态,使用isInterrupted()判断线程的中断标志来退出循环。* 当使用interrupt()方法时,中断标志就会置true,和使用自定义的标志来控制循环是一样的道理
*/private void doAcquireInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}
- acquireShared(int arg)
/*** 同样的是tryAcquireShared是个抽象方法,需要子类去实现。* 返回负值就表示失败,然后进入自旋*/public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)doAcquireShared(arg);}/*** 这里先添加一个共享模式的node到链表尾部,然后自旋,大部分逻辑与acquireQueued是一样的* 判断是否需要阻塞*/private void doAcquireShared(int arg) {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCif (interrupted)selfInterrupt();failed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check belowsetHead(node);/** 当前线程取得资源 或者 首节点为空 或者 首节点不是被cancel 或者 此时head为空 或者 此时head没有被cancel* 这些条件满足一条时 判断下个节点是否是空或者共享节点,是的话就调用doReleaseShared*/if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared())doReleaseShared();}}
- acquireSharedInterruptibly(int arg)
与排它模式的interrupt一样,就是线程被设置为interrupt状态后会抛出异常
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}
- release(int arg)
释放资源
/*** tryRelease为抽象方法,需要子类实现* 这里tryRelease成功后,如果head不为空且waitStatus不为0就唤醒后继的阻塞线程*/public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}
然后是两个内部类,先介绍Node这个内部类
成员变量
/** 标志node为共享模式 */static final Node SHARED = new Node();/** 标志node为排他模式 */static final Node EXCLUSIVE = null;/** waitStatus的值,标志着线程被撤销了 */static final int CANCELLED = 1;/** waitStatus的值,标志着后续的线程需要被unpark,说简单点,就是处于这个状态的node,在head节点释放之后会被唤醒 */static final int SIGNAL = -1;/** waitStatus的值,标志着线程正在等待某种条件满足 */static final int CONDITION = -2;/*** waitStatus的值,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。*/static final int PROPAGATE = -3;/** waitStatus**/volatile int waitStatus;/** 前驱节点 **/volatile Node prev;/** 后继节点 **/volatile Node next;/**当前线程**/volatile Thread thread;/**下一个等待环节,指向排他类型的node,区分next在于next可能是共享模式的,方便快速定位到下一个等待线程**/Node nextWaiter;
另一个内部类ConditionObject,实现了Condition接口
成员变量
/** First node of condition queue. */private transient Node firstWaiter;/** Last node of condition queue. */private transient Node lastWaiter;/** Mode meaning to reinterrupt on exit from wait */private static final int REINTERRUPT = 1;/** Mode meaning to throw InterruptedException on exit from wait */private static final int THROW_IE = -1;
JUC之AQS
AbstractQueuedSynchronizer
先大致讲一下工作原理:AQS内部主要维护了一个Node类型的链表,用于储存排队的线程信息,当有新的需要阻塞的线程进来时一般创建一个node对象,加入到链表的尾部,当链表中首节点释放资源时调用LockSupport的unpark去唤醒等待的线程。
配上一个比较经典的图片
接下来是AQS主要的成员变量:
//链表的首节点private transient volatile Node head;//链表的尾节点private transient volatile Node tail;//当前同步器的状态private volatile int state;//一般用于申请资源时判断是否自旋的一个时间阈值,当请求设置的超时时间小于这个时间的时候,就直接自旋而不是等待static final long spinForTimeoutThreshold = 1000L;//Unsafe对象,java中用于直接操作内存地址的工具,授信于jdk代码private static final Unsafe unsafe = Unsafe.getUnsafe();//下面可以理解为上述成员变量在对应对象的内存地址内的偏移量//waitStatus与next是AQS的内部类Node中的成员变量private static final long stateOffset;private static final long headOffset;private static final long tailOffset;private static final long waitStatusOffset;private static final long nextOffset;
接下来介绍一些核心方法
- acquire(int arg)
无视线程interrupt不断的申请资源
/*** acquire用于排他的申请资源,其中tryAcquire是一个抽象方法,需要子类去自己实现。* 当第一次tryAcquire失败后,会添加一个排他的node到node链表中,然后不断的申请资源。* /public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}/*** 添加当前线程到等待的链表中,其中通过unsafe中的方法修改自己为tail节点* 如果失败了则进入enq中循环执行把自己设置为tail的操作*/private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;}private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}/*** 该方法循环执行以下逻辑* 当前线程前一个线程是head时,去执行tryAcquire方法尝试获取资源,同上tryAcquire为子类实现* 执行shouldParkAfterFailedAcquire判断在请求资源失败之后是否需要park(即阻塞自己),如果需要阻塞则执行parkAndCheckInterrupt*/final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}/*** 取链表中前驱节点的waitStatus* 如果是signal的,则需要阻塞自己,因为之前的节点在释放资源之后会唤醒自己* 如果状态>0的,即是cancelled,则往前找,一直找到不是cancelled状态的节点,然后把自己放在它后面* 如果状态是0或者-3,则需要把前驱的状态设置为signal的,此次自己不阻塞等下次自己再进入这个方法时,便会阻塞自己*/private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;if (ws == Node.SIGNAL)/** This node has already set status asking a release* to signal it, so it can safely park.*/return true;if (ws > 0) {/** Predecessor was cancelled. Skip over predecessors and* indicate retry.*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {/** waitStatus must be 0 or PROPAGATE. Indicate that we* need a signal, but don't park yet. Caller will need to* retry to make sure it cannot acquire before parking.*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}/*** 执行LockSupport中的park方法,阻塞自己,等待其他线程释放资源时唤醒自己,执行下一行即return*/private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}
- acquireInterruptibly(int arg)
/** * 与accquire方法不同的是,当线程interrupt的时候,该方法会抛出InterruptedException*/public final void acquireInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (!tryAcquire(arg))doAcquireInterruptibly(arg);}/*** 与之前acquireQueued不同的是,这里当执行到parkAndCheckInterrupt返回true的时候,即线程状态是interrupted状态时,会抛出异常* 这里补充一下,线程一般销毁是在run方法执行完后(stop destry这些不推荐使用),而执行线程的interrupt方法时,* 线程不会结束,只会去修改线程的状态,以下是网上处理interrupt的一些建议* 1)线程处于阻塞状态,如使用了sleep,同步锁的wait,socket中的receiver,accept等方法时,会使线程处于阻塞状态。* 当调用线程的interrupt()方法时,会抛出InterruptException异常。阻塞中的那个方法抛出这个异常,通过代码捕获该异常,* 然后break跳出循环状态,从而让我们有机会结束这个线程的执行。* 一定要先捕获InterruptedException异常之后通过break来跳出循环,才能正常结束run方法* 2)线程未处于阻塞状态,使用isInterrupted()判断线程的中断标志来退出循环。* 当使用interrupt()方法时,中断标志就会置true,和使用自定义的标志来控制循环是一样的道理
*/private void doAcquireInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}
- acquireShared(int arg)
/*** 同样的是tryAcquireShared是个抽象方法,需要子类去实现。* 返回负值就表示失败,然后进入自旋*/public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)doAcquireShared(arg);}/*** 这里先添加一个共享模式的node到链表尾部,然后自旋,大部分逻辑与acquireQueued是一样的* 判断是否需要阻塞*/private void doAcquireShared(int arg) {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCif (interrupted)selfInterrupt();failed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check belowsetHead(node);/** 当前线程取得资源 或者 首节点为空 或者 首节点不是被cancel 或者 此时head为空 或者 此时head没有被cancel* 这些条件满足一条时 判断下个节点是否是空或者共享节点,是的话就调用doReleaseShared*/if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared())doReleaseShared();}}
- acquireSharedInterruptibly(int arg)
与排它模式的interrupt一样,就是线程被设置为interrupt状态后会抛出异常
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}
- release(int arg)
释放资源
/*** tryRelease为抽象方法,需要子类实现* 这里tryRelease成功后,如果head不为空且waitStatus不为0就唤醒后继的阻塞线程*/public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}
然后是两个内部类,先介绍Node这个内部类
成员变量
/** 标志node为共享模式 */static final Node SHARED = new Node();/** 标志node为排他模式 */static final Node EXCLUSIVE = null;/** waitStatus的值,标志着线程被撤销了 */static final int CANCELLED = 1;/** waitStatus的值,标志着后续的线程需要被unpark,说简单点,就是处于这个状态的node,在head节点释放之后会被唤醒 */static final int SIGNAL = -1;/** waitStatus的值,标志着线程正在等待某种条件满足 */static final int CONDITION = -2;/*** waitStatus的值,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。*/static final int PROPAGATE = -3;/** waitStatus**/volatile int waitStatus;/** 前驱节点 **/volatile Node prev;/** 后继节点 **/volatile Node next;/**当前线程**/volatile Thread thread;/**下一个等待环节,指向排他类型的node,区分next在于next可能是共享模式的,方便快速定位到下一个等待线程**/Node nextWaiter;
另一个内部类ConditionObject,实现了Condition接口
成员变量
/** First node of condition queue. */private transient Node firstWaiter;/** Last node of condition queue. */private transient Node lastWaiter;/** Mode meaning to reinterrupt on exit from wait */private static final int REINTERRUPT = 1;/** Mode meaning to throw InterruptedException on exit from wait */private static final int THROW_IE = -1;
发布评论