@@ -40,7 +40,7 @@ AQS(`AbstractQueuedSynchronizer`)的核心原理图:
4040
4141![ CLH 队列] ( https://oss.javaguide.cn/github/javaguide/java/concurrent/clh-queue-state.png )
4242
43- AQS 使用 ** int 成员变量 ` state ` 表示同步状态** ,通过内置的 ** FIFO 线程等待/同步队列 ** 来完成获取资源线程的排队工作。
43+ AQS 使用 ** int 成员变量 ` state ` 表示同步状态** ,通过内置的 ** FIFO 线程等待/等待队列 ** 来完成获取资源线程的排队工作。
4444
4545` state ` 变量由 ` volatile ` 修饰,用于展示当前临界资源的获锁情况。
4646
@@ -120,9 +120,9 @@ protected boolean isHeldExclusively()
120120
121121` synchronized ` 和 ` ReentrantLock ` 都是一次只允许一个线程访问某个资源,而` Semaphore ` (信号量)可以用来控制同时访问特定资源的线程数量。
122122
123- Semaphore 的使用简单,我们这里假设有 N(N>5) 个线程来获取 ` Semaphore ` 中的共享资源,下面的代码表示同一时刻 N 个线程中只有 5 个线程能获取到共享资源,其他线程都会阻塞,只有获取到共享资源的线程才能执行。等到有线程释放了共享资源,其他阻塞的线程才能获取到。
123+ ` Semaphore ` 的使用简单,我们这里假设有 ` N(N>5) ` 个线程来获取 ` Semaphore ` 中的共享资源,下面的代码表示同一时刻 N 个线程中只有 5 个线程能获取到共享资源,其他线程都会阻塞,只有获取到共享资源的线程才能执行。等到有线程释放了共享资源,其他阻塞的线程才能获取到。
124124
125- ``` java
125+ ``` java
126126// 初始共享资源数量
127127final Semaphore semaphore = new Semaphore (5 );
128128// 获取1个许可
@@ -158,41 +158,46 @@ public Semaphore(int permits, boolean fair) {
158158
159159` Semaphore ` 是共享锁的一种实现,它默认构造 AQS 的 ` state ` 值为 ` permits ` ,你可以将 ` permits ` 的值理解为许可证的数量,只有拿到许可证的线程才能执行。
160160
161- 调用` semaphore.acquire() ` ,线程尝试获取许可证,如果 ` state > 0 ` 的话,则表示可以获取成功,如果 ` state <= 0 ` 的话,则表示许可证数量不足,获取失败。
161+ 以无参 ` acquire ` 方法为例, 调用` semaphore.acquire() ` ,线程尝试获取许可证,如果 ` state > 0 ` 的话,则表示可以获取成功,如果 ` state <= 0 ` 的话,则表示许可证数量不足,获取失败。
162162
163- 如果可以获取成功的话(` state > 0 ` ),会尝试使用 CAS 操作去修改 ` state ` 的值 ` state=state-1 ` 。如果获取失败则会创建一个 Node 节点加入阻塞队列 ,挂起当前线程。
163+ 如果可以获取成功的话(` state > 0 ` ),会尝试使用 CAS 操作去修改 ` state ` 的值 ` state=state-1 ` 。如果获取失败则会创建一个 Node 节点加入等待队列 ,挂起当前线程。
164164
165165``` java
166- /**
167- * 获取1个许可证
168- */
166+ // 获取1个许可证
169167public void acquire() throws InterruptedException {
170168 sync. acquireSharedInterruptibly(1 );
171169}
172- /**
173- * 共享模式下获取许可证,获取成功则返回,失败则加入阻塞队列,挂起线程
174- */
170+
171+ // 获取一个或者多个许可证
172+ public void acquire(int permits) throws InterruptedException {
173+ if (permits < 0 ) throw new IllegalArgumentException ();
174+ sync. acquireSharedInterruptibly(permits);
175+ }
176+ ```
177+
178+ ` acquireSharedInterruptibly ` 方法是 ` AbstractQueuedSynchronizer ` 中的默认实现。
179+
180+ ``` java
181+ // 共享模式下获取许可证,获取成功则返回,失败则加入等待队列,挂起线程
175182public final void acquireSharedInterruptibly(int arg)
176183 throws InterruptedException {
177184 if (Thread . interrupted())
178185 throw new InterruptedException ();
179- // 尝试获取许可证,arg为获取许可证个数,当获取失败时,则创建一个节点加入阻塞队列 ,挂起当前线程。
186+ // 尝试获取许可证,arg为获取许可证个数,当获取失败时,则创建一个节点加入等待队列 ,挂起当前线程。
180187 if (tryAcquireShared(arg) < 0 )
181188 doAcquireSharedInterruptibly(arg);
182189}
183- /**
184- * 共享模式下尝试获取资源(在Semaphore中的资源即许可证):
185- * 1、获取失败,返回负值
186- * 2、共享模式下获取成功,但后续的共享模式获取会失败,返回0
187- * 3、共享模式获取成功,随后的共享模式也可能获取成功,返回正值
188- */
190+ ```
191+
192+ 这里再以非公平模式( ` NonfairSync ` )的为例,看看 ` tryAcquireShared ` 方法的实现。
193+
194+ ``` java
195+ // 共享模式下尝试获取资源(在Semaphore中的资源即许可证):
189196protected int tryAcquireShared(int acquires) {
190197 return nonfairTryAcquireShared(acquires);
191198}
192- /**
193- * 非公平的共享模式获取许可证,acquires为许可证数量,根据代码上下文可知该值总是为1
194- * 注:公平模式的实现会先判断队列中是否有节点在排队,有则直接返回-1,表示获取失败,没有则执行下面的操作
195- */
199+
200+ // 非公平的共享模式获取许可证
196201final int nonfairTryAcquireShared(int acquires) {
197202 for (;;) {
198203 // 当前可用许可证数量
@@ -209,47 +214,74 @@ final int nonfairTryAcquireShared(int acquires) {
209214}
210215```
211216
212- 调用` semaphore.release(); ` ,线程尝试释放许可证,并使用 CAS 操作去修改 ` state ` 的值 ` state=state+1 ` 。释放许可证成功之后,同时会唤醒同步队列中的一个线程 。被唤醒的线程会重新尝试去修改 ` state ` 的值 ` state=state-1 ` ,如果 ` state > 0 ` 则获取令牌成功,否则重新进入阻塞队列 ,挂起线程。
217+ 以无参 ` release ` 方法为例, 调用` semaphore.release(); ` ,线程尝试释放许可证,并使用 CAS 操作去修改 ` state ` 的值 ` state=state+1 ` 。释放许可证成功之后,同时会唤醒等待队列中的一个线程 。被唤醒的线程会重新尝试去修改 ` state ` 的值 ` state=state-1 ` ,如果 ` state > 0 ` 则获取令牌成功,否则重新进入等待队列 ,挂起线程。
213218
214219``` java
215220// 释放一个许可证
216221public void release() {
217222 sync. releaseShared(1 );
218223}
219224
220- // 释放共享锁,同时会唤醒同步队列中的一个线程。
225+ // 释放一个或者多个许可证
226+ public void release(int permits) {
227+ if (permits < 0 ) throw new IllegalArgumentException ();
228+ sync. releaseShared(permits);
229+ }
230+ ```
231+
232+ ` releaseShared ` 方法是 ` AbstractQueuedSynchronizer ` 中的默认实现。
233+
234+ ``` java
235+ // 释放共享锁
236+ // 如果 tryReleaseShared 返回 true,就唤醒等待队列中的一个或多个线程。
221237public final boolean releaseShared(int arg) {
222238 // 释放共享锁
223239 if (tryReleaseShared(arg)) {
224- // 唤醒同步队列中的一个线程
240+ // 释放当前节点的后置等待节点
225241 doReleaseShared();
226242 return true ;
227243 }
228244 return false ;
229245}
246+ ```
247+
248+ ` tryReleaseShared ` 方法是` Semaphore ` 的内部类 ` Sync ` 重写的一个方法, ` AbstractQueuedSynchronizer ` 中的默认实现仅仅抛出 ` UnsupportedOperationException ` 异常。
249+
250+ ``` java
251+ // 内部类 Sync 中重写的一个方法
230252// 尝试释放资源
231253protected final boolean tryReleaseShared(int releases) {
232254 for (;;) {
233255 int current = getState();
234- int next = current + releases; // 可用许可证+1
256+ // 可用许可证+1
257+ int next = current + releases;
235258 if (next < current) // overflow
236259 throw new Error (" Maximum permit count exceeded" );
237- if (compareAndSetState(current, next)) // 通过CAS修改
260+ // CAS修改state的值
261+ if (compareAndSetState(current, next))
238262 return true ;
239263 }
240264}
241265```
242266
267+ 可以看到,上面提到的几个方法底层基本都是通过同步器 ` sync ` 实现的。` Sync ` 是 ` CountDownLatch ` 的内部类 , 继承了 ` AbstractQueuedSynchronizer ` ,重写了其中的某些方法。并且,Sync 对应的还有两个子类 ` NonfairSync ` (对应非公平模式) 和 ` FairSync ` (对应公平模式)。
268+
269+ ``` java
270+ private static final class Sync extends AbstractQueuedSynchronizer {
271+ // ...
272+ }
273+ static final class NonfairSync extends Sync {
274+ // ...
275+ }
276+ static final class FairSync extends Sync {
277+ // ...
278+ }
279+ ```
280+
243281#### 实战
244282
245283``` java
246- /**
247- *
248- * @author Snailclimb
249- * @date 2018年9月30日
250- * @Description: 需要一次性拿一个许可的情况
251- */
252- public class SemaphoreExample1 {
284+ public class SemaphoreExample {
253285 // 请求的数量
254286 private static final int threadCount = 550 ;
255287
@@ -299,7 +331,7 @@ semaphore.release(5);// 释放5个许可
299331
300332[ issue645 补充内容] ( https://github.com/Snailclimb/JavaGuide/issues/645 ) :
301333
302- > ` Semaphore ` 与 ` CountDownLatch ` 一样,也是共享锁的一种实现。它默认构造 AQS 的 ` state ` 为 ` permits ` 。当执行任务的线程数量超出 ` permits ` ,那么多余的线程将会被放入阻塞队列 ` Park ` ,并自旋判断 ` state ` 是否大于 0。只有当 ` state ` 大于 0 的时候,阻塞的线程才能继续执行,此时先前执行任务的线程继续执行 ` release() ` 方法,` release() ` 方法使得 state 的变量会加 1,那么自旋的线程便会判断成功。
334+ > ` Semaphore ` 与 ` CountDownLatch ` 一样,也是共享锁的一种实现。它默认构造 AQS 的 ` state ` 为 ` permits ` 。当执行任务的线程数量超出 ` permits ` ,那么多余的线程将会被放入等待队列 ` Park ` ,并自旋判断 ` state ` 是否大于 0。只有当 ` state ` 大于 0 的时候,阻塞的线程才能继续执行,此时先前执行任务的线程继续执行 ` release() ` 方法,` release() ` 方法使得 state 的变量会加 1,那么自旋的线程便会判断成功。
303335> 如此,每次只有最多不超过 ` permits ` 数量的线程能自旋成功,便限制了执行任务线程的数量。
304336
305337### CountDownLatch (倒计时器)
@@ -312,7 +344,104 @@ semaphore.release(5);// 释放5个许可
312344
313345#### 原理
314346
315- ` CountDownLatch ` 是共享锁的一种实现,它默认构造 AQS 的 ` state ` 值为 ` count ` 。当线程使用 ` countDown() ` 方法时,其实使用了` tryReleaseShared ` 方法以 CAS 的操作来减少 ` state ` ,直至 ` state ` 为 0 。当调用 ` await() ` 方法的时候,如果 ` state ` 不为 0,那就证明任务还没有执行完毕,` await() ` 方法就会一直阻塞,也就是说 ` await() ` 方法之后的语句不会被执行(` main ` 线程被加入到等待队列也就是 CLH 队列中了)。然后,` CountDownLatch ` 会自旋 CAS 判断 ` state == 0 ` ,如果 ` state == 0 ` 的话,就会释放所有等待的线程,` await() ` 方法之后的语句得到执行。
347+ ` CountDownLatch ` 是共享锁的一种实现,它默认构造 AQS 的 ` state ` 值为 ` count ` 。这个我们通过 ` CountDownLatch ` 的构造方法即可看出。
348+
349+ ``` java
350+ public CountDownLatch(int count) {
351+ if (count < 0 ) throw new IllegalArgumentException (" count < 0" );
352+ this . sync = new Sync (count);
353+ }
354+
355+ private static final class Sync extends AbstractQueuedSynchronizer {
356+ Sync (int count ) {
357+ setState(count);
358+ }
359+ // ...
360+ }
361+ ```
362+
363+ 当线程调用 ` countDown() ` 时,其实使用了` tryReleaseShared ` 方法以 CAS 的操作来减少 ` state ` ,直至 ` state ` 为 0 。当 ` state ` 为 0 时,表示所有的线程都调用了 ` countDown ` 方法,那么在 ` CountDownLatch ` 上等待的线程就会被唤醒并继续执行。
364+
365+ ``` java
366+ public void countDown() {
367+ // Sync 是 CountDownLatch 的内部类 , 继承了 AbstractQueuedSynchronizer
368+ sync. releaseShared(1 );
369+ }
370+ ```
371+
372+ ` releaseShared ` 方法是 ` AbstractQueuedSynchronizer ` 中的默认实现。
373+
374+ ``` java
375+ // 释放共享锁
376+ // 如果 tryReleaseShared 返回 true,就唤醒等待队列中的一个或多个线程。
377+ public final boolean releaseShared(int arg) {
378+ // 释放共享锁
379+ if (tryReleaseShared(arg)) {
380+ // 释放当前节点的后置等待节点
381+ doReleaseShared();
382+ return true ;
383+ }
384+ return false ;
385+ }
386+ ```
387+
388+ ` tryReleaseShared ` 方法是` CountDownLatch ` 的内部类 ` Sync ` 重写的一个方法, ` AbstractQueuedSynchronizer ` 中的默认实现仅仅抛出 ` UnsupportedOperationException ` 异常。
389+
390+ ``` java
391+ // 对 state 进行递减,直到 state 变成 0;
392+ // 只有 count 递减到 0 时,countDown 才会返回 true
393+ protected boolean tryReleaseShared(int releases) {
394+ // 自选检查 state 是否为 0
395+ for (;;) {
396+ int c = getState();
397+ // 如果 state 已经是 0 了,直接返回 false
398+ if (c == 0 )
399+ return false ;
400+ // 对 state 进行递减
401+ int nextc = c- 1 ;
402+ // CAS 操作更新 state 的值
403+ if (compareAndSetState(c, nextc))
404+ return nextc == 0 ;
405+ }
406+ }
407+ ```
408+
409+ 以无参 ` await ` 方法为例,当调用 ` await() ` 的时候,如果 ` state ` 不为 0,那就证明任务还没有执行完毕,` await() ` 就会一直阻塞,也就是说 ` await() ` 之后的语句不会被执行(` main ` 线程被加入到等待队列也就是 CLH 队列中了)。然后,` CountDownLatch ` 会自旋 CAS 判断 ` state == 0 ` ,如果 ` state == 0 ` 的话,就会释放所有等待的线程,` await() ` 方法之后的语句得到执行。
410+
411+ ``` java
412+ // 等待(也可以叫做加锁)
413+ public void await() throws InterruptedException {
414+ sync. acquireSharedInterruptibly(1 );
415+ }
416+ // 带有超时时间的等待
417+ public boolean await(long timeout, TimeUnit unit)
418+ throws InterruptedException {
419+ return sync. tryAcquireSharedNanos(1 , unit. toNanos(timeout));
420+ }
421+ ```
422+
423+ ` acquireSharedInterruptibly ` 方法是 ` AbstractQueuedSynchronizer ` 中的默认实现。
424+
425+ ``` java
426+ // 尝试获取锁,获取成功则返回,失败则加入等待队列,挂起线程
427+ public final void acquireSharedInterruptibly(int arg)
428+ throws InterruptedException {
429+ if (Thread . interrupted())
430+ throw new InterruptedException ();
431+ // 尝试获得锁,获取成功则返回
432+ if (tryAcquireShared(arg) < 0 )
433+ // 获取失败加入等待队列,挂起线程
434+ doAcquireSharedInterruptibly(arg);
435+ }
436+ ```
437+
438+ ` tryAcquireShared ` 方法是` CountDownLatch ` 的内部类 ` Sync ` 重写的一个方法,其作用就是判断 ` state ` 的值是否为 0,是的话就返回 1,否则返回 -1。
439+
440+ ``` java
441+ protected int tryAcquireShared(int acquires) {
442+ return (getState() == 0 ) ? 1 : - 1 ;
443+ }
444+ ```
316445
317446#### 实战
318447
@@ -324,30 +453,25 @@ semaphore.release(5);// 释放5个许可
324453** CountDownLatch 代码示例** :
325454
326455``` java
327- /**
328- *
329- * @author SnailClimb
330- * @date 2018年10月1日
331- * @Description: CountDownLatch 使用方法示例
332- */
333- public class CountDownLatchExample1 {
456+ public class CountDownLatchExample {
334457 // 请求的数量
335- private static final int threadCount = 550 ;
458+ private static final int THREAD_COUNT = 550 ;
336459
337460 public static void main (String [] args ) throws InterruptedException {
338461 // 创建一个具有固定线程数量的线程池对象(如果这里线程池的线程数量给太少的话你会发现执行的很慢)
462+ // 只是测试使用,实际场景请手动赋值线程池参数
339463 ExecutorService threadPool = Executors . newFixedThreadPool(300 );
340- final CountDownLatch countDownLatch = new CountDownLatch (threadCount );
341- for (int i = 0 ; i < threadCount ; i++ ) {
342- final int threadnum = i;
343- threadPool. execute(() - > {// Lambda 表达式的运用
464+ final CountDownLatch countDownLatch = new CountDownLatch (THREAD_COUNT );
465+ for (int i = 0 ; i < THREAD_COUNT ; i++ ) {
466+ final int threadNum = i;
467+ threadPool. execute(() - > {
344468 try {
345- test(threadnum );
469+ test(threadNum );
346470 } catch (InterruptedException e) {
347- // TODO Auto-generated catch block
348471 e. printStackTrace();
349472 } finally {
350- countDownLatch. countDown();// 表示一个请求已经被完成
473+ // 表示一个请求已经被完成
474+ countDownLatch. countDown();
351475 }
352476
353477 });
@@ -358,12 +482,11 @@ public class CountDownLatchExample1 {
358482 }
359483
360484 public static void test (int threadnum ) throws InterruptedException {
361- Thread . sleep(1000 );// 模拟请求的耗时操作
362- System . out. println(" threadnum :" + threadnum);
363- Thread . sleep(1000 );// 模拟请求的耗时操作
485+ Thread . sleep(1000 );
486+ System . out. println(" threadNum :" + threadnum);
487+ Thread . sleep(1000 );
364488 }
365489}
366-
367490```
368491
369492上面的代码中,我们定义了请求的数量为 550,当这 550 个请求被处理完成之后,才会执行` System.out.println("finish"); ` 。
@@ -521,12 +644,6 @@ public int await() throws InterruptedException, BrokenBarrierException {
521644示例 1:
522645
523646``` java
524- /**
525- *
526- * @author Snailclimb
527- * @date 2018年10月1日
528- * @Description: 测试 CyclicBarrier 类中带参数的 await() 方法
529- */
530647public class CyclicBarrierExample1 {
531648 // 请求的数量
532649 private static final int threadCount = 550 ;
@@ -602,12 +719,6 @@ threadnum:6is finish
602719示例 2:
603720
604721``` java
605- /**
606- *
607- * @author SnailClimb
608- * @date 2018年10月1日
609- * @Description: 新建 CyclicBarrier 的时候指定一个 Runnable
610- */
611722public class CyclicBarrierExample2 {
612723 // 请求的数量
613724 private static final int threadCount = 550 ;
0 commit comments