Skip to content

Commit 7b7b7af

Browse files
committed
HTTP client request cancellation
This patch changes `aleph.http/request` so that setting the response deferred to an error status will terminate an in-flight request. This allows e.g. for `d/timeout!` to be used without potentially leaking connections. For convenient explicit cancellation, we provide `aleph.http/cancel-request!`. IT sets the given response deferred to error with an instance of the new `aleph.utils.RequestCancellationException`. Closes #712.
1 parent 3304d64 commit 7b7b7af

File tree

3 files changed

+154
-88
lines changed

3 files changed

+154
-88
lines changed
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package aleph.utils;
2+
3+
import java.util.concurrent.CancellationException;
4+
5+
public class RequestCancellationException extends CancellationException {
6+
7+
public RequestCancellationException() { }
8+
9+
public RequestCancellationException(String message) {
10+
super(message);
11+
}
12+
13+
public RequestCancellationException(Throwable cause) {
14+
super(cause.getMessage());
15+
initCause(cause);
16+
}
17+
18+
public RequestCancellationException(String message, Throwable cause) {
19+
super(message);
20+
initCause(cause);
21+
}
22+
23+
}

src/aleph/http.clj

Lines changed: 109 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
ConnectionTimeoutException
2121
PoolTimeoutException
2222
ReadTimeoutException
23+
RequestCancellationException
2324
RequestTimeoutException)
2425
(io.aleph.dirigiste Pools)
2526
(io.netty.handler.codec Headers)
@@ -336,6 +337,9 @@
336337
by [clj-http](https://github.com/dakrone/clj-http), and returns a deferred representing
337338
the HTTP response. Also allows for a custom `pool` or `middleware` to be defined.
338339
340+
Putting the returned deferred into an error state will cancel the underlying request if it is
341+
still in flight.
342+
339343
Param key | Description
340344
-------------------- | -----------------------------------------------------------------------------------------------------------------------------------------------------------------
341345
`connection-timeout` | timeout in milliseconds for the connection to become established
@@ -358,96 +362,115 @@
358362
middleware identity
359363
connection-timeout 6e4} ;; 60 seconds
360364
:as req}]
361-
362-
(executor/with-executor response-executor
363-
((middleware
364-
(fn [req]
365-
(let [k (client/req->domain req)
366-
start (System/currentTimeMillis)]
367-
368-
;; acquire a connection
369-
(-> (flow/acquire pool k)
370-
(maybe-timeout! pool-timeout)
371-
372-
;; pool timeout triggered
373-
(d/catch' TimeoutException
374-
(fn [^Throwable e]
375-
(d/error-deferred (PoolTimeoutException. e))))
376-
377-
(d/chain'
378-
(fn [conn]
379-
380-
;; get the wrapper for the connection, which may or may not be realized yet
381-
(-> (first conn)
382-
(maybe-timeout! connection-timeout)
383-
384-
;; connection timeout triggered, dispose of the connetion
385-
(d/catch' TimeoutException
386-
(fn [^Throwable e]
387-
(log/error e "Timed out waiting for connection to be established")
388-
(flow/dispose pool k conn)
389-
(d/error-deferred (ConnectionTimeoutException. e))))
390-
391-
;; connection failed, bail out
392-
(d/catch'
393-
(fn [e]
394-
(log/error e "Connection failure")
395-
(flow/dispose pool k conn)
396-
(d/error-deferred e)))
397-
398-
;; actually make the request now
399-
(d/chain'
400-
(fn [conn']
401-
(when-not (nil? conn')
402-
(let [end (System/currentTimeMillis)]
403-
(-> (conn' req)
404-
(maybe-timeout! request-timeout)
405-
406-
;; request timeout triggered, dispose of the connection
407-
(d/catch' TimeoutException
408-
(fn [^Throwable e]
409-
(flow/dispose pool k conn)
410-
(d/error-deferred (RequestTimeoutException. e))))
411-
412-
;; request failed, dispose of the connection
413-
(d/catch'
365+
(let [dispose-conn! (atom (fn []))
366+
result (d/deferred nil)
367+
response (executor/with-executor response-executor
368+
((middleware
369+
(fn [req]
370+
(let [k (client/req->domain req)
371+
start (System/currentTimeMillis)]
372+
373+
;; acquire a connection
374+
(-> (flow/acquire pool k)
375+
(maybe-timeout! pool-timeout)
376+
377+
;; pool timeout triggered
378+
(d/catch' TimeoutException
379+
(fn [^Throwable e]
380+
(d/error-deferred (PoolTimeoutException. e))))
381+
382+
(d/chain'
383+
(fn [conn]
384+
(reset! dispose-conn! (fn [] (flow/dispose pool k conn)))
385+
386+
;; get the wrapper for the connection, which may or may not be realized yet
387+
(-> (first conn)
388+
(maybe-timeout! connection-timeout)
389+
390+
;; connection timeout triggered, dispose of the connetion
391+
(d/catch' TimeoutException
392+
(fn [^Throwable e]
393+
(log/error e "Timed out waiting for connection to be established")
394+
(flow/dispose pool k conn)
395+
(d/error-deferred (ConnectionTimeoutException. e))))
396+
397+
;; connection failed, bail out
398+
(d/catch'
414399
(fn [e]
415-
(log/trace "Request failed. Disposing of connection...")
400+
(log/error e "Connection failure")
416401
(flow/dispose pool k conn)
417402
(d/error-deferred e)))
418403

419-
;; clean up the connection
420-
(d/chain'
421-
(fn cleanup-conn [rsp]
422-
423-
;; either destroy/dispose of the conn, or release it back for reuse
424-
(-> (:aleph/destroy-conn? rsp)
425-
(maybe-timeout! read-timeout)
426-
427-
(d/catch' TimeoutException
428-
(fn [^Throwable e]
429-
(log/trace "Request timed out. Disposing of connection...")
430-
(flow/dispose pool k conn)
431-
(d/error-deferred (ReadTimeoutException. e))))
432-
433-
(d/chain'
434-
(fn [early?]
435-
(if (or early?
436-
(not (:aleph/keep-alive? rsp))
437-
(<= 400 (:status rsp)))
438-
(do
439-
(log/trace "Connection finished. Disposing...")
440-
(flow/dispose pool k conn))
441-
(flow/release pool k conn)))))
442-
(-> rsp
443-
(dissoc :aleph/destroy-conn?)
444-
(assoc :connection-time (- end start)))))))))
445-
446-
(fn handle-response [rsp]
447-
(->> rsp
448-
(middleware/handle-cookies req)
449-
(middleware/handle-redirects request req)))))))))))
450-
req))))
404+
;; actually make the request now
405+
(d/chain'
406+
(fn [conn']
407+
(when-not (nil? conn')
408+
(let [end (System/currentTimeMillis)]
409+
(-> (conn' req)
410+
(maybe-timeout! request-timeout)
411+
412+
;; request timeout triggered, dispose of the connection
413+
(d/catch' TimeoutException
414+
(fn [^Throwable e]
415+
(flow/dispose pool k conn)
416+
(d/error-deferred (RequestTimeoutException. e))))
417+
418+
;; request failed, dispose of the connection
419+
(d/catch'
420+
(fn [e]
421+
(log/trace "Request failed. Disposing of connection...")
422+
(flow/dispose pool k conn)
423+
(d/error-deferred e)))
424+
425+
;; clean up the connection
426+
(d/chain'
427+
(fn cleanup-conn [rsp]
428+
429+
;; either destroy/dispose of the conn, or release it back for reuse
430+
(-> (:aleph/destroy-conn? rsp)
431+
(maybe-timeout! read-timeout)
432+
433+
(d/catch' TimeoutException
434+
(fn [^Throwable e]
435+
(log/trace "Request timed out. Disposing of connection...")
436+
(flow/dispose pool k conn)
437+
(d/error-deferred (ReadTimeoutException. e))))
438+
439+
(d/chain'
440+
(fn [early?]
441+
(if (or early?
442+
(not (:aleph/keep-alive? rsp))
443+
(<= 400 (:status rsp)))
444+
(do
445+
(log/trace "Connection finished. Disposing...")
446+
(flow/dispose pool k conn))
447+
(flow/release pool k conn)))))
448+
(-> rsp
449+
(dissoc :aleph/destroy-conn?)
450+
(assoc :connection-time (- end start)))))))))
451+
452+
(fn handle-response [rsp]
453+
(->> rsp
454+
(middleware/handle-cookies req)
455+
(middleware/handle-redirects request req)))))))))))
456+
req))]
457+
(d/connect response result)
458+
(d/catch' result
459+
RequestCancellationException
460+
(fn [e]
461+
(log/trace e "Request cancelled. Disposing of connection...")
462+
(@dispose-conn!)
463+
(d/error-deferred e)))
464+
result)))
465+
466+
(defn cancel-request!
467+
"Accepts a response deferred as returned by `request` and cancels the underlying request if it is
468+
still in flight.
469+
470+
This is done by putting the deferred into error state with an
471+
`aleph.utils.RequestCancellationException` instance as its value."
472+
[r]
473+
(d/error! r (RequestCancellationException. "Request cancelled")))
451474

452475
(defn- req
453476
([method url]

test/aleph/http_test.clj

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
(:import
1818
(aleph.utils
1919
ConnectionTimeoutException
20+
RequestCancellationException
2021
RequestTimeoutException)
2122
(clojure.lang
2223
ExceptionInfo)
@@ -1073,9 +1074,13 @@
10731074
(Thread/sleep 5)
10741075
(s/put! s (encode-http-object response))))
10751076

1077+
(defmacro with-tcp-server [handler & body]
1078+
`(with-server (tcp/start-server ~handler {:port port
1079+
:shutdown-timeout 0})
1080+
~@body))
1081+
10761082
(defmacro with-tcp-response [response & body]
1077-
`(with-server (tcp/start-server (tcp-handler ~response) {:port port
1078-
:shutdown-timeout 0})
1083+
`(with-server (with-tcp-server (tcp-handler ~response))
10791084
~@body))
10801085

10811086
(defmacro with-tcp-request-handler [handler options request & body]
@@ -1438,3 +1443,18 @@
14381443
:http-versions [:http1]})]
14391444
(is (instance? IllegalArgumentException result))
14401445
(is (= "use-h2c? may only be true when HTTP/2 is enabled." (ex-message result))))))
1446+
1447+
(deftest test-in-flight-request-cancellation
1448+
(let [conn-established (promise)
1449+
conn-closed (promise)]
1450+
(with-tcp-server (fn [s _]
1451+
(deliver conn-established true)
1452+
;; Required for the client close to be detected
1453+
(s/consume identity s)
1454+
(s/on-closed s (fn []
1455+
(deliver conn-closed true))))
1456+
(let [rsp (http-get "/")]
1457+
(is (= true (deref conn-established 1000 :timeout)))
1458+
(http/cancel-request! rsp)
1459+
(is (= true (deref conn-closed 1000 :timeout)))
1460+
(is (thrown? RequestCancellationException (deref rsp 1000 :timeout)))))))

0 commit comments

Comments
 (0)