Skip to content

Commit 14d86ac

Browse files
committed
HTTP client request cancellation
This patch changes `aleph.http/request` so that setting the response deferred to an error status will terminate an in-flight request. Closes #712.
1 parent 3304d64 commit 14d86ac

File tree

3 files changed

+145
-88
lines changed

3 files changed

+145
-88
lines changed
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package aleph.utils;
2+
3+
import java.util.concurrent.CancellationException;
4+
5+
public class RequestCancellationException extends CancellationException {
6+
7+
public RequestCancellationException() { }
8+
9+
public RequestCancellationException(String message) {
10+
super(message);
11+
}
12+
13+
public RequestCancellationException(Throwable cause) {
14+
super(cause.getMessage());
15+
initCause(cause);
16+
}
17+
18+
public RequestCancellationException(String message, Throwable cause) {
19+
super(message);
20+
initCause(cause);
21+
}
22+
23+
}

src/aleph/http.clj

Lines changed: 100 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
ConnectionTimeoutException
2121
PoolTimeoutException
2222
ReadTimeoutException
23+
RequestCancellationException
2324
RequestTimeoutException)
2425
(io.aleph.dirigiste Pools)
2526
(io.netty.handler.codec Headers)
@@ -358,96 +359,109 @@
358359
middleware identity
359360
connection-timeout 6e4} ;; 60 seconds
360361
:as req}]
361-
362-
(executor/with-executor response-executor
363-
((middleware
364-
(fn [req]
365-
(let [k (client/req->domain req)
366-
start (System/currentTimeMillis)]
367-
368-
;; acquire a connection
369-
(-> (flow/acquire pool k)
370-
(maybe-timeout! pool-timeout)
371-
372-
;; pool timeout triggered
373-
(d/catch' TimeoutException
374-
(fn [^Throwable e]
375-
(d/error-deferred (PoolTimeoutException. e))))
376-
377-
(d/chain'
378-
(fn [conn]
379-
380-
;; get the wrapper for the connection, which may or may not be realized yet
381-
(-> (first conn)
382-
(maybe-timeout! connection-timeout)
383-
384-
;; connection timeout triggered, dispose of the connetion
385-
(d/catch' TimeoutException
386-
(fn [^Throwable e]
387-
(log/error e "Timed out waiting for connection to be established")
388-
(flow/dispose pool k conn)
389-
(d/error-deferred (ConnectionTimeoutException. e))))
390-
391-
;; connection failed, bail out
392-
(d/catch'
393-
(fn [e]
394-
(log/error e "Connection failure")
395-
(flow/dispose pool k conn)
396-
(d/error-deferred e)))
397-
398-
;; actually make the request now
399-
(d/chain'
400-
(fn [conn']
401-
(when-not (nil? conn')
402-
(let [end (System/currentTimeMillis)]
403-
(-> (conn' req)
404-
(maybe-timeout! request-timeout)
405-
406-
;; request timeout triggered, dispose of the connection
407-
(d/catch' TimeoutException
408-
(fn [^Throwable e]
409-
(flow/dispose pool k conn)
410-
(d/error-deferred (RequestTimeoutException. e))))
411-
412-
;; request failed, dispose of the connection
413-
(d/catch'
362+
(let [dispose-conn! (atom (fn []))
363+
result (d/deferred nil)
364+
response (executor/with-executor response-executor
365+
((middleware
366+
(fn [req]
367+
(let [k (client/req->domain req)
368+
start (System/currentTimeMillis)]
369+
370+
;; acquire a connection
371+
(-> (flow/acquire pool k)
372+
(maybe-timeout! pool-timeout)
373+
374+
;; pool timeout triggered
375+
(d/catch' TimeoutException
376+
(fn [^Throwable e]
377+
(d/error-deferred (PoolTimeoutException. e))))
378+
379+
(d/chain'
380+
(fn [conn]
381+
(reset! dispose-conn! (fn [] (flow/dispose pool k conn)))
382+
383+
;; get the wrapper for the connection, which may or may not be realized yet
384+
(-> (first conn)
385+
(maybe-timeout! connection-timeout)
386+
387+
;; connection timeout triggered, dispose of the connetion
388+
(d/catch' TimeoutException
389+
(fn [^Throwable e]
390+
(log/error e "Timed out waiting for connection to be established")
391+
(flow/dispose pool k conn)
392+
(d/error-deferred (ConnectionTimeoutException. e))))
393+
394+
;; connection failed, bail out
395+
(d/catch'
414396
(fn [e]
415-
(log/trace "Request failed. Disposing of connection...")
397+
(log/error e "Connection failure")
416398
(flow/dispose pool k conn)
417399
(d/error-deferred e)))
418400

419-
;; clean up the connection
420-
(d/chain'
421-
(fn cleanup-conn [rsp]
422-
423-
;; either destroy/dispose of the conn, or release it back for reuse
424-
(-> (:aleph/destroy-conn? rsp)
425-
(maybe-timeout! read-timeout)
426-
427-
(d/catch' TimeoutException
428-
(fn [^Throwable e]
429-
(log/trace "Request timed out. Disposing of connection...")
430-
(flow/dispose pool k conn)
431-
(d/error-deferred (ReadTimeoutException. e))))
432-
433-
(d/chain'
434-
(fn [early?]
435-
(if (or early?
436-
(not (:aleph/keep-alive? rsp))
437-
(<= 400 (:status rsp)))
438-
(do
439-
(log/trace "Connection finished. Disposing...")
440-
(flow/dispose pool k conn))
441-
(flow/release pool k conn)))))
442-
(-> rsp
443-
(dissoc :aleph/destroy-conn?)
444-
(assoc :connection-time (- end start)))))))))
445-
446-
(fn handle-response [rsp]
447-
(->> rsp
448-
(middleware/handle-cookies req)
449-
(middleware/handle-redirects request req)))))))))))
450-
req))))
401+
;; actually make the request now
402+
(d/chain'
403+
(fn [conn']
404+
(when-not (nil? conn')
405+
(let [end (System/currentTimeMillis)]
406+
(-> (conn' req)
407+
(maybe-timeout! request-timeout)
408+
409+
;; request timeout triggered, dispose of the connection
410+
(d/catch' TimeoutException
411+
(fn [^Throwable e]
412+
(flow/dispose pool k conn)
413+
(d/error-deferred (RequestTimeoutException. e))))
414+
415+
;; request failed, dispose of the connection
416+
(d/catch'
417+
(fn [e]
418+
(log/trace "Request failed. Disposing of connection...")
419+
(flow/dispose pool k conn)
420+
(d/error-deferred e)))
421+
422+
;; clean up the connection
423+
(d/chain'
424+
(fn cleanup-conn [rsp]
425+
426+
;; either destroy/dispose of the conn, or release it back for reuse
427+
(-> (:aleph/destroy-conn? rsp)
428+
(maybe-timeout! read-timeout)
429+
430+
(d/catch' TimeoutException
431+
(fn [^Throwable e]
432+
(log/trace "Request timed out. Disposing of connection...")
433+
(flow/dispose pool k conn)
434+
(d/error-deferred (ReadTimeoutException. e))))
435+
436+
(d/chain'
437+
(fn [early?]
438+
(if (or early?
439+
(not (:aleph/keep-alive? rsp))
440+
(<= 400 (:status rsp)))
441+
(do
442+
(log/trace "Connection finished. Disposing...")
443+
(flow/dispose pool k conn))
444+
(flow/release pool k conn)))))
445+
(-> rsp
446+
(dissoc :aleph/destroy-conn?)
447+
(assoc :connection-time (- end start)))))))))
448+
449+
(fn handle-response [rsp]
450+
(->> rsp
451+
(middleware/handle-cookies req)
452+
(middleware/handle-redirects request req)))))))))))
453+
req))]
454+
(d/connect response result)
455+
(d/catch' result
456+
RequestCancellationException
457+
(fn [e]
458+
(log/trace e "Request cancelled. Disposing of connection...")
459+
(@dispose-conn!)
460+
(d/error-deferred e)))
461+
result)))
462+
463+
(defn cancel-request! [r]
464+
(d/error! r (RequestCancellationException. "Request cancelled")))
451465

452466
(defn- req
453467
([method url]

test/aleph/http_test.clj

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
(:import
1818
(aleph.utils
1919
ConnectionTimeoutException
20+
RequestCancellationException
2021
RequestTimeoutException)
2122
(clojure.lang
2223
ExceptionInfo)
@@ -1073,9 +1074,13 @@
10731074
(Thread/sleep 5)
10741075
(s/put! s (encode-http-object response))))
10751076

1077+
(defmacro with-tcp-server [handler & body]
1078+
`(with-server (tcp/start-server ~handler {:port port
1079+
:shutdown-timeout 0})
1080+
~@body))
1081+
10761082
(defmacro with-tcp-response [response & body]
1077-
`(with-server (tcp/start-server (tcp-handler ~response) {:port port
1078-
:shutdown-timeout 0})
1083+
`(with-server (with-tcp-server (tcp-handler ~response))
10791084
~@body))
10801085

10811086
(defmacro with-tcp-request-handler [handler options request & body]
@@ -1438,3 +1443,18 @@
14381443
:http-versions [:http1]})]
14391444
(is (instance? IllegalArgumentException result))
14401445
(is (= "use-h2c? may only be true when HTTP/2 is enabled." (ex-message result))))))
1446+
1447+
(deftest test-in-flight-request-cancellation
1448+
(let [conn-established (promise)
1449+
conn-closed (promise)]
1450+
(with-tcp-server (fn [s _]
1451+
(deliver conn-established true)
1452+
;; Required for the client close to be detected
1453+
(s/consume identity s)
1454+
(s/on-closed s (fn []
1455+
(deliver conn-closed true))))
1456+
(let [rsp (http-get "/")]
1457+
(is (= true (deref conn-established 1000 :timeout)))
1458+
(http/cancel-request! rsp)
1459+
(is (= true (deref conn-closed 1000 :timeout)))
1460+
(is (thrown? RequestCancellationException (deref rsp 1000 :timeout)))))))

0 commit comments

Comments
 (0)