Skip to content

Commit f69d399

Browse files
committed
Update aqs.md
1 parent e680941 commit f69d399

File tree

1 file changed

+178
-67
lines changed

1 file changed

+178
-67
lines changed

docs/java/concurrent/aqs.md

Lines changed: 178 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ AQS(`AbstractQueuedSynchronizer`)的核心原理图:
4040

4141
![CLH 队列](https://oss.javaguide.cn/github/javaguide/java/concurrent/clh-queue-state.png)
4242

43-
AQS 使用 **int 成员变量 `state` 表示同步状态**,通过内置的 **FIFO 线程等待/同步队列** 来完成获取资源线程的排队工作。
43+
AQS 使用 **int 成员变量 `state` 表示同步状态**,通过内置的 **FIFO 线程等待/等待队列** 来完成获取资源线程的排队工作。
4444

4545
`state` 变量由 `volatile` 修饰,用于展示当前临界资源的获锁情况。
4646

@@ -120,9 +120,9 @@ protected boolean isHeldExclusively()
120120

121121
`synchronized``ReentrantLock` 都是一次只允许一个线程访问某个资源,而`Semaphore`(信号量)可以用来控制同时访问特定资源的线程数量。
122122

123-
Semaphore 的使用简单,我们这里假设有 N(N>5) 个线程来获取 `Semaphore` 中的共享资源,下面的代码表示同一时刻 N 个线程中只有 5 个线程能获取到共享资源,其他线程都会阻塞,只有获取到共享资源的线程才能执行。等到有线程释放了共享资源,其他阻塞的线程才能获取到。
123+
`Semaphore` 的使用简单,我们这里假设有 `N(N>5)` 个线程来获取 `Semaphore` 中的共享资源,下面的代码表示同一时刻 N 个线程中只有 5 个线程能获取到共享资源,其他线程都会阻塞,只有获取到共享资源的线程才能执行。等到有线程释放了共享资源,其他阻塞的线程才能获取到。
124124

125-
```java
125+
```java
126126
// 初始共享资源数量
127127
final Semaphore semaphore = new Semaphore(5);
128128
// 获取1个许可
@@ -158,41 +158,46 @@ public Semaphore(int permits, boolean fair) {
158158

159159
`Semaphore` 是共享锁的一种实现,它默认构造 AQS 的 `state` 值为 `permits`,你可以将 `permits` 的值理解为许可证的数量,只有拿到许可证的线程才能执行。
160160

161-
调用`semaphore.acquire()` ,线程尝试获取许可证,如果 `state > 0` 的话,则表示可以获取成功,如果 `state <= 0` 的话,则表示许可证数量不足,获取失败。
161+
以无参 `acquire` 方法为例,调用`semaphore.acquire()` ,线程尝试获取许可证,如果 `state > 0` 的话,则表示可以获取成功,如果 `state <= 0` 的话,则表示许可证数量不足,获取失败。
162162

163-
如果可以获取成功的话(`state > 0` ),会尝试使用 CAS 操作去修改 `state` 的值 `state=state-1`。如果获取失败则会创建一个 Node 节点加入阻塞队列,挂起当前线程。
163+
如果可以获取成功的话(`state > 0` ),会尝试使用 CAS 操作去修改 `state` 的值 `state=state-1`。如果获取失败则会创建一个 Node 节点加入等待队列,挂起当前线程。
164164

165165
```java
166-
/**
167-
* 获取1个许可证
168-
*/
166+
// 获取1个许可证
169167
public void acquire() throws InterruptedException {
170168
sync.acquireSharedInterruptibly(1);
171169
}
172-
/**
173-
* 共享模式下获取许可证,获取成功则返回,失败则加入阻塞队列,挂起线程
174-
*/
170+
171+
// 获取一个或者多个许可证
172+
public void acquire(int permits) throws InterruptedException {
173+
if (permits < 0) throw new IllegalArgumentException();
174+
sync.acquireSharedInterruptibly(permits);
175+
}
176+
```
177+
178+
`acquireSharedInterruptibly`方法是 `AbstractQueuedSynchronizer` 中的默认实现。
179+
180+
```java
181+
// 共享模式下获取许可证,获取成功则返回,失败则加入等待队列,挂起线程
175182
public final void acquireSharedInterruptibly(int arg)
176183
throws InterruptedException {
177184
if (Thread.interrupted())
178185
throw new InterruptedException();
179-
// 尝试获取许可证,arg为获取许可证个数,当获取失败时,则创建一个节点加入阻塞队列,挂起当前线程。
186+
// 尝试获取许可证,arg为获取许可证个数,当获取失败时,则创建一个节点加入等待队列,挂起当前线程。
180187
if (tryAcquireShared(arg) < 0)
181188
doAcquireSharedInterruptibly(arg);
182189
}
183-
/**
184-
* 共享模式下尝试获取资源(在Semaphore中的资源即许可证):
185-
* 1、获取失败,返回负值
186-
* 2、共享模式下获取成功,但后续的共享模式获取会失败,返回0
187-
* 3、共享模式获取成功,随后的共享模式也可能获取成功,返回正值
188-
*/
190+
```
191+
192+
这里再以非公平模式(`NonfairSync`)的为例,看看 `tryAcquireShared` 方法的实现。
193+
194+
```java
195+
// 共享模式下尝试获取资源(在Semaphore中的资源即许可证):
189196
protected int tryAcquireShared(int acquires) {
190197
return nonfairTryAcquireShared(acquires);
191198
}
192-
/**
193-
* 非公平的共享模式获取许可证,acquires为许可证数量,根据代码上下文可知该值总是为1
194-
* 注:公平模式的实现会先判断队列中是否有节点在排队,有则直接返回-1,表示获取失败,没有则执行下面的操作
195-
*/
199+
200+
// 非公平的共享模式获取许可证
196201
final int nonfairTryAcquireShared(int acquires) {
197202
for (;;) {
198203
// 当前可用许可证数量
@@ -209,47 +214,74 @@ final int nonfairTryAcquireShared(int acquires) {
209214
}
210215
```
211216

212-
调用`semaphore.release();` ,线程尝试释放许可证,并使用 CAS 操作去修改 `state` 的值 `state=state+1`。释放许可证成功之后,同时会唤醒同步队列中的一个线程。被唤醒的线程会重新尝试去修改 `state` 的值 `state=state-1` ,如果 `state > 0` 则获取令牌成功,否则重新进入阻塞队列,挂起线程。
217+
以无参 `release` 方法为例,调用`semaphore.release();` ,线程尝试释放许可证,并使用 CAS 操作去修改 `state` 的值 `state=state+1`。释放许可证成功之后,同时会唤醒等待队列中的一个线程。被唤醒的线程会重新尝试去修改 `state` 的值 `state=state-1` ,如果 `state > 0` 则获取令牌成功,否则重新进入等待队列,挂起线程。
213218

214219
```java
215220
// 释放一个许可证
216221
public void release() {
217222
sync.releaseShared(1);
218223
}
219224

220-
// 释放共享锁,同时会唤醒同步队列中的一个线程。
225+
// 释放一个或者多个许可证
226+
public void release(int permits) {
227+
if (permits < 0) throw new IllegalArgumentException();
228+
sync.releaseShared(permits);
229+
}
230+
```
231+
232+
`releaseShared`方法是 `AbstractQueuedSynchronizer` 中的默认实现。
233+
234+
```java
235+
// 释放共享锁
236+
// 如果 tryReleaseShared 返回 true,就唤醒等待队列中的一个或多个线程。
221237
public final boolean releaseShared(int arg) {
222238
//释放共享锁
223239
if (tryReleaseShared(arg)) {
224-
//唤醒同步队列中的一个线程
240+
//释放当前节点的后置等待节点
225241
doReleaseShared();
226242
return true;
227243
}
228244
return false;
229245
}
246+
```
247+
248+
`tryReleaseShared` 方法是`Semaphore` 的内部类 `Sync` 重写的一个方法, `AbstractQueuedSynchronizer`中的默认实现仅仅抛出 `UnsupportedOperationException` 异常。
249+
250+
```java
251+
// 内部类 Sync 中重写的一个方法
230252
// 尝试释放资源
231253
protected final boolean tryReleaseShared(int releases) {
232254
for (;;) {
233255
int current = getState();
234-
int next = current + releases; // 可用许可证+1
256+
// 可用许可证+1
257+
int next = current + releases;
235258
if (next < current) // overflow
236259
throw new Error("Maximum permit count exceeded");
237-
if (compareAndSetState(current, next)) // 通过CAS修改
260+
// CAS修改state的值
261+
if (compareAndSetState(current, next))
238262
return true;
239263
}
240264
}
241265
```
242266

267+
可以看到,上面提到的几个方法底层基本都是通过同步器 `sync` 实现的。`Sync``CountDownLatch` 的内部类 , 继承了 `AbstractQueuedSynchronizer` ,重写了其中的某些方法。并且,Sync 对应的还有两个子类 `NonfairSync`(对应非公平模式) 和 `FairSync`(对应公平模式)。
268+
269+
```java
270+
private static final class Sync extends AbstractQueuedSynchronizer {
271+
// ...
272+
}
273+
static final class NonfairSync extends Sync {
274+
// ...
275+
}
276+
static final class FairSync extends Sync {
277+
// ...
278+
}
279+
```
280+
243281
#### 实战
244282

245283
```java
246-
/**
247-
*
248-
* @author Snailclimb
249-
* @date 2018年9月30日
250-
* @Description: 需要一次性拿一个许可的情况
251-
*/
252-
public class SemaphoreExample1 {
284+
public class SemaphoreExample {
253285
// 请求的数量
254286
private static final int threadCount = 550;
255287

@@ -299,7 +331,7 @@ semaphore.release(5);// 释放5个许可
299331

300332
[issue645 补充内容](https://github.com/Snailclimb/JavaGuide/issues/645)
301333

302-
> `Semaphore``CountDownLatch` 一样,也是共享锁的一种实现。它默认构造 AQS 的 `state``permits`。当执行任务的线程数量超出 `permits`那么多余的线程将会被放入阻塞队列 `Park`,并自旋判断 `state` 是否大于 0。只有当 `state` 大于 0 的时候,阻塞的线程才能继续执行,此时先前执行任务的线程继续执行 `release()` 方法,`release()` 方法使得 state 的变量会加 1,那么自旋的线程便会判断成功。
334+
> `Semaphore``CountDownLatch` 一样,也是共享锁的一种实现。它默认构造 AQS 的 `state``permits`。当执行任务的线程数量超出 `permits`那么多余的线程将会被放入等待队列 `Park`,并自旋判断 `state` 是否大于 0。只有当 `state` 大于 0 的时候,阻塞的线程才能继续执行,此时先前执行任务的线程继续执行 `release()` 方法,`release()` 方法使得 state 的变量会加 1,那么自旋的线程便会判断成功。
303335
> 如此,每次只有最多不超过 `permits` 数量的线程能自旋成功,便限制了执行任务线程的数量。
304336
305337
### CountDownLatch (倒计时器)
@@ -312,7 +344,104 @@ semaphore.release(5);// 释放5个许可
312344

313345
#### 原理
314346

315-
`CountDownLatch` 是共享锁的一种实现,它默认构造 AQS 的 `state` 值为 `count`。当线程使用 `countDown()` 方法时,其实使用了`tryReleaseShared`方法以 CAS 的操作来减少 `state`,直至 `state` 为 0 。当调用 `await()` 方法的时候,如果 `state` 不为 0,那就证明任务还没有执行完毕,`await()` 方法就会一直阻塞,也就是说 `await()` 方法之后的语句不会被执行(`main` 线程被加入到等待队列也就是 CLH 队列中了)。然后,`CountDownLatch` 会自旋 CAS 判断 `state == 0`,如果 `state == 0` 的话,就会释放所有等待的线程,`await()` 方法之后的语句得到执行。
347+
`CountDownLatch` 是共享锁的一种实现,它默认构造 AQS 的 `state` 值为 `count`。这个我们通过 `CountDownLatch` 的构造方法即可看出。
348+
349+
```java
350+
public CountDownLatch(int count) {
351+
if (count < 0) throw new IllegalArgumentException("count < 0");
352+
this.sync = new Sync(count);
353+
}
354+
355+
private static final class Sync extends AbstractQueuedSynchronizer {
356+
Sync(int count) {
357+
setState(count);
358+
}
359+
//...
360+
}
361+
```
362+
363+
当线程调用 `countDown()` 时,其实使用了`tryReleaseShared`方法以 CAS 的操作来减少 `state`,直至 `state` 为 0 。当 `state` 为 0 时,表示所有的线程都调用了 `countDown` 方法,那么在 `CountDownLatch` 上等待的线程就会被唤醒并继续执行。
364+
365+
```java
366+
public void countDown() {
367+
// Sync 是 CountDownLatch 的内部类 , 继承了 AbstractQueuedSynchronizer
368+
sync.releaseShared(1);
369+
}
370+
```
371+
372+
`releaseShared`方法是 `AbstractQueuedSynchronizer` 中的默认实现。
373+
374+
```java
375+
// 释放共享锁
376+
// 如果 tryReleaseShared 返回 true,就唤醒等待队列中的一个或多个线程。
377+
public final boolean releaseShared(int arg) {
378+
//释放共享锁
379+
if (tryReleaseShared(arg)) {
380+
//释放当前节点的后置等待节点
381+
doReleaseShared();
382+
return true;
383+
}
384+
return false;
385+
}
386+
```
387+
388+
`tryReleaseShared` 方法是`CountDownLatch` 的内部类 `Sync` 重写的一个方法, `AbstractQueuedSynchronizer`中的默认实现仅仅抛出 `UnsupportedOperationException` 异常。
389+
390+
```java
391+
// 对 state 进行递减,直到 state 变成 0;
392+
// 只有 count 递减到 0 时,countDown 才会返回 true
393+
protected boolean tryReleaseShared(int releases) {
394+
// 自选检查 state 是否为 0
395+
for (;;) {
396+
int c = getState();
397+
// 如果 state 已经是 0 了,直接返回 false
398+
if (c == 0)
399+
return false;
400+
// 对 state 进行递减
401+
int nextc = c-1;
402+
// CAS 操作更新 state 的值
403+
if (compareAndSetState(c, nextc))
404+
return nextc == 0;
405+
}
406+
}
407+
```
408+
409+
以无参 `await`方法为例,当调用 `await()` 的时候,如果 `state` 不为 0,那就证明任务还没有执行完毕,`await()` 就会一直阻塞,也就是说 `await()` 之后的语句不会被执行(`main` 线程被加入到等待队列也就是 CLH 队列中了)。然后,`CountDownLatch` 会自旋 CAS 判断 `state == 0`,如果 `state == 0` 的话,就会释放所有等待的线程,`await()` 方法之后的语句得到执行。
410+
411+
```java
412+
// 等待(也可以叫做加锁)
413+
public void await() throws InterruptedException {
414+
sync.acquireSharedInterruptibly(1);
415+
}
416+
// 带有超时时间的等待
417+
public boolean await(long timeout, TimeUnit unit)
418+
throws InterruptedException {
419+
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
420+
}
421+
```
422+
423+
`acquireSharedInterruptibly`方法是 `AbstractQueuedSynchronizer` 中的默认实现。
424+
425+
```java
426+
// 尝试获取锁,获取成功则返回,失败则加入等待队列,挂起线程
427+
public final void acquireSharedInterruptibly(int arg)
428+
throws InterruptedException {
429+
if (Thread.interrupted())
430+
throw new InterruptedException();
431+
// 尝试获得锁,获取成功则返回
432+
if (tryAcquireShared(arg) < 0)
433+
// 获取失败加入等待队列,挂起线程
434+
doAcquireSharedInterruptibly(arg);
435+
}
436+
```
437+
438+
`tryAcquireShared` 方法是`CountDownLatch` 的内部类 `Sync` 重写的一个方法,其作用就是判断 `state` 的值是否为 0,是的话就返回 1,否则返回 -1。
439+
440+
```java
441+
protected int tryAcquireShared(int acquires) {
442+
return (getState() == 0) ? 1 : -1;
443+
}
444+
```
316445

317446
#### 实战
318447

@@ -324,30 +453,25 @@ semaphore.release(5);// 释放5个许可
324453
**CountDownLatch 代码示例**
325454

326455
```java
327-
/**
328-
*
329-
* @author SnailClimb
330-
* @date 2018年10月1日
331-
* @Description: CountDownLatch 使用方法示例
332-
*/
333-
public class CountDownLatchExample1 {
456+
public class CountDownLatchExample {
334457
// 请求的数量
335-
private static final int threadCount = 550;
458+
private static final int THREAD_COUNT = 550;
336459

337460
public static void main(String[] args) throws InterruptedException {
338461
// 创建一个具有固定线程数量的线程池对象(如果这里线程池的线程数量给太少的话你会发现执行的很慢)
462+
// 只是测试使用,实际场景请手动赋值线程池参数
339463
ExecutorService threadPool = Executors.newFixedThreadPool(300);
340-
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
341-
for (int i = 0; i < threadCount; i++) {
342-
final int threadnum = i;
343-
threadPool.execute(() -> {// Lambda 表达式的运用
464+
final CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);
465+
for (int i = 0; i < THREAD_COUNT; i++) {
466+
final int threadNum = i;
467+
threadPool.execute(() -> {
344468
try {
345-
test(threadnum);
469+
test(threadNum);
346470
} catch (InterruptedException e) {
347-
// TODO Auto-generated catch block
348471
e.printStackTrace();
349472
} finally {
350-
countDownLatch.countDown();// 表示一个请求已经被完成
473+
// 表示一个请求已经被完成
474+
countDownLatch.countDown();
351475
}
352476

353477
});
@@ -358,12 +482,11 @@ public class CountDownLatchExample1 {
358482
}
359483

360484
public static void test(int threadnum) throws InterruptedException {
361-
Thread.sleep(1000);// 模拟请求的耗时操作
362-
System.out.println("threadnum:" + threadnum);
363-
Thread.sleep(1000);// 模拟请求的耗时操作
485+
Thread.sleep(1000);
486+
System.out.println("threadNum:" + threadnum);
487+
Thread.sleep(1000);
364488
}
365489
}
366-
367490
```
368491

369492
上面的代码中,我们定义了请求的数量为 550,当这 550 个请求被处理完成之后,才会执行`System.out.println("finish");`
@@ -521,12 +644,6 @@ public int await() throws InterruptedException, BrokenBarrierException {
521644
示例 1:
522645

523646
```java
524-
/**
525-
*
526-
* @author Snailclimb
527-
* @date 2018年10月1日
528-
* @Description: 测试 CyclicBarrier 类中带参数的 await() 方法
529-
*/
530647
public class CyclicBarrierExample1 {
531648
// 请求的数量
532649
private static final int threadCount = 550;
@@ -602,12 +719,6 @@ threadnum:6is finish
602719
示例 2:
603720

604721
```java
605-
/**
606-
*
607-
* @author SnailClimb
608-
* @date 2018年10月1日
609-
* @Description: 新建 CyclicBarrier 的时候指定一个 Runnable
610-
*/
611722
public class CyclicBarrierExample2 {
612723
// 请求的数量
613724
private static final int threadCount = 550;

0 commit comments

Comments
 (0)