From 24f27e536629c4bca4cacf795666b883c961bcef Mon Sep 17 00:00:00 2001 From: anaer Date: Tue, 31 Aug 2021 14:37:58 +0800 Subject: [PATCH 1/8] =?UTF-8?q?Update=20RabbitMQ=E5=85=A5=E9=97=A8?= =?UTF-8?q?=E7=9C=8B=E8=BF=99=E4=B8=80=E7=AF=87=E5=B0=B1=E5=A4=9F=E4=BA=86?= =?UTF-8?q?.md?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit typo --- ...07\345\260\261\345\244\237\344\272\206.md" | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git "a/docs/system-design/distributed-system/message-queue/RabbitMQ\345\205\245\351\227\250\347\234\213\350\277\231\344\270\200\347\257\207\345\260\261\345\244\237\344\272\206.md" "b/docs/system-design/distributed-system/message-queue/RabbitMQ\345\205\245\351\227\250\347\234\213\350\277\231\344\270\200\347\257\207\345\260\261\345\244\237\344\272\206.md" index fb6beae75bf..38444481507 100644 --- "a/docs/system-design/distributed-system/message-queue/RabbitMQ\345\205\245\351\227\250\347\234\213\350\277\231\344\270\200\347\257\207\345\260\261\345\244\237\344\272\206.md" +++ "b/docs/system-design/distributed-system/message-queue/RabbitMQ\345\205\245\351\227\250\347\234\213\350\277\231\344\270\200\347\257\207\345\260\261\345\244\237\344\272\206.md" @@ -32,7 +32,7 @@ RabbitMQ 是采用 Erlang 语言实现 AMQP(Advanced Message Queuing Protocol, RabbitMQ 发展到今天,被越来越多的人认可,这和它在易用性、扩展性、可靠性和高可用性等方面的卓著表现是分不开的。RabbitMQ 的具体特点可以概括为以下几点: - **可靠性:** RabbitMQ使用一些机制来保证消息的可靠性,如持久化、传输确认及发布确认等。 -- **灵活的路由:** 在消息进入队列之前,通过交换器来路由消息。对于典型的路由功能,RabbitMQ 己经提供了一些内置的交换器来实现。针对更复杂的路由功能,可以将多个交换器绑定在一起,也可以通过插件机制来实现自己的交换器。这个后面会在我们将 RabbitMQ 核心概念的时候详细介绍到。 +- **灵活的路由:** 在消息进入队列之前,通过交换器来路由消息。对于典型的路由功能,RabbitMQ 己经提供了一些内置的交换器来实现。针对更复杂的路由功能,可以将多个交换器绑定在一起,也可以通过插件机制来实现自己的交换器。这个后面会在我们讲 RabbitMQ 核心概念的时候详细介绍到。 - **扩展性:** 多个RabbitMQ节点可以组成一个集群,也可以根据实际业务情况动态地扩展集群中节点。 - **高可用性:** 队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队列仍然可用。 - **支持多种协议:** RabbitMQ 除了原生支持 AMQP 协议,还支持 STOMP、MQTT 等多种消息中间件协议。 @@ -85,7 +85,7 @@ Binding(绑定) 示意图: **RabbitMQ** 中消息只能存储在 **队列** 中,这一点和 **Kafka** 这种消息中间件相反。Kafka 将消息存储在 **topic(主题)** 这个逻辑层面,而相对应的队列逻辑只是topic实际存储文件中的位移标识。 RabbitMQ 的生产者生产消息并最终投递到队列中,消费者可以从队列中获取消息并消费。 -**多个消费者可以订阅同一个队列**,这时队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理,这样避免的消息被重复消费。 +**多个消费者可以订阅同一个队列**,这时队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理,这样避免消息被重复消费。 **RabbitMQ** 不支持队列层面的广播消费,如果有广播消费的需求,需要在其上进行二次开发,这样会很麻烦,不建议这样做。 @@ -129,17 +129,17 @@ direct 类型常用在处理有优先级的任务,根据任务的优先级把 以上图为例: -- 路由键为 “com.rabbitmq.client” 的消息会同时路由到 Queuel 和 Queue2; +- 路由键为 “com.rabbitmq.client” 的消息会同时路由到 Queue1 和 Queue2; - 路由键为 “com.hidden.client” 的消息只会路由到 Queue2 中; - 路由键为 “com.hidden.demo” 的消息只会路由到 Queue2 中; -- 路由键为 “java.rabbitmq.demo” 的消息只会路由到Queuel中; +- 路由键为 “java.rabbitmq.demo” 的消息只会路由到 Queue1 中; - 路由键为 “java.util.concurrent” 的消息将会被丢弃或者返回给生产者(需要设置 mandatory 参数),因为它没有匹配任何路由键。 ##### ④ headers(不推荐) -headers 类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。在绑定队列和交换器时制定一组键值对,当发送消息到交换器时,RabbitMQ会获取到该消息的 headers(也是一个键值对的形式)'对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers 类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。 +headers 类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。在绑定队列和交换器时指定一组键值对,当发送消息到交换器时,RabbitMQ会获取到该消息的 headers(也是一个键值对的形式),对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers 类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。 -## 二 安装 RabbitMq +## 二 安装 RabbitMQ 通过 Docker 安装非常方便,只需要几条命令就好了,我这里是只说一下常规安装方法。 @@ -279,23 +279,23 @@ rabbitmq-plugins enable rabbitmq_management chkconfig rabbitmq-server on ``` -**4. 启动服务** +**5. 启动服务** ```shell service rabbitmq-server start ``` -**5. 查看服务状态** +**6. 查看服务状态** ```shell service rabbitmq-server status ``` -**6. 访问 RabbitMQ 控制台** +**7. 访问 RabbitMQ 控制台** 浏览器访问:http://你的ip地址:15672/ -默认用户名和密码: guest/guest;但是需要注意的是:guestuest用户只是被容许从localhost访问。官网文档描述如下: +默认用户名和密码:guest/guest; 但是需要注意的是:guest用户只是被容许从localhost访问。官网文档描述如下: ```shell “guest” user can only connect via localhost From bdcd1afe6da1f9c5352422e1c20dfcca8bc6933f Mon Sep 17 00:00:00 2001 From: anaer Date: Wed, 1 Sep 2021 10:32:50 +0800 Subject: [PATCH 2/8] =?UTF-8?q?Update=20IO=E6=A8=A1=E5=9E=8B.md?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit typo --- "docs/java/basis/IO\346\250\241\345\236\213.md" | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git "a/docs/java/basis/IO\346\250\241\345\236\213.md" "b/docs/java/basis/IO\346\250\241\345\236\213.md" index 730e1e24f06..6091a8fe4b5 100644 --- "a/docs/java/basis/IO\346\250\241\345\236\213.md" +++ "b/docs/java/basis/IO\346\250\241\345\236\213.md" @@ -18,7 +18,7 @@ I/O(**I**nput/**O**utpu) 即**输入/输出** 。 ![冯诺依曼体系结构](https://img-blog.csdnimg.cn/20190624122126398.jpeg?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9pcy1jbG91ZC5ibG9nLmNzZG4ubmV0,size_16,color_FFFFFF,t_70) -输入设备(比如键盘)和输出设备(比如鼠标)都属于外部设备。网卡、硬盘这种既可以属于输入设备,也可以属于输出设备。 +输入设备(比如键盘)和输出设备(比如显示器)都属于外部设备。网卡、硬盘这种既可以属于输入设备,也可以属于输出设备。 输入设备向计算机输入数据,输出设备接收计算机输出的数据。 @@ -28,7 +28,7 @@ I/O(**I**nput/**O**utpu) 即**输入/输出** 。 根据大学里学到的操作系统相关的知识:为了保证操作系统的稳定性和安全性,一个进程的地址空间划分为 **用户空间(User space)** 和 **内核空间(Kernel space )** 。 -像我们平常运行的应用程序都是运行在用户空间,只有内核空间才能进行系统态级别的资源有关的操作,比如如文件管理、进程通信、内存管理等等。也就是说,我们想要进行 IO 操作,一定是要依赖内核空间的能力。 +像我们平常运行的应用程序都是运行在用户空间,只有内核空间才能进行系统态级别的资源有关的操作,比如文件管理、进程通信、内存管理等等。也就是说,我们想要进行 IO 操作,一定是要依赖内核空间的能力。 并且,用户空间的程序不能直接访问内核空间。 @@ -57,7 +57,7 @@ UNIX 系统下, IO 模型一共有 5 种: **同步阻塞 I/O**、**同步非 **BIO 属于同步阻塞 IO 模型** 。 -同步阻塞 IO 模型中,应用程序发起 read 调用后,会一直阻塞,直到在内核把数据拷贝到用户空间。 +同步阻塞 IO 模型中,应用程序发起 read 调用后,会一直阻塞,直到内核把数据拷贝到用户空间。 ![图源:《深入拆解Tomcat & Jetty》](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/6a9e704af49b4380bb686f0c96d33b81~tplv-k3u1fbpfcp-watermark.image) From e534636ad9d64fd8497d677b9060aafc2aea4d27 Mon Sep 17 00:00:00 2001 From: anaer Date: Wed, 1 Sep 2021 10:54:55 +0800 Subject: [PATCH 3/8] =?UTF-8?q?Update=20=E4=BB=A3=E7=90=86=E6=A8=A1?= =?UTF-8?q?=E5=BC=8F=E8=AF=A6=E8=A7=A3.md?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit typo --- ...346\250\241\345\274\217\350\257\246\350\247\243.md" | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git "a/docs/java/basis/\344\273\243\347\220\206\346\250\241\345\274\217\350\257\246\350\247\243.md" "b/docs/java/basis/\344\273\243\347\220\206\346\250\241\345\274\217\350\257\246\350\247\243.md" index 868bc012e88..8106d30503e 100644 --- "a/docs/java/basis/\344\273\243\347\220\206\346\250\241\345\274\217\350\257\246\350\247\243.md" +++ "b/docs/java/basis/\344\273\243\347\220\206\346\250\241\345\274\217\350\257\246\350\247\243.md" @@ -120,15 +120,15 @@ after method send() **从 JVM 角度来说,动态代理是在运行时动态生成类字节码,并加载到 JVM 中的。** -说到动态代理,Spring AOP、RPC 框架应该是两个不得不的提的,它们的实现都依赖了动态代理。 +说到动态代理,Spring AOP、RPC 框架应该是两个不得不提的,它们的实现都依赖了动态代理。 -**动态代理在我们日常开发中使用的相对较小,但是在框架中的几乎是必用的一门技术。学会了动态代理之后,对于我们理解和学习各种框架的原理也非常有帮助。** +**动态代理在我们日常开发中使用的相对较少,但是在框架中的几乎是必用的一门技术。学会了动态代理之后,对于我们理解和学习各种框架的原理也非常有帮助。** 就 Java 来说,动态代理的实现方式有很多种,比如 **JDK 动态代理**、**CGLIB 动态代理**等等。 [guide-rpc-framework](https://github.com/Snailclimb/guide-rpc-framework) 使用的是 JDK 动态代理,我们先来看看 JDK 动态代理的使用。 -另外,虽然 [guide-rpc-framework](https://github.com/Snailclimb/guide-rpc-framework) 没有用到 **CGLIB 动态代理 ,我们这里还是简单介绍一下其使用以及和**JDK 动态代理的对比。 +另外,虽然 [guide-rpc-framework](https://github.com/Snailclimb/guide-rpc-framework) 没有用到 **CGLIB 动态代理** ,我们这里还是简单介绍一下其使用以及和**JDK 动态代理**的对比。 ### 3.1. JDK 动态代理机制 @@ -154,7 +154,7 @@ after method send() 2. **interfaces** : 被代理类实现的一些接口; 3. **h** : 实现了 `InvocationHandler` 接口的对象; -要实现动态代理的话,还必须需要实现`InvocationHandler` 来自定义处理逻辑。 当我们的动态代理对象调用一个方法时候,这个方法的调用就会被转发到实现`InvocationHandler` 接口类的 `invoke` 方法来调用。 +要实现动态代理的话,还必须需要实现`InvocationHandler` 来自定义处理逻辑。 当我们的动态代理对象调用一个方法时,这个方法的调用就会被转发到实现`InvocationHandler` 接口类的 `invoke` 方法来调用。 ```java public interface InvocationHandler { @@ -298,7 +298,7 @@ extends Callback{ 1. **obj** :被代理的对象(需要增强的对象) 2. **method** :被拦截的方法(需要增强的方法) 3. **args** :方法入参 -4. **methodProxy** :用于调用原始方法 +4. **proxy** :用于调用原始方法 你可以通过 `Enhancer`类来动态获取被代理类,当代理类调用方法的时候,实际调用的是 `MethodInterceptor` 中的 `intercept` 方法。 From 0222e9c40c35c62585af5e6b89ed6e8081ac600c Mon Sep 17 00:00:00 2001 From: anaer Date: Wed, 1 Sep 2021 11:20:27 +0800 Subject: [PATCH 4/8] =?UTF-8?q?Update=20BIO,NIO,AIO=E6=80=BB=E7=BB=93.md?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit typo --- "docs/java/basis/BIO,NIO,AIO\346\200\273\347\273\223.md" | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git "a/docs/java/basis/BIO,NIO,AIO\346\200\273\347\273\223.md" "b/docs/java/basis/BIO,NIO,AIO\346\200\273\347\273\223.md" index 50f6b7fec83..c26ed8f5e04 100644 --- "a/docs/java/basis/BIO,NIO,AIO\346\200\273\347\273\223.md" +++ "b/docs/java/basis/BIO,NIO,AIO\346\200\273\347\273\223.md" @@ -34,10 +34,10 @@ > When you execute something synchronously, you wait for it to finish before moving on to another task. When you execute something asynchronously, you can move on to another task before it finishes. > -> 当你同步执行某项任务时,你需要等待其完成才能继续执行其他任务。当你异步执行某些操作时,你可以在完成另一个任务之前继续进行。 +> 当你同步执行某项任务时,你需要等待其完成才能继续执行其他任务。当您异步执行某项任务时,你可以在它完成之前继续执行其他任务。 - **同步** :两个同步任务相互依赖,并且一个任务必须以依赖于另一任务的某种方式执行。 比如在`A->B`事件模型中,你需要先完成 A 才能执行B。 再换句话说,同步调用中被调用者未处理完请求之前,调用不返回,调用者会一直等待结果的返回。 -- **异步**: 两个异步的任务是完全独立的,一方的执行不需要等待另外一方的执行。再换句话说,异步调用中一调用就返回结果不需要等待结果返回,当结果返回的时候通过回调函数或者其他方式拿着结果再做相关事情, +- **异步**: 两个异步的任务是完全独立的,一方的执行不需要等待另外一方的执行。再换句话说,异步调用中一调用就返回结果不需要等待结果返回,当结果返回的时候通过回调函数或者其他方式拿着结果再做相关事情。 **阻塞和非阻塞** @@ -58,7 +58,7 @@ BIO通信(一请求一应答)模型图如下(图源网络,原出处不明) ![传统BIO通信模型图](https://my-blog-to-use.oss-cn-beijing.aliyuncs.com/2.png) -采用 **BIO 通信模型** 的服务端,通常由一个独立的 Acceptor 线程负责监听客户端的连接。我们一般通过在`while(true)` 循环中服务端会调用 `accept()` 方法等待接收客户端的连接的方式监听请求,请求一旦接收到一个连接请求,就可以建立通信套接字在这个通信套接字上进行读写操作,此时不能再接收其他客户端连接请求,只能等待同当前连接的客户端的操作执行完成, 不过可以通过多线程来支持多个客户端的连接,如上图所示。 +采用 **BIO 通信模型** 的服务端,通常由一个独立的 Acceptor 线程负责监听客户端的连接。我们一般通过在`while(true)` 循环中服务端会调用 `accept()` 方法等待接收客户端的连接的方式监听请求,一旦接收到一个连接请求,就可以建立通信套接字在这个通信套接字上进行读写操作,此时不能再接收其他客户端连接请求,只能等待同当前连接的客户端的操作执行完成, 不过可以通过多线程来支持多个客户端的连接,如上图所示。 如果要让 **BIO 通信模型** 能够同时处理多个客户端请求,就必须使用多线程(主要原因是`socket.accept()`、`socket.read()`、`socket.write()` 涉及的三个主要函数都是同步阻塞的),也就是说它在接收到客户端连接请求之后为每个客户端创建一个新的线程进行链路处理,处理完成之后,通过输出流返回应答给客户端,线程销毁。这就是典型的 **一请求一应答通信模型** 。我们可以设想一下如果这个连接不做任何事情的话就会造成不必要的线程开销,不过可以通过 **线程池机制** 改善,线程池还可以让线程的创建和回收成本相对较低。使用`FixedThreadPool` 可以有效的控制了线程的最大数量,保证了系统有限的资源的控制,实现了N(客户端请求数量):M(处理客户端请求的线程数量)的伪异步I/O模型(N 可以远远大于 M),下面一节"伪异步 BIO"中会详细介绍到。 From ffe1c9157ade4e403f6c7df194e80f49cdd70951 Mon Sep 17 00:00:00 2001 From: anaer Date: Wed, 1 Sep 2021 18:00:48 +0800 Subject: [PATCH 5/8] =?UTF-8?q?Update=20BIO,NIO,AIO=E6=80=BB=E7=BB=93.md?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit typo --- "docs/java/basis/BIO,NIO,AIO\346\200\273\347\273\223.md" | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git "a/docs/java/basis/BIO,NIO,AIO\346\200\273\347\273\223.md" "b/docs/java/basis/BIO,NIO,AIO\346\200\273\347\273\223.md" index c26ed8f5e04..33a7ac50873 100644 --- "a/docs/java/basis/BIO,NIO,AIO\346\200\273\347\273\223.md" +++ "b/docs/java/basis/BIO,NIO,AIO\346\200\273\347\273\223.md" @@ -193,7 +193,7 @@ Java IO的各种流是阻塞的。这意味着,当一个线程调用 `read()` Buffer是一个对象,它包含一些要写入或者要读出的数据。在NIO类库中加入Buffer对象,体现了新库与原I/O的一个重要区别。在面向流的I/O中·可以将数据直接写入或者将数据直接读到 Stream 对象中。虽然 Stream 中也有 Buffer 开头的扩展类,但只是流的包装类,还是从流读到缓冲区,而 NIO 却是直接读到 Buffer 中进行操作。 -在NIO厍中,所有数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中的; 在写入数据时,写入到缓冲区中。任何时候访问NIO中的数据,都是通过缓冲区进行操作。 +在NIO库中,所有数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中的; 在写入数据时,写入到缓冲区中。任何时候访问NIO中的数据,都是通过缓冲区进行操作。 最常用的缓冲区是 ByteBuffer,一个 ByteBuffer 提供了一组功能用于操作 byte 数组。除了ByteBuffer,还有其他的一些缓冲区,事实上,每一种Java基本类型(除了Boolean类型)都对应有一种缓冲区。 From ce6daf112e212d6dd5993441614e865eef1167ff Mon Sep 17 00:00:00 2001 From: guide Date: Wed, 1 Sep 2021 18:41:05 +0800 Subject: [PATCH 6/8] =?UTF-8?q?Create=20CompletableFuture=E4=BB=8E?= =?UTF-8?q?=E5=85=A5=E9=97=A8=E5=88=B0=E5=AE=9E=E6=88=98.md?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...50\345\210\260\345\256\236\346\210\230.md" | 492 ++++++++++++++++++ 1 file changed, 492 insertions(+) create mode 100644 "docs/java/multi-thread/CompletableFuture\344\273\216\345\205\245\351\227\250\345\210\260\345\256\236\346\210\230.md" diff --git "a/docs/java/multi-thread/CompletableFuture\344\273\216\345\205\245\351\227\250\345\210\260\345\256\236\346\210\230.md" "b/docs/java/multi-thread/CompletableFuture\344\273\216\345\205\245\351\227\250\345\210\260\345\256\236\346\210\230.md" new file mode 100644 index 00000000000..485588bd20a --- /dev/null +++ "b/docs/java/multi-thread/CompletableFuture\344\273\216\345\205\245\351\227\250\345\210\260\345\256\236\346\210\230.md" @@ -0,0 +1,492 @@ +自己在项目中使用 `CompletableFuture` 比较多,看到很多开源框架中也大量使用到了 `CompletableFuture` 。 + +因此,专门写一篇文章来介绍这个 Java 8 才被引入的一个非常有用的用于异步编程的类。 + +## 简单介绍 + +`CompletableFuture` 同时实现了 `Future` 和 `CompletionStage` 接口。其除了提供了更为好用和强大的 `Future` 特性之外,还提供了函数式编程的能力。 + +```java +public class CompletableFuture implements Future, CompletionStage { +} +``` + +## 常见操作 + +### 创建 CompletableFuture + +常见的创建 `CompletableFuture` 对象的方法如下: + +1. 通过 new 关键字。 +2. 基于 `CompletableFuture` 自带的静态工厂方法:`runAsync()` 、`supplyAsync()` 。 + +#### new 关键字 + +通过 new 关键字创建 `CompletableFuture` 对象这种使用方式可以看作是将 `CompletableFuture` 当做 `Future` 来使用。 + +我在我的开源项目 [guide-rpc-framework](https://github.com/Snailclimb/guide-rpc-framework) 中就是这种方式创建的 `CompletableFuture` 对象。 + +下面咱们来看一个简单的案例。 + +我们通过创建了一个结果值类型为 `RpcResponse` 的 `CompletableFuture`,你可以把 `resultFuture` 看作是异步运算结果的载体。 + +```java +CompletableFuture> resultFuture = new CompletableFuture<>(); +``` + +假设在未来的某个时刻,我们得到了最终的结果。这时,我们可以调用 `complete()` 方法为其传入结果,这表示 `resultFuture` 已经被完成了。 + +```java +// complete() 方法只能调用一次,后续调用将被忽略。 +resultFuture.complete(rpcResponse); +``` + +你可以通过 `isDone()` 方法来检查是否已经完成。 + +```java +public boolean isDone() { + return result != null; +} +``` + +获取异步计算的结果也非常简单,直接调用 `get()` 方法即可! + +```java +rpcResponse = completableFuture.get(); +``` + +注意 : `get()` 方法并不会阻塞,因为我们已经知道异步运算的结果了。 + +如果你已经知道计算的结果的话,可以使用静态方法 `completedFuture()` 来创建 `CompletableFuture` 。 + +```java +CompletableFuture future = CompletableFuture.completedFuture("hello!"); +assertEquals("hello!", future.get()); +``` + +`completedFuture()` 方法底层调用的是带参数的 new 方法,只不过,这个方法不对外暴露。 + +```java +public static CompletableFuture completedFuture(U value) { + return new CompletableFuture((value == null) ? NIL : value); +} +``` + +#### 静态工厂方法 + +这两个方法可以帮助我们封装计算逻辑。 + +```java +static CompletableFuture supplyAsync(Supplier supplier); +// 使用自定义线程池(推荐) +static CompletableFuture supplyAsync(Supplier supplier, Executor executor); +static CompletableFuture runAsync(Runnable runnable); +// 使用自定义线程池(推荐) +static CompletableFuture runAsync(Runnable runnable, Executor executor); +``` + +`runAsync()` 方法接受的参数是 `Runnable` ,这是一个函数式接口,不允许返回值。当你需要异步操作且不关心返回结果的时候可以使用 `runAsync()` 方法。 + +```java +@FunctionalInterface +public interface Runnable { + public abstract void run(); +} +``` + +`supplyAsync()` 方法接受的参数是 `Supplier` ,这也是一个函数式接口,`U` 是返回结果值的类型。 + +```java +@FunctionalInterface +public interface Supplier { + + /** + * Gets a result. + * + * @return a result + */ + T get(); +} +``` + +当你需要异步操作且关心返回结果的时候,可以使用 `supplyAsync()` 方法。 + +```java +CompletableFuture future = CompletableFuture.runAsync(() -> System.out.println("hello!")); +future.get();// 输出 "hello!" +CompletableFuture future2 = CompletableFuture.supplyAsync(() -> "hello!"); +assertEquals("hello!", future2.get()); +``` + +### 处理异步结算的结果 + +当我们获取到异步计算的结果之后,还可以对其进行进一步的处理,比较常用的方法有下面几个: + +- `thenApply()` +- `thenAccept()` +- `thenRun()` +- `whenComplete()` + +`thenApply()` 方法接受一个 `Function` 实例,用它来处理结果。 + +```java +// 沿用上一个任务的线程池 +public CompletableFuture thenApply( + Function fn) { + return uniApplyStage(null, fn); +} + +//使用默认的 ForkJoinPool 线程池(不推荐) +public CompletableFuture thenApplyAsync( + Function fn) { + return uniApplyStage(defaultExecutor(), fn); +} +// 使用自定义线程池(推荐) +public CompletableFuture thenApplyAsync( + Function fn, Executor executor) { + return uniApplyStage(screenExecutor(executor), fn); +} +``` + +`thenApply()` 方法使用示例如下: + +```java +CompletableFuture future = CompletableFuture.completedFuture("hello!") + .thenApply(s -> s + "world!"); +assertEquals("hello!world!", future.get()); +// 这次调用将被忽略。 +future.thenApply(s -> s + "nice!"); +assertEquals("hello!world!", future.get()); +``` + +你还可以进行 **流式调用**: + +```java +CompletableFuture future = CompletableFuture.completedFuture("hello!") + .thenApply(s -> s + "world!").thenApply(s -> s + "nice!"); +assertEquals("hello!world!nice!", future.get()); +``` + +**如果你不需要从回调函数中获取返回结果,可以使用 `thenAccept()` 或者 `thenRun()`。这两个方法的区别在于 `thenRun()` 不能访问异步计算的结果。** + +`thenAccept()` 方法的参数是 `Consumer` 。 + +```java +public CompletableFuture thenAccept(Consumer action) { + return uniAcceptStage(null, action); +} + +public CompletableFuture thenAcceptAsync(Consumer action) { + return uniAcceptStage(defaultExecutor(), action); +} + +public CompletableFuture thenAcceptAsync(Consumer action, + Executor executor) { + return uniAcceptStage(screenExecutor(executor), action); +} +``` + +顾名思义,`Consumer` 属于消费型接口,它可以接收 1 个输入对象然后进行“消费”。 + +```java +@FunctionalInterface +public interface Consumer { + + void accept(T t); + + default Consumer andThen(Consumer after) { + Objects.requireNonNull(after); + return (T t) -> { accept(t); after.accept(t); }; + } +} +``` + +`thenRun()` 的方法是的参数是 `Runnable` 。 + +```java +public CompletableFuture thenRun(Runnable action) { + return uniRunStage(null, action); +} + +public CompletableFuture thenRunAsync(Runnable action) { + return uniRunStage(defaultExecutor(), action); +} + +public CompletableFuture thenRunAsync(Runnable action, + Executor executor) { + return uniRunStage(screenExecutor(executor), action); +} +``` + +`thenAccept()` 和 `thenRun()` 使用示例如下: + +```java +CompletableFuture.completedFuture("hello!") + .thenApply(s -> s + "world!").thenApply(s -> s + "nice!").thenAccept(System.out::println);//hello!world!nice! + +CompletableFuture.completedFuture("hello!") + .thenApply(s -> s + "world!").thenApply(s -> s + "nice!").thenRun(() -> System.out.println("hello!"));//hello! +``` + +`whenComplete()` 的方法的参数是 `BiConsumer` 。 + +```java +public CompletableFuture whenComplete( + BiConsumer action) { + return uniWhenCompleteStage(null, action); +} + + +public CompletableFuture whenCompleteAsync( + BiConsumer action) { + return uniWhenCompleteStage(defaultExecutor(), action); +} +// 使用自定义线程池(推荐) +public CompletableFuture whenCompleteAsync( + BiConsumer action, Executor executor) { + return uniWhenCompleteStage(screenExecutor(executor), action); +} +``` + +相对于 `Consumer` , `BiConsumer` 可以接收 2 个输入对象然后进行“消费”。 + +```java +@FunctionalInterface +public interface BiConsumer { + void accept(T t, U u); + + default BiConsumer andThen(BiConsumer after) { + Objects.requireNonNull(after); + + return (l, r) -> { + accept(l, r); + after.accept(l, r); + }; + } +} +``` + +`whenComplete()` 使用示例如下: + +```java +CompletableFuture future = CompletableFuture.supplyAsync(() -> "hello!") + .whenComplete((res, ex) -> { + // res 代表返回的结果 + // ex 的类型为 Throwable ,代表抛出的异常 + System.out.println(res); + // 这里没有抛出异常所有为 null + assertNull(ex); + }); +assertEquals("hello!", future.get()); +``` + +### 异常处理 + +你可以通过 `handle()` 方法来处理任务执行过程中可能出现的抛出异常的情况。 + +```java +public CompletableFuture handle( + BiFunction fn) { + return uniHandleStage(null, fn); +} + +public CompletableFuture handleAsync( + BiFunction fn) { + return uniHandleStage(defaultExecutor(), fn); +} + +public CompletableFuture handleAsync( + BiFunction fn, Executor executor) { + return uniHandleStage(screenExecutor(executor), fn); +} +``` + +示例代码如下: + +```java +CompletableFuture future + = CompletableFuture.supplyAsync(() -> { + if (true) { + throw new RuntimeException("Computation error!"); + } + return "hello!"; +}).handle((res, ex) -> { + // res 代表返回的结果 + // ex 的类型为 Throwable ,代表抛出的异常 + return res != null ? res : "world!"; +}); +assertEquals("world!", future.get()); +``` + +你还可以通过 `exceptionally()` 方法来处理异常情况。 + +```java +CompletableFuture future + = CompletableFuture.supplyAsync(() -> { + if (true) { + throw new RuntimeException("Computation error!"); + } + return "hello!"; +}).exceptionally(ex -> { + System.out.println(ex.toString());// CompletionException + return "world!"; +}); +assertEquals("world!", future.get()); +``` + +如果你想让 `CompletableFuture` 的结果就是异常的话,可以使用 `completeExceptionally()` 方法为其赋值。 + +```java +CompletableFuture completableFuture = new CompletableFuture<>(); +// ... +completableFuture.completeExceptionally( + new RuntimeException("Calculation failed!")); +// ... +completableFuture.get(); // ExecutionException +``` + +### 组合 CompletableFuture + +你可以使用 `thenCompose()` 按顺序链接两个 `CompletableFuture` 对象。 + +```java +public CompletableFuture thenCompose( + Function> fn) { + return uniComposeStage(null, fn); +} + +public CompletableFuture thenComposeAsync( + Function> fn) { + return uniComposeStage(defaultExecutor(), fn); +} + +public CompletableFuture thenComposeAsync( + Function> fn, + Executor executor) { + return uniComposeStage(screenExecutor(executor), fn); +} +``` + +`thenCompose()` 方法会使用示例如下: + +```java +CompletableFuture future + = CompletableFuture.supplyAsync(() -> "hello!") + .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + "world!")); +assertEquals("hello!world!", future.get()); +``` + +在实际开发中,这个方法还是非常有用的。比如说,我们先要获取用户信息然后再用用户信息去做其他事情。 + +和 `thenCompose()` 方法类似的还有 `thenCombine()` 方法, `thenCombine()` 同样可以组合两个 `CompletableFuture` 对象。 + +```java +CompletableFuture completableFuture + = CompletableFuture.supplyAsync(() -> "hello!") + .thenCombine(CompletableFuture.supplyAsync( + () -> "world!"), (s1, s2) -> s1 + s2) + .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + "nice!")); +assertEquals("hello!world!nice!", completableFuture.get()); +``` + +**那 `thenCompose()` 和 `thenCombine()` 有什么区别呢?** + +- `thenCompose()` 可以两个 `CompletableFuture` 对象,并将前一个任务的返回结果作为下一个任务的参数,它们之间存在着先后顺序。 +- `thenCombine()` 会在两个任务都执行完成后,把两个任务的结果合并。两个任务是并行执行的,它们之间并没有先后依赖顺序。 + +### 并行运行多个 CompletableFuture + +你可以通过 `CompletableFuture` 的 `allOf()`这个静态方法来并行运行多个 `CompletableFuture` 。 + +实际项目中,我们经常需要并行运行多个互不相关的任务,这些任务之间没有依赖关系,可以互相独立地运行。 + +比说我们要读取处理 6 个文件,这 6 个任务都是没有执行顺序依赖的任务,但是我们需要返回给用户的时候将这几个文件的处理的结果进行统计整理。像这种情况我们就可以使用并行运行多个 `CompletableFuture` 来处理。 + +示例代码如下: + +```java +CompletableFuture task1 = + CompletableFuture.supplyAsync(()->{ + //自定义业务操作 + }); +...... +CompletableFuture task6 = + CompletableFuture.supplyAsync(()->{ + //自定义业务操作 + }); +...... + CompletableFuture headerFuture=CompletableFuture.allOf(task1,.....,task6); + + try { + headerFuture.join(); + } catch (Exception ex) { + ...... + } +System.out.println("all done. "); +``` + +经常和 `allOf()` 方法拿来对比的是 `anyOf()` 方法。 + +**`allOf()` 方法会等到所有的 `CompletableFuture` 都运行完成之后再返回** + +```java +Random rand = new Random(); +CompletableFuture future1 = CompletableFuture.supplyAsync(() -> { + try { + Thread.sleep(1000 + rand.nextInt(1000)); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + System.out.println("future1 done..."); + } + return "abc"; +}); +CompletableFuture future2 = CompletableFuture.supplyAsync(() -> { + try { + Thread.sleep(1000 + rand.nextInt(1000)); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + System.out.println("future2 done..."); + } + return "efg"; +}); +``` + +调用 `join()` 可以让程序等`future1` 和 `future2` 都运行完了之后再继续执行。 + +```java +CompletableFuture completableFuture = CompletableFuture.allOf(future1, future2); +completableFuture.join(); +assertTrue(completableFuture.isDone()); +System.out.println("all futures done..."); +``` + +输出: + +```java +future1 done... +future2 done... +all futures done... +``` + +**`anyOf()` 方法不会等待所有的 `CompletableFuture` 都运行完成之后再返回,只要有一个执行完成即可!** + +```java +CompletableFuture f = CompletableFuture.anyOf(future1, future2); +System.out.println(f.get()); +``` + +输出结果可能是: + +```java +future2 done... +efg +``` + +也可能是: + +``` +future1 done... +abc +``` From f0683e51fc459439eee9801176cc27465dea772f Mon Sep 17 00:00:00 2001 From: guide Date: Thu, 2 Sep 2021 14:18:21 +0800 Subject: [PATCH 7/8] =?UTF-8?q?[feat]CompletableFuture=E5=85=A5=E9=97=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...pletableFuture\345\205\245\351\227\250.md" | 28 ++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) rename "docs/java/multi-thread/CompletableFuture\344\273\216\345\205\245\351\227\250\345\210\260\345\256\236\346\210\230.md" => "docs/java/multi-thread/CompletableFuture\345\205\245\351\227\250.md" (91%) diff --git "a/docs/java/multi-thread/CompletableFuture\344\273\216\345\205\245\351\227\250\345\210\260\345\256\236\346\210\230.md" "b/docs/java/multi-thread/CompletableFuture\345\205\245\351\227\250.md" similarity index 91% rename from "docs/java/multi-thread/CompletableFuture\344\273\216\345\205\245\351\227\250\345\210\260\345\256\236\346\210\230.md" rename to "docs/java/multi-thread/CompletableFuture\345\205\245\351\227\250.md" index 485588bd20a..ad1a11a32b6 100644 --- "a/docs/java/multi-thread/CompletableFuture\344\273\216\345\205\245\351\227\250\345\210\260\345\256\236\346\210\230.md" +++ "b/docs/java/multi-thread/CompletableFuture\345\205\245\351\227\250.md" @@ -4,13 +4,31 @@ ## 简单介绍 -`CompletableFuture` 同时实现了 `Future` 和 `CompletionStage` 接口。其除了提供了更为好用和强大的 `Future` 特性之外,还提供了函数式编程的能力。 +`CompletableFuture` 同时实现了 `Future` 和 `CompletionStage` 接口。 ```java public class CompletableFuture implements Future, CompletionStage { } ``` +`CompletableFuture` 除了提供了更为好用和强大的 `Future` 特性之外,还提供了函数式编程的能力。 + +![](https://guide-blog-images.oss-cn-shenzhen.aliyuncs.com/javaguide/image-20210902092441434.png) + +`Future` 接口有 5 个方法: + +- `boolean cancel(boolean mayInterruptIfRunning)` :尝试取消执行任务。 +- `boolean isCancelled()` :判断任务是否被取消。 +- `boolean isDone()` : 判断任务是否已经被执行完成。 +- `get()` :等待任务执行完成并获取运算结果。 +- `get(long timeout, TimeUnit unit)` :多了一个超时时间。 + +![](https://guide-blog-images.oss-cn-shenzhen.aliyuncs.com/javaguide/image-20210902093026059.png) + +`CompletionStage` 接口中的方法比较多,`CompletableFuture` 的函数式能力就是这个接口赋予的。从这个接口的方法参数你就可以发现其大量使用了 Java8 引入的函数式编程。 + +由于方法众多,所以这里不能一一讲解,下文中我会介绍大部分常见方法的使用。 + ## 常见操作 ### 创建 CompletableFuture @@ -490,3 +508,11 @@ efg future1 done... abc ``` + +## 后记 + +这篇文章只是简单介绍了 `CompletableFuture` 比较常用的一些 API 。 + +如果想要深入学习的话,可以多找一些书籍和博客看。 + +另外,建议G友们可以看看京东的 [asyncTool](https://gitee.com/jd-platform-opensource/asyncTool) 这个并发框架,里面大量使用到了 `CompletableFuture` 。 From 576e5df6de3877536a48485696f290f36e4b3ec0 Mon Sep 17 00:00:00 2001 From: guide Date: Thu, 2 Sep 2021 14:19:06 +0800 Subject: [PATCH 8/8] =?UTF-8?q?[feat]CompletableFuture=E5=85=A5=E9=97=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 5c5f592756d..6f23b4638fa 100644 --- a/README.md +++ b/README.md @@ -77,6 +77,7 @@ 3. [并发容器总结](docs/java/multi-thread/并发容器总结.md) 4. [JUC 中的 Atomic 原子类总结](docs/java/multi-thread/Atomic原子类总结.md) 5. [AQS 原理以及 AQS 同步组件总结](docs/java/multi-thread/AQS原理以及AQS同步组件总结.md) +6. [CompletableFuture入门](docs/java/multi-thread/CompletableFuture入门.md) ### JVM (必看 :+1:)