Skip to content

InputStream as passed a body gets closed by different thread when client closes connection prematurely #535

Open
@mikroskeem

Description

@mikroskeem

Reproduction is pretty simple:

  1. Get yourself an InputStream (in my case, one from Minio SDK when getting an object from the storage - which originates from OkHttp)
  2. Pass it into response body {:body my-is}
  3. Make client close connection prematurely (uh-oh, binary file being downloaded with curl into the terminal - by default curl does not allow that)
  4. Aleph closes InputStream in different thread

I run into following exception (OkHttp, or rather okio which backs said stream does not support that use-case):

{host 127.0.0.1:7000, user-agent curl/7.68.0, accept */*}
read3		Thread[manifold-wait-6,5,main]
available	Thread[manifold-wait-6,5,main]
read3		Thread[manifold-wait-6,5,main]
available	Thread[manifold-wait-6,5,main]
read3		Thread[manifold-wait-6,5,main]
read3		Thread[manifold-wait-6,5,main]
available	Thread[manifold-wait-6,5,main]
read3		Thread[manifold-wait-6,5,main]
close		Thread[aleph-netty-server-event-pool-23,5,main]
Feb 16, 2020 8:45:12 PM manifold.utils invoke
SEVERE: error in invoke-callbacks
java.lang.IllegalStateException: Unbalanced enter/exit
	at okio.AsyncTimeout.enter(AsyncTimeout.java:73)
	at okio.AsyncTimeout$2.read(AsyncTimeout.java:235)
	at okio.RealBufferedSource.read(RealBufferedSource.java:51)
	at okhttp3.internal.http1.Http1Codec$AbstractSource.read(Http1Codec.java:374)
	at okhttp3.internal.http1.Http1Codec$FixedLengthSource.read(Http1Codec.java:418)
	at okhttp3.internal.Util.skipAll(Util.java:204)
	at okhttp3.internal.Util.discard(Util.java:186)
	at okhttp3.internal.http1.Http1Codec$FixedLengthSource.close(Http1Codec.java:435)
	at okio.RealBufferedSource.close(RealBufferedSource.java:476)
	at okio.RealBufferedSource$1.close(RealBufferedSource.java:460)
	at java.base/jdk.internal.reflect.GeneratedMethodAccessor14.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:167)
	at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:438)
	at cu.resourcepack_server.core$fn__28582$fn__28595$fn__28605.invoke(form-init15695820198716781853.clj:36)
	at cu.resourcepack_server.core.proxy$java.io.InputStream$ff19274a.close(Unknown Source)
	at aleph.http.core$send_streaming_body$fn__15872.invoke(core.clj:324)
	at manifold.utils$invoke_callbacks$fn__1143.invoke(utils.clj:69)
	at manifold.utils$invoke_callbacks.invokeStatic(utils.clj:68)
	at manifold.utils$invoke_callbacks.invoke(utils.clj:65)
	at aleph.netty.ChannelSink.markClosed(netty.clj:344)
	at aleph.netty.ChannelSink.close(netty.clj:352)
	at aleph.netty$sink$fn__15376.invoke(netty.clj:407)
	at manifold.deferred$eval1788$chain_SINGLEQUOTE____1809.invoke(deferred.clj:749)
	at manifold.deferred$eval1788$subscribe__1789$fn__1794.invoke(deferred.clj:715)
	at manifold.deferred.Listener.onSuccess(deferred.clj:219)
	at manifold.deferred.Deferred$fn__1634.invoke(deferred.clj:398)
	at manifold.deferred.Deferred.success(deferred.clj:398)
	at manifold.deferred$success_BANG_.invokeStatic(deferred.clj:243)
	at manifold.deferred$success_BANG_.invoke(deferred.clj:240)
	at aleph.netty$wrap_future$reify__15320.operationComplete(netty.clj:218)
	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
	at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
	at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:103)
	at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
	at io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1152)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:768)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:744)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:615)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.closeOnRead(AbstractNioByteChannel.java:105)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:171)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:677)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:612)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:529)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:491)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
	at manifold.executor$thread_factory$reify__1009$f__1010.invoke(executor.clj:47)
	at clojure.lang.AFn.run(AFn.java:22)
	at java.base/java.lang.Thread.run(Thread.java:834)

available	Thread[manifold-wait-6,5,main]
read3		Thread[manifold-wait-6,5,main]
close		Thread[manifold-wait-6,5,main]
close		Thread[manifold-wait-6,5,main]

Body + proxy class which delegates to the original stream (for debugging):

           {:status 200
            :body (proxy [java.io.InputStream] []
                    (read
                      ([]
                       (a/put! log-ch (str "read0\t\t" (Thread/currentThread)))
                       (.read stream))
                      ([^bytes b]
                       (a/put! log-ch (str "read1\t\t" (Thread/currentThread)))
                       (.read stream b))
                      ([^bytes b off len]
                       (a/put! log-ch (str "read3\t\t" (Thread/currentThread)))
                       (.read stream b off len)))
                    (available []
                      (a/put! log-ch (str "available\t" (Thread/currentThread)))
                      (.available stream))                    
                    (skip [^long l]
                      (a/put! log-ch (str "skip\t\t" (Thread/currentThread)))
                      (.skip stream l))
                    (close []
                      (a/put! log-ch (str "close\t\t" (Thread/currentThread)))
                      (.close stream)))
            :headers {"Content-Type" "application/zip"}}

Also simple logging snippet which works ok with the test scenario:

(def log-ch
  (a/chan))

(a/go-loop [m (a/<! log-ch)]
  (when m
    (println m)
    (recur (a/<! log-ch))))

I think that this is not right

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions