From 7a9e532e86bf324d66f1e646f5135bffb2303006 Mon Sep 17 00:00:00 2001 From: tim_zhangyu Date: Thu, 23 May 2024 17:02:54 +0800 Subject: [PATCH 1/2] =?UTF-8?q?1.=20=E5=B0=86=E6=8B=92=E7=BB=9D=E7=AD=96?= =?UTF-8?q?=E7=95=A5=E5=A4=84=E7=9A=84=E6=BC=94=E7=A4=BA=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=E5=AE=8C=E6=95=B4=E5=92=8C=E8=A7=84=E8=8C=83=202.=20=E5=8E=BB?= =?UTF-8?q?=E9=99=A4DelayedWorkQueue=E4=B8=AD=E6=96=87=E5=90=8D=E5=AD=97?= =?UTF-8?q?=E4=B8=AD=E7=9A=84"=E9=98=BB=E5=A1=9E"=E4=BA=8C=E5=AD=97?= =?UTF-8?q?=EF=BC=8C=E5=9B=A0=E4=B8=BA=E5=85=B6=E8=87=AA=E5=8A=A8=E6=89=A9?= =?UTF-8?q?=E5=AE=B9=E6=9C=BA=E5=88=B6=E7=9A=84=E5=AD=98=E5=9C=A8=EF=BC=8C?= =?UTF-8?q?DelayedWorkQueue=E4=B8=8D=E4=BC=9A=E9=98=BB=E5=A1=9E=E3=80=82?= =?UTF-8?q?=203.=20=E4=BF=AE=E6=94=B9DelayedWorkQueue=E6=89=A9=E5=AE=B9?= =?UTF-8?q?=E7=9A=84=E8=A1=A8=E8=BE=BE=EF=BC=8C=E9=81=BF=E5=85=8D=E6=AD=A7?= =?UTF-8?q?=E4=B9=89=E3=80=82=204.=20=E5=9C=A8=E5=B8=B8=E8=A7=81=E9=98=BB?= =?UTF-8?q?=E5=A1=9E=E9=98=9F=E5=88=97=E4=B8=AD=E6=96=B0=E5=A2=9EArrayBloc?= =?UTF-8?q?kingQueue?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java-concurrent-questions-03.md | 91 +++++++++++-------- 1 file changed, 51 insertions(+), 40 deletions(-) diff --git a/docs/java/concurrent/java-concurrent-questions-03.md b/docs/java/concurrent/java-concurrent-questions-03.md index e9ce980b2f4..522c8b91a39 100644 --- a/docs/java/concurrent/java-concurrent-questions-03.md +++ b/docs/java/concurrent/java-concurrent-questions-03.md @@ -375,45 +375,55 @@ public static class CallerRunsPolicy implements RejectedExecutionHandler { 这里简单举一个例子,该线程池限定了最大线程数为 2,还阻塞队列大小为 1(这意味着第 4 个任务就会走到拒绝策略),`ThreadUtil`为 Hutool 提供的工具类: ```java -Logger log = LoggerFactory.getLogger(ThreadPoolTest.class); -// 创建一个线程池,核心线程数为1,最大线程数为2 -// 当线程数大于核心线程数时,多余的空闲线程存活的最长时间为60秒, -// 任务队列为容量为1的ArrayBlockingQueue,饱和策略为CallerRunsPolicy。 -ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, - 2, - 60, - TimeUnit.SECONDS, - new ArrayBlockingQueue<>(1), - new ThreadPoolExecutor.CallerRunsPolicy()); - -// 提交第一个任务,由核心线程执行 -threadPoolExecutor.execute(() -> { - log.info("核心线程执行第一个任务"); - ThreadUtil.sleep(1, TimeUnit.MINUTES); -}); - -// 提交第二个任务,由于核心线程被占用,任务将进入队列等待 -threadPoolExecutor.execute(() -> { - log.info("非核心线程处理入队的第二个任务"); - ThreadUtil.sleep(1, TimeUnit.MINUTES); -}); - -// 提交第三个任务,由于核心线程被占用且队列已满,创建非核心线程处理 -threadPoolExecutor.execute(() -> { - log.info("非核心线程处理第三个任务"); - ThreadUtil.sleep(1, TimeUnit.MINUTES); -}); - -// 提交第四个任务,由于核心线程和非核心线程都被占用,队列也满了,根据CallerRunsPolicy策略,任务将由提交任务的线程(即主线程)来执行 -threadPoolExecutor.execute(() -> { - log.info("主线程处理第四个任务"); - ThreadUtil.sleep(2, TimeUnit.MINUTES); -}); - -// 提交第五个任务,主线程被第四个任务卡住,该任务必须等到主线程执行完才能提交 -threadPoolExecutor.execute(() -> { - log.info("核心线程执行第五个任务"); -}); +public class ThreadPoolTest { + + private static final Logger log = LoggerFactory.getLogger(ThreadPoolTest.class); + + public static void main(String[] args) { + // 创建一个线程池,核心线程数为1,最大线程数为2 + // 当线程数大于核心线程数时,多余的空闲线程存活的最长时间为60秒, + // 任务队列为容量为1的ArrayBlockingQueue,饱和策略为CallerRunsPolicy。 + ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, + 2, + 60, + TimeUnit.SECONDS, + new ArrayBlockingQueue<>(1), + new ThreadPoolExecutor.CallerRunsPolicy()); + + // 提交第一个任务,由核心线程执行 + threadPoolExecutor.execute(() -> { + log.info("核心线程执行第一个任务"); + ThreadUtil.sleep(1, TimeUnit.MINUTES); + }); + + // 提交第二个任务,由于核心线程被占用,任务将进入队列等待 + threadPoolExecutor.execute(() -> { + log.info("非核心线程处理入队的第二个任务"); + ThreadUtil.sleep(1, TimeUnit.MINUTES); + }); + + // 提交第三个任务,由于核心线程被占用且队列已满,创建非核心线程处理 + threadPoolExecutor.execute(() -> { + log.info("非核心线程处理第三个任务"); + ThreadUtil.sleep(1, TimeUnit.MINUTES); + }); + + // 提交第四个任务,由于核心线程和非核心线程都被占用,队列也满了,根据CallerRunsPolicy策略,任务将由提交任务的线程(即主线程)来执行 + threadPoolExecutor.execute(() -> { + log.info("主线程处理第四个任务"); + ThreadUtil.sleep(2, TimeUnit.MINUTES); + }); + + // 提交第五个任务,主线程被第四个任务卡住,该任务必须等到主线程执行完才能提交 + threadPoolExecutor.execute(() -> { + log.info("核心线程执行第五个任务"); + }); + + // 关闭线程池 + threadPoolExecutor.shutdown(); + } +} + ``` 输出: @@ -496,7 +506,8 @@ new RejectedExecutionHandler() { - 容量为 `Integer.MAX_VALUE` 的 `LinkedBlockingQueue`(无界队列):`FixedThreadPool` 和 `SingleThreadExector` 。`FixedThreadPool`最多只能创建核心线程数的线程(核心线程数和最大线程数相等),`SingleThreadExector`只能创建一个线程(核心线程数和最大线程数都是 1),二者的任务队列永远不会被放满。 - `SynchronousQueue`(同步队列):`CachedThreadPool` 。`SynchronousQueue` 没有容量,不存储元素,目的是保证对于提交的任务,如果有空闲线程,则使用空闲线程来处理;否则新建一个线程来处理任务。也就是说,`CachedThreadPool` 的最大线程数是 `Integer.MAX_VALUE` ,可以理解为线程数是可以无限扩展的,可能会创建大量线程,从而导致 OOM。 -- `DelayedWorkQueue`(延迟阻塞队列):`ScheduledThreadPool` 和 `SingleThreadScheduledExecutor` 。`DelayedWorkQueue` 的内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构,可以保证每次出队的任务都是当前队列中执行时间最靠前的。`DelayedWorkQueue` 添加元素满了之后会自动扩容原来容量的 1/2,即永远不会阻塞,最大扩容可达 `Integer.MAX_VALUE`,所以最多只能创建核心线程数的线程。 +- `DelayedWorkQueue`(延迟队列):`ScheduledThreadPool` 和 `SingleThreadScheduledExecutor` 。`DelayedWorkQueue` 的内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构,可以保证每次出队的任务都是当前队列中执行时间最靠前的。`DelayedWorkQueue` 添加元素满了之后会自动扩容,增加原来容量的 50%,即永远不会阻塞,最大扩容可达 `Integer.MAX_VALUE`,所以最多只能创建核心线程数的线程。 +- `ArrayBlockingQueue`(有界阻塞队列):底层由数组实现,容量一旦创建,就不能修改。 ### 线程池处理任务的流程了解吗? From 9e67715cd73db686fd365718be67c76f19c4c86f Mon Sep 17 00:00:00 2001 From: tim_zhangyu Date: Fri, 24 May 2024 19:05:41 +0800 Subject: [PATCH 2/2] =?UTF-8?q?1.=20=E4=BF=AE=E6=AD=A3=E4=BA=86=E5=85=B3?= =?UTF-8?q?=E4=BA=8E=20LinkedBlockingQueue=20=E7=9A=84=E6=8F=8F=E8=BF=B0?= =?UTF-8?q?=E9=94=99=E8=AF=AF=EF=BC=8C=E6=8C=87=E5=87=BA=20LinkedBlockingQ?= =?UTF-8?q?ueue=20=E6=98=AF=E6=9C=89=E7=95=8C=E9=98=BB=E5=A1=9E=E9=98=9F?= =?UTF-8?q?=E5=88=97=EF=BC=8C=E5=85=B6=E9=BB=98=E8=AE=A4=E9=95=BF=E5=BA=A6?= =?UTF-8?q?=E4=B8=BA=20Integer.MAX=5FVALUE=20=E8=80=8C=E9=9D=9E=E6=97=A0?= =?UTF-8?q?=E7=95=8C=E9=98=9F=E5=88=97=E3=80=82=202.=20=E4=BF=AE=E6=AD=A3?= =?UTF-8?q?=E5=8D=95=E8=AF=8D=E9=94=99=E8=AF=AF=EF=BC=8CSingleThreadExecto?= =?UTF-8?q?r=20=E4=BF=AE=E6=AD=A3=E4=B8=BA=20SingleThreadExecutor=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/java/concurrent/java-concurrent-questions-03.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/java/concurrent/java-concurrent-questions-03.md b/docs/java/concurrent/java-concurrent-questions-03.md index 522c8b91a39..80ff3eb1278 100644 --- a/docs/java/concurrent/java-concurrent-questions-03.md +++ b/docs/java/concurrent/java-concurrent-questions-03.md @@ -234,12 +234,12 @@ static class Entry extends WeakReference> { `Executors` 返回线程池对象的弊端如下: -- `FixedThreadPool` 和 `SingleThreadExecutor`:使用的是无界的 `LinkedBlockingQueue`,任务队列最大长度为 `Integer.MAX_VALUE`,可能堆积大量的请求,从而导致 OOM。 +- `FixedThreadPool` 和 `SingleThreadExecutor`:使用的是有界阻塞队列是 `LinkedBlockingQueue` ,其任务队列的最大长度为 `Integer.MAX_VALUE` ,可能堆积大量的请求,从而导致 OOM。 - `CachedThreadPool`:使用的是同步队列 `SynchronousQueue`, 允许创建的线程数量为 `Integer.MAX_VALUE` ,如果任务数量过多且执行速度较慢,可能会创建大量的线程,从而导致 OOM。 -- `ScheduledThreadPool` 和 `SingleThreadScheduledExecutor`:使用的无界的延迟阻塞队列`DelayedWorkQueue`,任务队列最大长度为 `Integer.MAX_VALUE`,可能堆积大量的请求,从而导致 OOM。 +- `ScheduledThreadPool` 和 `SingleThreadScheduledExecutor` :使用的无界的延迟阻塞队列 `DelayedWorkQueue` ,任务队列最大长度为 `Integer.MAX_VALUE` ,可能堆积大量的请求,从而导致 OOM。 ```java -// 无界队列 LinkedBlockingQueue +// 有界队列 LinkedBlockingQueue public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue()); @@ -504,7 +504,7 @@ new RejectedExecutionHandler() { 不同的线程池会选用不同的阻塞队列,我们可以结合内置线程池来分析。 -- 容量为 `Integer.MAX_VALUE` 的 `LinkedBlockingQueue`(无界队列):`FixedThreadPool` 和 `SingleThreadExector` 。`FixedThreadPool`最多只能创建核心线程数的线程(核心线程数和最大线程数相等),`SingleThreadExector`只能创建一个线程(核心线程数和最大线程数都是 1),二者的任务队列永远不会被放满。 +- 容量为 `Integer.MAX_VALUE` 的 `LinkedBlockingQueue`(有界阻塞队列):`FixedThreadPool` 和 `SingleThreadExecutor` 。`FixedThreadPool`最多只能创建核心线程数的线程(核心线程数和最大线程数相等),`SingleThreadExecutor`只能创建一个线程(核心线程数和最大线程数都是 1),二者的任务队列永远不会被放满。 - `SynchronousQueue`(同步队列):`CachedThreadPool` 。`SynchronousQueue` 没有容量,不存储元素,目的是保证对于提交的任务,如果有空闲线程,则使用空闲线程来处理;否则新建一个线程来处理任务。也就是说,`CachedThreadPool` 的最大线程数是 `Integer.MAX_VALUE` ,可以理解为线程数是可以无限扩展的,可能会创建大量线程,从而导致 OOM。 - `DelayedWorkQueue`(延迟队列):`ScheduledThreadPool` 和 `SingleThreadScheduledExecutor` 。`DelayedWorkQueue` 的内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构,可以保证每次出队的任务都是当前队列中执行时间最靠前的。`DelayedWorkQueue` 添加元素满了之后会自动扩容,增加原来容量的 50%,即永远不会阻塞,最大扩容可达 `Integer.MAX_VALUE`,所以最多只能创建核心线程数的线程。 - `ArrayBlockingQueue`(有界阻塞队列):底层由数组实现,容量一旦创建,就不能修改。 @@ -655,7 +655,7 @@ CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内 这是一个常见的面试问题,本质其实还是在考察求职者对于线程池以及阻塞队列的掌握。 -我们上面也提到了,不同的线程池会选用不同的阻塞队列作为任务队列,比如`FixedThreadPool` 使用的是`LinkedBlockingQueue`(无界队列),由于队列永远不会被放满,因此`FixedThreadPool`最多只能创建核心线程数的线程。 +我们上面也提到了,不同的线程池会选用不同的阻塞队列作为任务队列,比如`FixedThreadPool` 使用的是`LinkedBlockingQueue`(有界队列),默认构造器初始的队列长度为 `Integer.MAX_VALUE` ,由于队列永远不会被放满,因此`FixedThreadPool`最多只能创建核心线程数的线程。 假如我们需要实现一个优先级任务线程池的话,那可以考虑使用 `PriorityBlockingQueue` (优先级阻塞队列)作为任务队列(`ThreadPoolExecutor` 的构造函数有一个 `workQueue` 参数可以传入任务队列)。