Description
Reproduction is pretty simple:
- Get yourself an InputStream (in my case, one from the Minio SDK when getting an object from storage; it originates from OkHttp)
- Pass it as the response body:
  {:body my-is}
- Make the client close the connection prematurely (for example, download a binary file with curl into the terminal; by default curl refuses to do that and aborts)
- Aleph closes the InputStream in a different thread

I run into the following exception (OkHttp, or rather okio, which backs said stream, does not support that use case):
{host 127.0.0.1:7000, user-agent curl/7.68.0, accept */*}
read3 Thread[manifold-wait-6,5,main]
available Thread[manifold-wait-6,5,main]
read3 Thread[manifold-wait-6,5,main]
available Thread[manifold-wait-6,5,main]
read3 Thread[manifold-wait-6,5,main]
read3 Thread[manifold-wait-6,5,main]
available Thread[manifold-wait-6,5,main]
read3 Thread[manifold-wait-6,5,main]
close Thread[aleph-netty-server-event-pool-23,5,main]
Feb 16, 2020 8:45:12 PM manifold.utils invoke
SEVERE: error in invoke-callbacks
java.lang.IllegalStateException: Unbalanced enter/exit
at okio.AsyncTimeout.enter(AsyncTimeout.java:73)
at okio.AsyncTimeout$2.read(AsyncTimeout.java:235)
at okio.RealBufferedSource.read(RealBufferedSource.java:51)
at okhttp3.internal.http1.Http1Codec$AbstractSource.read(Http1Codec.java:374)
at okhttp3.internal.http1.Http1Codec$FixedLengthSource.read(Http1Codec.java:418)
at okhttp3.internal.Util.skipAll(Util.java:204)
at okhttp3.internal.Util.discard(Util.java:186)
at okhttp3.internal.http1.Http1Codec$FixedLengthSource.close(Http1Codec.java:435)
at okio.RealBufferedSource.close(RealBufferedSource.java:476)
at okio.RealBufferedSource$1.close(RealBufferedSource.java:460)
at java.base/jdk.internal.reflect.GeneratedMethodAccessor14.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:167)
at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:438)
at cu.resourcepack_server.core$fn__28582$fn__28595$fn__28605.invoke(form-init15695820198716781853.clj:36)
at cu.resourcepack_server.core.proxy$java.io.InputStream$ff19274a.close(Unknown Source)
at aleph.http.core$send_streaming_body$fn__15872.invoke(core.clj:324)
at manifold.utils$invoke_callbacks$fn__1143.invoke(utils.clj:69)
at manifold.utils$invoke_callbacks.invokeStatic(utils.clj:68)
at manifold.utils$invoke_callbacks.invoke(utils.clj:65)
at aleph.netty.ChannelSink.markClosed(netty.clj:344)
at aleph.netty.ChannelSink.close(netty.clj:352)
at aleph.netty$sink$fn__15376.invoke(netty.clj:407)
at manifold.deferred$eval1788$chain_SINGLEQUOTE____1809.invoke(deferred.clj:749)
at manifold.deferred$eval1788$subscribe__1789$fn__1794.invoke(deferred.clj:715)
at manifold.deferred.Listener.onSuccess(deferred.clj:219)
at manifold.deferred.Deferred$fn__1634.invoke(deferred.clj:398)
at manifold.deferred.Deferred.success(deferred.clj:398)
at manifold.deferred$success_BANG_.invokeStatic(deferred.clj:243)
at manifold.deferred$success_BANG_.invoke(deferred.clj:240)
at aleph.netty$wrap_future$reify__15320.operationComplete(netty.clj:218)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:103)
at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
at io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1152)
at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:768)
at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:744)
at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:615)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.closeOnRead(AbstractNioByteChannel.java:105)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:171)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:677)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:612)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:529)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:491)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
at manifold.executor$thread_factory$reify__1009$f__1010.invoke(executor.clj:47)
at clojure.lang.AFn.run(AFn.java:22)
at java.base/java.lang.Thread.run(Thread.java:834)
available Thread[manifold-wait-6,5,main]
read3 Thread[manifold-wait-6,5,main]
close Thread[manifold-wait-6,5,main]
close Thread[manifold-wait-6,5,main]
Body + proxy class which delegates to the original stream (for debugging):
{:status 200
 :body (proxy [java.io.InputStream] []
         (read
           ([]
            (a/put! log-ch (str "read0\t\t" (Thread/currentThread)))
            (.read stream))
           ([^bytes b]
            (a/put! log-ch (str "read1\t\t" (Thread/currentThread)))
            (.read stream b))
           ([^bytes b off len]
            (a/put! log-ch (str "read3\t\t" (Thread/currentThread)))
            (.read stream b off len)))
         (available []
           (a/put! log-ch (str "available\t" (Thread/currentThread)))
           (.available stream))
         (skip [^long l]
           (a/put! log-ch (str "skip\t\t" (Thread/currentThread)))
           (.skip stream l))
         (close []
           (a/put! log-ch (str "close\t\t" (Thread/currentThread)))
           (.close stream)))
 :headers {"Content-Type" "application/zip"}}
Also, a simple logging snippet that is good enough for this test scenario:
(def log-ch
  (a/chan))

(a/go-loop [m (a/<! log-ch)]
  (when m
    (println m)
    (recur (a/<! log-ch))))
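I don't think this is right: as the log above shows, the reads happen on a manifold-wait thread, but the first close is called from the Netty event-loop thread (aleph-netty-server-event-pool-23) while the stream is still in use.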
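
A possible stopgap on the application side, as a minimal sketch only: wrap the stream so that read, available, skip and close all take the same lock, which keeps close from running while another thread is inside read. The helper name serialized-stream is hypothetical and not part of Aleph, and it is an assumption that serializing the calls is enough to avoid the okio "Unbalanced enter/exit" error; I have not verified it against OkHttp.

```clojure
(defn serialized-stream
  "Hypothetical helper: returns an InputStream that delegates to `stream`
  but runs every call under one shared lock, so close cannot overlap
  with a read that is in progress on another thread."
  ^java.io.InputStream [^java.io.InputStream stream]
  (let [lock (Object.)]
    (proxy [java.io.InputStream] []
      (read
        ([]
         (locking lock (.read stream)))
        ([b]
         (locking lock (.read stream ^bytes b)))
        ([b off len]
         (locking lock (.read stream ^bytes b (int off) (int len)))))
      (available []
        (locking lock (.available stream)))
      (skip [n]
        (locking lock (.skip stream (long n))))
      (close []
        ;; Blocks until any in-flight read has returned.
        (locking lock (.close stream))))))
```

Usage would be {:body (serialized-stream my-is)}. The trade-off is that close now waits for a blocked read to return, so a slow upstream delays cleanup on the thread that calls close.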