Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 51 additions & 40 deletions docs/java/concurrent/java-concurrent-questions-03.md
Original file line number Diff line number Diff line change
Expand Up @@ -375,45 +375,55 @@ public static class CallerRunsPolicy implements RejectedExecutionHandler {
这里简单举一个例子,该线程池限定了最大线程数为 2,还阻塞队列大小为 1(这意味着第 4 个任务就会走到拒绝策略),`ThreadUtil`为 Hutool 提供的工具类:

```java
Logger log = LoggerFactory.getLogger(ThreadPoolTest.class);
// 创建一个线程池,核心线程数为1,最大线程数为2
// 当线程数大于核心线程数时,多余的空闲线程存活的最长时间为60秒,
// 任务队列为容量为1的ArrayBlockingQueue,饱和策略为CallerRunsPolicy。
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,
2,
60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1),
new ThreadPoolExecutor.CallerRunsPolicy());

// 提交第一个任务,由核心线程执行
threadPoolExecutor.execute(() -> {
log.info("核心线程执行第一个任务");
ThreadUtil.sleep(1, TimeUnit.MINUTES);
});

// 提交第二个任务,由于核心线程被占用,任务将进入队列等待
threadPoolExecutor.execute(() -> {
log.info("非核心线程处理入队的第二个任务");
ThreadUtil.sleep(1, TimeUnit.MINUTES);
});

// 提交第三个任务,由于核心线程被占用且队列已满,创建非核心线程处理
threadPoolExecutor.execute(() -> {
log.info("非核心线程处理第三个任务");
ThreadUtil.sleep(1, TimeUnit.MINUTES);
});

// 提交第四个任务,由于核心线程和非核心线程都被占用,队列也满了,根据CallerRunsPolicy策略,任务将由提交任务的线程(即主线程)来执行
threadPoolExecutor.execute(() -> {
log.info("主线程处理第四个任务");
ThreadUtil.sleep(2, TimeUnit.MINUTES);
});

// 提交第五个任务,主线程被第四个任务卡住,该任务必须等到主线程执行完才能提交
threadPoolExecutor.execute(() -> {
log.info("核心线程执行第五个任务");
});
public class ThreadPoolTest {

private static final Logger log = LoggerFactory.getLogger(ThreadPoolTest.class);

public static void main(String[] args) {
// 创建一个线程池,核心线程数为1,最大线程数为2
// 当线程数大于核心线程数时,多余的空闲线程存活的最长时间为60秒,
// 任务队列为容量为1的ArrayBlockingQueue,饱和策略为CallerRunsPolicy。
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,
2,
60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1),
new ThreadPoolExecutor.CallerRunsPolicy());

// 提交第一个任务,由核心线程执行
threadPoolExecutor.execute(() -> {
log.info("核心线程执行第一个任务");
ThreadUtil.sleep(1, TimeUnit.MINUTES);
});

// 提交第二个任务,由于核心线程被占用,任务将进入队列等待
threadPoolExecutor.execute(() -> {
log.info("非核心线程处理入队的第二个任务");
ThreadUtil.sleep(1, TimeUnit.MINUTES);
});

// 提交第三个任务,由于核心线程被占用且队列已满,创建非核心线程处理
threadPoolExecutor.execute(() -> {
log.info("非核心线程处理第三个任务");
ThreadUtil.sleep(1, TimeUnit.MINUTES);
});

// 提交第四个任务,由于核心线程和非核心线程都被占用,队列也满了,根据CallerRunsPolicy策略,任务将由提交任务的线程(即主线程)来执行
threadPoolExecutor.execute(() -> {
log.info("主线程处理第四个任务");
ThreadUtil.sleep(2, TimeUnit.MINUTES);
});

// 提交第五个任务,主线程被第四个任务卡住,该任务必须等到主线程执行完才能提交
threadPoolExecutor.execute(() -> {
log.info("核心线程执行第五个任务");
});

// 关闭线程池
threadPoolExecutor.shutdown();
}
}

```

输出:
Expand Down Expand Up @@ -496,7 +506,8 @@ new RejectedExecutionHandler() {

- 容量为 `Integer.MAX_VALUE` 的 `LinkedBlockingQueue`(无界队列):`FixedThreadPool` 和 `SingleThreadExector` 。`FixedThreadPool`最多只能创建核心线程数的线程(核心线程数和最大线程数相等),`SingleThreadExector`只能创建一个线程(核心线程数和最大线程数都是 1),二者的任务队列永远不会被放满。
- `SynchronousQueue`(同步队列):`CachedThreadPool` 。`SynchronousQueue` 没有容量,不存储元素,目的是保证对于提交的任务,如果有空闲线程,则使用空闲线程来处理;否则新建一个线程来处理任务。也就是说,`CachedThreadPool` 的最大线程数是 `Integer.MAX_VALUE` ,可以理解为线程数是可以无限扩展的,可能会创建大量线程,从而导致 OOM。
- `DelayedWorkQueue`(延迟阻塞队列):`ScheduledThreadPool` 和 `SingleThreadScheduledExecutor` 。`DelayedWorkQueue` 的内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构,可以保证每次出队的任务都是当前队列中执行时间最靠前的。`DelayedWorkQueue` 添加元素满了之后会自动扩容原来容量的 1/2,即永远不会阻塞,最大扩容可达 `Integer.MAX_VALUE`,所以最多只能创建核心线程数的线程。
- `DelayedWorkQueue`(延迟队列):`ScheduledThreadPool` 和 `SingleThreadScheduledExecutor` 。`DelayedWorkQueue` 的内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构,可以保证每次出队的任务都是当前队列中执行时间最靠前的。`DelayedWorkQueue` 添加元素满了之后会自动扩容,增加原来容量的 50%,即永远不会阻塞,最大扩容可达 `Integer.MAX_VALUE`,所以最多只能创建核心线程数的线程。
- `ArrayBlockingQueue`(有界阻塞队列):底层由数组实现,容量一旦创建,就不能修改。

### 线程池处理任务的流程了解吗?

Expand Down