diff --git a/project.clj b/project.clj index 2647e3f..6fb16bf 100644 --- a/project.clj +++ b/project.clj @@ -7,7 +7,7 @@ :username :env/clojars_username :password :env/clojars_password :sign-releases false}]] - :dependencies [[primitive-math "0.1.6"] + :dependencies [[org.clj-commons/primitive-math "1.0.0"] [manifold "0.1.9"]] :profiles {:dev {:dependencies [[org.clojure/clojure "1.10.3"] [org.clojure/test.check "1.1.0"] diff --git a/src/byte_streams.clj b/src/byte_streams.clj index 1894415..cee14c4 100644 --- a/src/byte_streams.clj +++ b/src/byte_streams.clj @@ -1,4 +1,9 @@ -(ns byte-streams +(ns + ^{:deprecated true + :doc "DEPRECATED: moved to clj-commons.byte-streams" + :no-doc true + :superseded-by "clj-commons.byte-streams"} + byte-streams (:refer-clojure :exclude [byte-array vector-of]) (:require [manifold diff --git a/src/byte_streams/ByteBufferInputStream.java b/src/byte_streams/ByteBufferInputStream.java index 959c570..76d4f2e 100644 --- a/src/byte_streams/ByteBufferInputStream.java +++ b/src/byte_streams/ByteBufferInputStream.java @@ -4,6 +4,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +@Deprecated public class ByteBufferInputStream extends InputStream { private ByteBuffer _buf; diff --git a/src/byte_streams/InputStream.java b/src/byte_streams/InputStream.java index 121e6f9..0bd365f 100644 --- a/src/byte_streams/InputStream.java +++ b/src/byte_streams/InputStream.java @@ -2,6 +2,7 @@ import java.io.IOException; +@Deprecated public class InputStream extends java.io.InputStream { public interface Streamable { diff --git a/src/byte_streams/Utils.java b/src/byte_streams/Utils.java index 87510ab..1e3e02b 100644 --- a/src/byte_streams/Utils.java +++ b/src/byte_streams/Utils.java @@ -1,5 +1,6 @@ package byte_streams; +@Deprecated public class Utils { public static byte[] byteArray(int length) { return new byte[length]; diff --git a/src/byte_streams/char_sequence.clj b/src/byte_streams/char_sequence.clj index 50d6f56..be06633 100644 --- 
a/src/byte_streams/char_sequence.clj +++ b/src/byte_streams/char_sequence.clj @@ -1,4 +1,9 @@ -(ns byte-streams.char-sequence +(ns + ^{:deprecated true + :doc "DEPRECATED: moved to clj-commons.byte-streams.char-sequence" + :no-doc true + :superseded-by "clj-commons.byte-streams.char-sequence"} + byte-streams.char-sequence (:refer-clojure :exclude [flush]) (:import [java.util.concurrent.locks diff --git a/src/byte_streams/graph.clj b/src/byte_streams/graph.clj index f197188..b2a0160 100644 --- a/src/byte_streams/graph.clj +++ b/src/byte_streams/graph.clj @@ -1,4 +1,9 @@ -(ns byte-streams.graph +(ns + ^{:deprecated true + :doc "DEPRECATED: moved to clj-commons.byte-streams.graph" + :no-doc true + :superseded-by "clj-commons.byte-streams.graph"} + byte-streams.graph (:refer-clojure :exclude [type]) (:require [manifold.stream :as s] diff --git a/src/byte_streams/protocols.clj b/src/byte_streams/protocols.clj index 6f1ceb0..1d6becc 100644 --- a/src/byte_streams/protocols.clj +++ b/src/byte_streams/protocols.clj @@ -1,4 +1,9 @@ -(ns byte-streams.protocols +(ns + ^{:deprecated true + :doc "DEPRECATED: moved to clj-commons.byte-streams.protocols" + :no-doc true + :superseded-by "clj-commons.byte-streams.protocols"} + byte-streams.protocols (:require [byte-streams.utils :refer [defprotocol+]]) (:import diff --git a/src/byte_streams/pushback_stream.clj b/src/byte_streams/pushback_stream.clj index 989b0b6..48fbb6d 100644 --- a/src/byte_streams/pushback_stream.clj +++ b/src/byte_streams/pushback_stream.clj @@ -1,4 +1,9 @@ -(ns byte-streams.pushback-stream +(ns + ^{:deprecated true + :doc "DEPRECATED: moved to clj-commons.byte-streams.pushback-stream" + :no-doc true + :superseded-by "clj-commons.byte-streams.pushback-stream"} + byte-streams.pushback-stream (:refer-clojure :exclude [take]) (:require [primitive-math :as p] diff --git a/src/byte_streams/utils.clj b/src/byte_streams/utils.clj index 40645fa..e3581d8 100644 --- a/src/byte_streams/utils.clj +++ 
b/src/byte_streams/utils.clj @@ -1,4 +1,9 @@ -(ns byte-streams.utils) +(ns + ^{:deprecated true + :doc "DEPRECATED: moved to clj-commons.byte-streams.utils" + :no-doc true + :superseded-by "clj-commons.byte-streams.utils"} + byte-streams.utils) (defmacro defprotocol+ [name & body] (when-not (resolve name) diff --git a/src/clj_commons/byte_streams.clj b/src/clj_commons/byte_streams.clj new file mode 100644 index 0000000..e90d57b --- /dev/null +++ b/src/clj_commons/byte_streams.clj @@ -0,0 +1,1019 @@ +(ns clj-commons.byte-streams + (:refer-clojure :exclude [byte-array vector-of]) + (:require + [manifold + [stream :as s] + [deferred :as d]] + [clj-commons.byte-streams + [graph :as g] + [protocols :as proto] + [pushback-stream :as ps] + [char-sequence :as cs]] + [clojure.java.io :as io] + [clj-commons.primitive-math :as p]) + (:import + [clj_commons.byte_streams + Utils + ByteBufferInputStream] + [clj_commons.byte_streams.graph + Type] + [java.nio + ByteBuffer + DirectByteBuffer] + [java.lang.reflect + Array] + [java.util.concurrent.atomic + AtomicBoolean] + [java.io + File + FileOutputStream + FileInputStream + ByteArrayInputStream + ByteArrayOutputStream + PipedOutputStream + PipedInputStream + DataInputStream + InputStream + OutputStream + IOException + RandomAccessFile + Reader + InputStreamReader + BufferedReader] + [java.nio.channels + ReadableByteChannel + WritableByteChannel + FileChannel + FileChannel$MapMode + Channels + Pipe] + [java.nio.channels.spi + AbstractSelectableChannel])) + +;;; + +(defonce conversions (atom (g/conversion-graph))) +(defonce inverse-conversions (atom (g/conversion-graph))) +(defonce src->dst->transfer (atom nil)) + +(def ^:private ^:const byte-array-type (class (Utils/byteArray 0))) + +(defn seq-of [x] + (g/type 'seq (if (identical? bytes x) byte-array-type x))) + +(defn stream-of [x] + (g/type 'stream (if (identical? bytes x) byte-array-type x))) + +(defn vector-of [x] + (g/type 'vector (if (identical? 
bytes x) byte-array-type x))) + +(defn type-descriptor + "Returns a descriptor of the type of the given instance." + [x] + (cond + + (nil? x) + (g/type ::nil) + + (identical? bytes x) + (g/type byte-array-type) + + (vector? x) + (vector-of (.type ^Type (type-descriptor (first x)))) + + (sequential? x) + (seq-of nil) + + (s/source? x) + (stream-of nil) + + :else + (g/type (class x)))) + +(defn- normalize-type-descriptor [x] + (cond + (instance? Type x) + x + + (or (= 'bytes x) (= bytes x)) + (g/type byte-array-type) + + :else + (g/type (eval x)))) + +(defn- tag-metadata-for [^Type src] + (if (and (instance? Class (.type src)) + (not (.wrapper src))) + {:tag (if (= src (normalize-type-descriptor 'bytes)) + 'bytes + (.getName ^Class (.type src)))} + {})) + +(defmacro def-conversion + "Defines a conversion from one type to another." + [[src dst :as conversion] params & body] + (let [^Type src (normalize-type-descriptor src) + dst (normalize-type-descriptor dst)] + `(let [f# + (fn [~(with-meta (first params) + (tag-metadata-for src)) + ~(if-let [options (second params)] + options + `_#)] + ~@body) + + cost# + ~(get (meta conversion) :cost 1)] + (swap! conversions g/assoc-conversion ~src ~dst f# cost#) + (swap! inverse-conversions g/assoc-conversion ~dst ~src f# cost#)))) + +(defmacro def-transfer + "Defines a byte transfer from one type to another." + [[src dst] params & body] + (let [src (normalize-type-descriptor src) + dst (normalize-type-descriptor dst) + src-meta (tag-metadata-for src) + dst-meta (tag-metadata-for dst)] + `(swap! 
src->dst->transfer assoc-in [~src ~dst] + (fn [~(with-meta (first params) src-meta) + ~(with-meta (second params) dst-meta) + ~(if-let [options (get params 2)] options (gensym "options"))] + ~@body)))) + +;;; convert + +(def ^:private converter + (memoize + (fn [src dst] + (g/conversion-fn @conversions src dst)))) + +(declare convert) + +(def ^:private seq-converter + (memoize + (fn [dst] + (g/seq-conversion-fn @conversions convert 'seq dst)))) + +(def ^:private stream-converter + (memoize + (fn [dst] + (g/seq-conversion-fn @conversions convert 'stream dst)))) + +(defn conversion-path [src dst] + (let [path (-> @conversions + (g/conversion-path (g/type src) (g/type dst)) + :path)] + (map (partial mapv g/pprint-type) path))) + +(defn convert + "Converts `x`, if possible, into type `dst`, which can be either a class or protocol. If no such conversion + is possible, an IllegalArgumentException is thrown. If `x` is a stream, then the `src` type must be explicitly + specified. + + `options` is a map, whose available settings depend on what sort of transform is being performed: + + `chunk-size` - if a stream is being transformed into a sequence of discrete chunks, `:chunk-size` describes the + size of the chunks, which default to 4096 bytes. + + `encoding` - if a string is being encoded or decoded, `:encoding` describes the charset that is used, which + defaults to 'UTF-8' + + `direct?` - if a byte-buffer is being allocated, `:direct?` describes whether it should be a direct buffer, + defaulting to false + `source-type` - overrides input type detection, required to convert a stream + + (NB: if you need to convert a stream to a seq, or vice versa, but not the underlying byte type, you want + Manifold's `stream->seq` and `->source` instead)" + ([x dst] + (convert x dst nil)) + ([x dst options] + (let [dst (g/type dst) + source-type (get options :source-type) + ^Type + src (g/type + (or source-type + (type-descriptor x))) + wrapper (.wrapper src)] + + (cond + + (not (nil? 
(.type src))) + (if-let [f (or + (converter src dst) + (converter (g/type (class x)) dst))] + (f x (if source-type (dissoc options :source-type) options)) + (throw + (IllegalArgumentException. + (str "Don't know how to convert " (class x) " into " (g/pprint-type dst))))) + + (= 'seq wrapper) + (if-let [f (seq-converter dst)] + (f x (if source-type (dissoc options :source-type) options)) + x) + + (= 'stream wrapper) + (if-let [f (stream-converter dst)] + (f x (if source-type (dissoc options :source-type) options)) + x) + + :else + (throw (IllegalArgumentException. (str "invalid wrapper type: " (pr-str wrapper) " " (pr-str (.type src))))))))) + +(defn possible-conversions + "Returns a list of all possible conversion targets from value." + [src] + (let [^Type src (g/type src) + pred (cond + (.type src) + (partial converter src) + + (= 'seq (.wrapper src)) + seq-converter + + (= 'stream (.wrapper src)) + stream-converter)] + (->> @conversions + g/possible-targets + (filter pred) + (map g/pprint-type)))) + +(let [memoized-cost (memoize + (fn [src dst] + (if-let [path (g/conversion-path @conversions src dst)] + (:cost path) + 9999)))] + (defn conversion-cost + "Returns the estimated cost of converting the data `x` to the destination type `dst`." + ^long [x dst] + (memoized-cost (type-descriptor x) (normalize-type-descriptor dst)))) + +;;; transfer + +(defn- default-transfer + [source sink {:keys [chunk-size] :or {chunk-size 1024} :as options}] + (loop [] + (when-let [b (proto/take-bytes! source chunk-size options)] + (proto/send-bytes! sink b options) + (recur)))) + +(def ^:private transfer-fn + (memoize + (fn this [^Type src ^Type dst] + (let [converter-fn (cond + (nil? 
(.wrapper src)) + converter + + (#{'seq 'vector} (.wrapper src)) + (fn [_ d] (seq-converter d)) + + (= 'stream (.wrapper src)) + (fn [_ d] (stream-converter d)))] + + ;; TODO: do a reverse traversal, not an exhaustive forward search + (let [[src' dst'] (->> @src->dst->transfer + keys + (map (fn [src'] + (and + (converter-fn src src') + (when-let [dst' (some + #(and (converter-fn dst %) %) + (keys (@src->dst->transfer src')))] + [src' dst'])))) + (remove nil?) + first)] + (cond + + (and src' dst') + (let [f (get-in @src->dst->transfer [src' dst'])] + (fn [source sink options] + (let [source' (convert source src' options) + sink' (convert sink dst' options)] + (f source' sink' options)))) + + (and + (converter-fn src (g/type #'proto/ByteSource)) + (converter dst (g/type #'proto/ByteSink))) + (fn [source sink {:keys [close?] :or {close? true} :as options}] + (let [source' (convert source #'proto/ByteSource options) + sink' (convert sink #'proto/ByteSink options)] + (default-transfer source' sink' options) + (when close? + (doseq [x [source sink source' sink']] + (when (proto/closeable? x) + (proto/close x)))))) + + :else + nil)))))) + +;; for byte transfers +(defn transfer + "Transfers, if possible, all bytes from `source` into `sink`. If this cannot be accomplished, an IllegalArgumentException is + thrown. + + `options` is a map whose available settings depends on the source and sink types: + + `chunk-size` - if a stream is being transformed into a sequence of discrete chunks, `:chunk-size` describes the + size of the chunks, which default to 4096 bytes. + + `encoding` - if a string is being encoded or decoded, `:encoding` describes the charset that is used, which + defaults to 'UTF-8' + + `append?` - if a file is being written to, `:append?` determines whether the bytes will overwrite the existing content + or be appended to the end of the file. This defaults to true. 
+ + `close?` - whether the sink and source should be closed once the transfer is done, defaults to true." + ([source sink] + (transfer source sink nil)) + ([source sink options] + (transfer source nil sink options)) + ([source source-type sink options] + (let [src (type-descriptor source) + dst (type-descriptor sink)] + (if-let [f (transfer-fn src dst)] + (f source sink options) + (throw (IllegalArgumentException. (str "Don't know how to transfer between " (g/pprint-type src) " to " (g/pprint-type dst)))))))) + +(def ^{:doc "Web-scale."} dev-null + (reify proto/ByteSink + (send-bytes! [_ _ _]))) + +(defn optimized-transfer? + "Returns true if an optimized transfer function exists for the given source and sink objects." + [type-descriptor sink-type] + (boolean (transfer-fn type-descriptor sink-type))) + +;;; conversion definitions + +(def-conversion ^{:cost 0} [(stream-of bytes) InputStream] + [s options] + (let [ps (ps/pushback-stream (get options :buffer-size 1024))] + (d/loop [] + (d/chain (s/take! s ::none) + (fn [^bytes msg] + (if (identical? ::none msg) + (do + (ps/close ps) + false) + (ps/put-array ps msg 0 (alength msg)))) + (fn [result] + (when result + (d/recur))))) + (ps/->input-stream ps))) + +(def-conversion ^{:cost 0} [(stream-of ByteBuffer) InputStream] + [s options] + (let [ps (ps/pushback-stream (get options :buffer-size 1024))] + (d/loop [] + (d/chain (s/take! s ::none) + (fn [^ByteBuffer msg] + (if (identical? ::none msg) + (do + (ps/close ps) + false) + (ps/put-buffer ps (.duplicate msg)))) + (fn [result] + (when result + (d/recur))))) + (ps/->input-stream ps))) + +;; byte-array => byte-buffer +(def-conversion ^{:cost 0} [bytes ByteBuffer] + [ary {:keys [direct?] :or {direct? false}}] + (if direct? 
+ (let [len (Array/getLength ary) + ^ByteBuffer buf (ByteBuffer/allocateDirect len)] + (.put buf ary 0 len) + (.position buf 0) + buf) + (ByteBuffer/wrap ary))) + +;; byte-array => input-stream +(def-conversion ^{:cost 0} [bytes InputStream] + [ary] + (ByteArrayInputStream. ary)) + +;; byte-buffer => input-stream +(def-conversion ^{:cost 0} [ByteBuffer InputStream] + [buf] + (ByteBufferInputStream. (.duplicate buf))) + +;; byte-buffer => byte-array +(def-conversion [ByteBuffer bytes] + [buf] + (if (.hasArray buf) + (if (== (alength (.array buf)) (.remaining buf)) + (.array buf) + (let [ary (Utils/byteArray (.remaining buf))] + (doto buf + .mark + (.get ary 0 (.remaining buf)) + .reset) + ary)) + (let [^bytes ary (Utils/byteArray (.remaining buf))] + (doto buf .mark (.get ary) .reset) + ary))) + +;; sequence of byte-buffers => byte-buffer +(def-conversion [(vector-of ByteBuffer) ByteBuffer] + [bufs {:keys [direct?] :or {direct? false}}] + (cond + (empty? bufs) + (ByteBuffer/allocate 0) + + (and (empty? (rest bufs)) (not (proto/closeable? bufs))) + (first bufs) + + :else + (let [len (reduce + (map #(.remaining ^ByteBuffer %) bufs)) + buf (if direct? + (ByteBuffer/allocateDirect len) + (ByteBuffer/allocate len))] + (doseq [^ByteBuffer b bufs] + (.mark b) + (.put buf b) + (.reset b)) + (when (proto/closeable? 
bufs) + (proto/close bufs)) + (.flip buf)))) + +;; byte-buffer => sequence of byte-buffers +(def-conversion ^{:cost 0} [ByteBuffer (vector-of ByteBuffer)] + [buf {:keys [chunk-size]}] + (if chunk-size + (let [lim (.limit buf) + indices (range (.position buf) lim chunk-size)] + (mapv + #(-> buf + .duplicate + ^ByteBuffer (.position (int %)) + ^ByteBuffer (.limit (int (min lim (+ (int %) chunk-size)))) + .slice) + indices)) + [buf])) + +;; channel => input-stream +(def-conversion ^{:cost 0} [ReadableByteChannel InputStream] + [channel] + (Channels/newInputStream channel)) + +;; channel => lazy-seq of byte-buffers +(def-conversion [ReadableByteChannel (seq-of ByteBuffer)] + [channel {:keys [chunk-size direct?] :or {chunk-size 4096, direct? false} :as options}] + (lazy-seq + (when-let [b (proto/take-bytes! channel chunk-size options)] + (cons b (convert channel (seq-of ByteBuffer) options))))) + +;; input-stream => channel +(def-conversion ^{:cost 0} [InputStream ReadableByteChannel] + [input-stream] + (Channels/newChannel input-stream)) + +;; string => byte-array +(def-conversion ^{:cost 2} [String byte-array-type] + [s {:keys [encoding] :or {encoding "UTF-8"}}] + (.getBytes s ^String (name encoding))) + +;; byte-array => string +(def-conversion ^{:cost 2} [bytes String] + [ary {:keys [encoding] :or {encoding "UTF-8"}}] + (String. ^bytes ary (name encoding))) + +;; lazy-seq of byte-buffers => channel +(def-conversion ^{:cost 1.5} [(seq-of ByteBuffer) ReadableByteChannel] + [bufs] + (let [pipe (Pipe/open) + ^WritableByteChannel sink (.sink pipe) + source (doto ^AbstractSelectableChannel (.source pipe) + (.configureBlocking true))] + (future + (try + (loop [s bufs] + (when (and (not (empty? 
s)) (.isOpen sink)) + (let [buf (.duplicate ^ByteBuffer (first s))] + (.write sink buf) + (recur (rest s))))) + (finally + (.close sink)))) + source)) + +(def-conversion ^{:cost 1.5} [(seq-of #'proto/ByteSource) InputStream] + [srcs options] + (let [chunk-size (get options :chunk-size 65536) + out (PipedOutputStream.) + in (PipedInputStream. out chunk-size)] + (future + (try + (loop [s srcs] + (when-not (empty? s) + (transfer (first s) out) + (recur (rest s)))) + (finally + (.close out)))) + in)) + +(def-conversion ^{:cost 1.5} [InputStream byte-array-type] + [in options] + (let [out (ByteArrayOutputStream. (p/max 64 (.available in))) + buf (Utils/byteArray 16384)] + (loop [] + (let [len (.read in buf 0 16384)] + (when-not (neg? len) + (.write out buf 0 len) + (recur)))) + (.toByteArray out))) + +#_(let [ary (Utils/byteArray 0)] + (def-conversion ^{:cost 0} [::nil byte-array-type] + [src options] + ary)) + +(def-conversion ^{:cost 2} [#'proto/ByteSource byte-array-type] + [src options] + (let [os (ByteArrayOutputStream.)] + (transfer src os) + (.toByteArray os))) + +;; generic byte-source => lazy char-sequence +(def-conversion ^{:cost 2} [#'proto/ByteSource CharSequence] + [source options] + (cs/decode-byte-source + #(when-let [bytes (proto/take-bytes! source % options)] + (convert bytes ByteBuffer options)) + #(when (proto/closeable? source) + (proto/close source)) + options)) + +;; input-stream => reader (encoding may be a string, keyword or symbol, as in the other conversions) +(def-conversion ^{:cost 1.5} [InputStream Reader] + [input-stream {:keys [encoding] :or {encoding "UTF-8"}}] + (BufferedReader. (InputStreamReader. input-stream ^String (name encoding)))) + +;; reader => char-sequence +(def-conversion ^{:cost 1.5} [Reader CharSequence] + [reader {:keys [chunk-size] :or {chunk-size 2048}}] + (let [ary (char-array chunk-size) + sb (StringBuilder.)] + (loop [] + (let [n (.read reader ary 0 chunk-size)] + (if (pos? 
n) + (do + (.append sb ary 0 n) + (recur)) + (.toString sb)))))) + +;; char-sequence => string +(def-conversion [CharSequence String] + [char-sequence] + (.toString char-sequence)) + +(def-conversion [(vector-of String) String] + [strings] + (let [sb (StringBuilder.)] + (doseq [s strings] + (.append sb s)) + (.toString sb))) + +;; file => readable-channel +(def-conversion ^{:cost 0} [File ReadableByteChannel] + [file] + (.getChannel (FileInputStream. file))) + +;; file => writable-channel +(def-conversion ^{:cost 0} [File WritableByteChannel] + [file {:keys [append?] :or {append? true}}] + (.getChannel (FileOutputStream. file (boolean append?)))) + +(def-conversion ^{:cost 0} [File (seq-of ByteBuffer)] + [file {:keys [chunk-size writable?] :or {chunk-size (int 2e9), writable? false}}] + (let [^RandomAccessFile raf (RandomAccessFile. file (if writable? "rw" "r")) + ^FileChannel fc (.getChannel raf) + buf-seq (fn buf-seq [offset] + (when-not (<= (.size fc) offset) + (let [remaining (- (.size fc) offset)] + (lazy-seq + (cons + (.map fc + (if writable? + FileChannel$MapMode/READ_WRITE + FileChannel$MapMode/READ_ONLY) + offset + (min remaining chunk-size)) + (buf-seq (+ offset chunk-size)))))))] + (g/closeable-seq + (buf-seq 0) + false + #(do + (.close raf) + (.close fc))))) + +;; output-stream => writable-channel +(def-conversion ^{:cost 0} [OutputStream WritableByteChannel] + [output-stream] + (Channels/newChannel output-stream)) + +;; writable-channel => output-stream +(def-conversion ^{:cost 0} [WritableByteChannel OutputStream] + [channel] + (Channels/newOutputStream channel)) + +;;; def-transfers + +(def-transfer [ReadableByteChannel File] + [channel file {:keys [chunk-size] :or {chunk-size (int 1e7)} :as options}] + (let [^FileChannel fc (convert file WritableByteChannel options)] + (try + (loop [idx 0] + (let [n (.transferFrom fc channel idx chunk-size)] + (when (pos? 
n) + (recur (+ idx n))))) + (finally + (.force fc true) + (.close fc))))) + +(def-transfer [File WritableByteChannel] + [file + channel + {:keys [chunk-size + close?] + :or {chunk-size (int 1e6) + close? true} + :as options}] + (let [^FileChannel fc (convert file ReadableByteChannel options)] + (try + (loop [idx 0] + (let [n (.transferTo fc idx chunk-size channel)] + (when (pos? n) + (recur (+ idx n))))) + (finally + (when close? + (.close ^WritableByteChannel channel)) + (.close fc))))) + +(def-transfer [InputStream OutputStream] + [input-stream + output-stream + {:keys [chunk-size + close?] + :or {chunk-size 4096 + close? true} + :as options}] + (let [ary (Utils/byteArray chunk-size)] + (try + (loop [] + (let [n (.read ^InputStream input-stream ary)] + (when (pos? n) + (.write ^OutputStream output-stream ary 0 n) + (recur)))) + (.flush ^OutputStream output-stream) + (finally + (.close ^InputStream input-stream) + (when close? + (.close ^OutputStream output-stream)))))) + +;;; protocol extensions + +(extend-protocol proto/ByteSink + + OutputStream + (send-bytes! [this b _] + (let [^OutputStream os this] + (.write os ^bytes (convert b byte-array-type)))) + + WritableByteChannel + (send-bytes! [this b _] + (let [^WritableByteChannel ch this] + (.write ch ^ByteBuffer (convert b ByteBuffer))))) + +(extend-protocol proto/ByteSource + + InputStream + (take-bytes! [this n _] + (let [ary (clojure.core/byte-array n) + n (long n)] + (loop [idx 0] + (if (== idx n) + ary + (let [read (.read this ary idx (long (- n idx)))] + (if (== -1 read) + (when (pos? idx) + (let [ary' (clojure.core/byte-array idx)] + (System/arraycopy ary 0 ary' 0 idx) + ary')) + (recur (long (+ idx read))))))))) + + ReadableByteChannel + (take-bytes! [this n {:keys [direct?] :or {direct? false}}] + (let [^ByteBuffer buf (if direct? + (ByteBuffer/allocateDirect n) + (ByteBuffer/allocate n))] + + (loop [] + (when (try + (pos? (.read this buf)) + (catch Throwable e + false)) + (recur))) + + (when (pos? 
(.position buf)) + (.flip buf)))) + + ByteBuffer + (take-bytes! [this n _] + (when (pos? (.remaining this)) + (let [n (int (min (.remaining this) n)) + buf (-> this + .duplicate + ^ByteBuffer (.limit (+ (.position this) n)) + ^ByteBuffer (.slice) + (.order (.order this)))] + (.position this (+ n (.position this))) + buf)))) + + + +;;; print-bytes + +(let [special-character? (->> "' _-+=`~{}[]()\\/#@!?.,;\"" (map int) set)] + (defn- readable-character? [x] + (or + (Character/isLetterOrDigit (int x)) + (special-character? (int x))))) + +(defn print-bytes + "Prints out the bytes in both hex and ASCII representations, 16 bytes per line." + [bytes] + (let [bufs (convert bytes (seq-of ByteBuffer) {:chunk-size 16})] + (doseq [^ByteBuffer buf bufs] + (let [s (convert (.duplicate buf) String {:encoding "ISO-8859-1"}) + bytes (repeatedly (min 16 (.remaining buf)) #(.get buf)) + padding (* 3 (- 16 (count bytes))) + hex-format #(->> "%02X" (repeat %) (interpose " ") (apply str))] + (println + (apply format + (str + (hex-format (min 8 (count bytes))) + " " + (hex-format (max 0 (- (count bytes) 8)))) + bytes) + (apply str (repeat padding " ")) + " " + (->> s + (map #(if (readable-character? %) % ".")) + (apply str))))))) + +;;; to-* helpers + +(defn ^ByteBuffer to-byte-buffer + "Converts the object to a `java.nio.ByteBuffer`." + ([x] + (to-byte-buffer x nil)) + ([x options] + (condp instance? x + ByteBuffer x + byte-array-type (ByteBuffer/wrap x) + String (ByteBuffer/wrap (.getBytes ^String x (name (get options :encoding "UTF-8")))) + (convert x ByteBuffer options)))) + +(defn to-byte-buffers + "Converts the object to a sequence of `java.nio.ByteBuffer`." + ([x] + (to-byte-buffers x nil)) + ([x options] + (convert x (seq-of ByteBuffer) options))) + +(defn ^"[B" to-byte-array + "Converts the object to a byte-array." + ([x] + (to-byte-array x nil)) + ([x options] + (condp instance? 
x + byte-array-type x + String (.getBytes ^String x (name (get options :encoding "UTF-8"))) + (convert x byte-array-type options)))) + +(defn to-byte-arrays + "Converts the object to a sequence of byte-arrays." + ([x] + (to-byte-arrays x nil)) + ([x options] + (convert x (seq-of byte-array-type) options))) + +(defn ^InputStream to-input-stream + "Converts the object to a `java.io.InputStream`." + ([x] + (to-input-stream x nil)) + ([x options] + (condp instance? x + byte-array-type (ByteArrayInputStream. x) + ByteBuffer (ByteBufferInputStream. x) + (convert x InputStream options)))) + +(defn ^DataInputStream to-data-input-stream "Converts the object to a `java.io.DataInputStream`, returning it unchanged if it already is one." + ([x] + (to-data-input-stream x nil)) + ([x options] + (if (instance? DataInputStream x) + x + (DataInputStream. (to-input-stream x options))))) + +(defn ^OutputStream to-output-stream + "Converts the object to a `java.io.OutputStream`." + ([x] + (to-output-stream x nil)) + ([x options] + (convert x OutputStream options))) + +(defn ^CharSequence to-char-sequence + "Converts the object to a `java.lang.CharSequence`." + ([x] + (to-char-sequence x nil)) + ([x options] + (if (instance? CharSequence x) + x + (convert x CharSequence options)))) + +(defn ^ReadableByteChannel to-readable-channel + "Converts the object to a `java.nio.channels.ReadableByteChannel`." + ([x] + (to-readable-channel x nil)) + ([x options] + (convert x ReadableByteChannel options))) + +(defn ^String to-string + "Converts the object to a string." + ([x] + (to-string x nil)) + ([x options] + (let [encoding (get options :encoding "UTF-8")] + (condp instance? x + String x + byte-array-type (String. ^"[B" x ^String (name encoding)) + (convert x String options))))) + +(defn to-reader + "Converts the object to a java.io.Reader." + ([x] + (to-reader x nil)) + ([x options] + (convert x Reader options))) + +(defn to-line-seq + "Converts the object to a lazy sequence of newline-delimited strings." + ([x] + (to-line-seq x nil)) + ([x options] + (let [reader (convert x Reader options) + reader (BufferedReader. 
^Reader reader) + line! (fn line! [] + (lazy-seq + (when-let [l (try + (.readLine reader) + (catch IOException e + nil))] + (cons l (line!)))))] + (line!)))) + +(defn to-byte-source + "Converts the object to something that satisfies `ByteSource`." + ([x] + (to-byte-source x nil)) + ([x options] + (convert x #'proto/ByteSource options))) + +(defn to-byte-sink + "Converts the object to something that satisfies `ByteSink`." + ([x] + (to-byte-sink x nil)) + ([x options] + (convert x #'proto/ByteSink options))) + +;;; + +(defn- cmp-bufs + ^long [^ByteBuffer a' ^ByteBuffer b'] + (let [diff (p/- (.remaining a') (.remaining b')) + sign (long (if (pos? diff) -1 1)) + a (if (pos? diff) b' a') + b (if (pos? diff) a' b') + limit (p/>> (.remaining a) 2) + a-offset (.position a) + b-offset (.position b)] + (let [cmp (loop [idx 0] + (if (p/>= idx limit) + 0 + (let [cmp (p/- + (p/int->uint (.getInt a (p/+ idx a-offset))) + (p/int->uint (.getInt b (p/+ idx b-offset))))] + (if (p/== 0 cmp) + (recur (p/+ idx 4)) + ;; Use (if (pos? cmp) 1 -1) to ensure that the + ;; sign of the value x returned by cmp-bufs (and + ;; compare-bytes) is not modified when Clojure's + ;; comparator infrastructure calls (.intValue + ;; x). The intValue method truncates a Java + ;; Long's most significant 32 bits away, which + ;; in some cases changes the sign of the result, + ;; and thus the direction of the comparison + ;; result. Such code is not needed when + ;; comparing individual bytes below, because the + ;; subtraction result fits within the least + ;; significant 9 bits, and (.intValue x) never + ;; changes the sign. + (p/* sign (if (pos? 
cmp) 1 -1))))))] + (if (p/== 0 (long cmp)) + (let [limit' (.remaining a)] + (loop [idx limit] + (if (p/>= idx limit') + diff + (let [cmp (p/- + (p/byte->ubyte (.get a (p/+ idx a-offset))) + (p/byte->ubyte (.get b (p/+ idx b-offset))))] + (if (p/== 0 cmp) + (recur (p/inc idx)) + (p/* sign cmp)))))) + cmp)))) + +(defn compare-bytes + "Returns a comparison result for two byte streams." + ^long [a b] + (if (and + (or + (instance? byte-array-type a) + (instance? ByteBuffer a) + (instance? String a)) + (or + (instance? byte-array-type b) + (instance? ByteBuffer b) + (instance? String b))) + (cmp-bufs (to-byte-buffer a) (to-byte-buffer b)) + (loop [a (to-byte-buffers a), b (to-byte-buffers b)] + (cond + (empty? a) + (if (empty? b) 0 -1) + + (empty? b) + 1 + + :else + (let [cmp (cmp-bufs (first a) (first b))] + (if (p/== 0 cmp) + (recur (rest a) (rest b)) + cmp)))))) + +(defn bytes= + "Returns true if the two byte streams are equivalent." + [a b] + (p/== 0 (compare-bytes a b))) + + + +(comment + (require '[manifold.stream :as ms]) + + (def content + (doto (ms/stream) + (ms/put! (clojure.core/byte-array 5)) + (ms/close!))) + + (conversion-path (stream-of bytes) (seq-of bytes)) ; => ([(stream-of [B) (seq-of [B)]) + + (convert content (seq-of bytes)) ; doesn't work + (= content (convert content (seq-of bytes))) + + (convert content (seq-of bytes) {:source-type (stream-of bytes)}) ; works + (= content (convert content (seq-of bytes) {:source-type (stream-of bytes)})) + + (convert content (seq-of bytes) {:source-type (stream-of nil)}) ; doesn't work + (= content (convert content (seq-of bytes) {:source-type (stream-of nil)})) + + (convert content (seq-of String)) ; also doesn't work + (= content (convert content (seq-of String))) + + (convert content (seq-of java.nio.ByteBuffer)) ; this works + (= content (convert content (seq-of java.nio.ByteBuffer))) + + + + (let [content (doto (ms/stream) + (ms/put! 
(clojure.core/byte-array 5)) + (ms/close!))] + (convert content (seq-of bytes)) ; no + #_ (convert content (seq-of bytes) {:source-type (stream-of bytes)}) ; yes + #_(convert content (seq-of bytes) {:source-type (stream-of nil)}) ; no + ) + + + (let [content (doto (ms/stream) + (ms/put! (clojure.core/byte-array 5)) + (ms/close!))] + (convert content (seq-of String)) ; also doesn't work + (= content (convert content (seq-of String)))) + + (let [content (doto (ms/stream) + (ms/put! (clojure.core/byte-array 5)) + (ms/close!))] + (convert content (seq-of java.nio.ByteBuffer)) ; this works + (= content (convert content (seq-of java.nio.ByteBuffer)))) + + + ) diff --git a/src/clj_commons/byte_streams/ByteBufferInputStream.java b/src/clj_commons/byte_streams/ByteBufferInputStream.java new file mode 100644 index 0000000..c64c5eb --- /dev/null +++ b/src/clj_commons/byte_streams/ByteBufferInputStream.java @@ -0,0 +1,57 @@ +package clj_commons.byte_streams; + +import java.io.InputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +public class ByteBufferInputStream extends InputStream { + + private ByteBuffer _buf; + + public ByteBufferInputStream(ByteBuffer buf) { + _buf = buf; + } + + public void close() { + } + + public int available() { + return _buf.remaining(); + } + + public boolean markSupported() { + return true; + } + + public void mark(int readlimit) { + _buf.mark(); + } + + public void reset() { + _buf.reset(); + } + + public long skip(long n) { + int nP = (int) Math.max(0L, Math.min(n, (long) _buf.remaining())); /* clamp in long so large or negative n cannot wrap */ + _buf.position(_buf.position() + nP); + return (long)nP; + } + + public int read() throws IOException { + if (!_buf.hasRemaining()) { + return -1; + } else { + return (int) _buf.get() & 0xFF; + } + } + + public int read(byte[] bytes, int offset, int length) throws IOException { + length = Math.min(length, _buf.remaining()); + if (length == 0) { + return _buf.hasRemaining() ? 0 : -1; /* zero-length request is not EOF while bytes remain */ + } else { + _buf.get(bytes, offset, length); + return length; + } + } +} diff --git 
a/src/clj_commons/byte_streams/InputStream.java b/src/clj_commons/byte_streams/InputStream.java new file mode 100644 index 0000000..941f275 --- /dev/null +++ b/src/clj_commons/byte_streams/InputStream.java @@ -0,0 +1,52 @@ +package clj_commons.byte_streams; + +import java.io.IOException; + +public class InputStream extends java.io.InputStream { + + public interface Streamable { + int available(); + void close(); + long skip(long n); + int read() throws IOException; + int read(byte[] bytes, int offset, int length) throws IOException; + } + + private Streamable _s; + + public InputStream(Streamable s) { + _s = s; + } + + public void close() { + _s.close(); + } + + public int available() { + return _s.available(); + } + + public boolean markSupported() { + return false; + } + + public void mark(int readlimit) { + throw new UnsupportedOperationException(); + } + + public void reset() { + throw new UnsupportedOperationException(); + } + + public long skip(long n) { + return _s.skip(n); + } + + public int read() throws IOException { + return _s.read(); + } + + public int read(byte[] bytes, int offset, int length) throws IOException { + return _s.read(bytes, offset, length); + } +} diff --git a/src/clj_commons/byte_streams/Utils.java b/src/clj_commons/byte_streams/Utils.java new file mode 100644 index 0000000..2657d26 --- /dev/null +++ b/src/clj_commons/byte_streams/Utils.java @@ -0,0 +1,7 @@ +package clj_commons.byte_streams; + +public class Utils { + public static byte[] byteArray(int length) { + return new byte[length]; + } +} diff --git a/src/clj_commons/byte_streams/char_sequence.clj b/src/clj_commons/byte_streams/char_sequence.clj new file mode 100644 index 0000000..9a85039 --- /dev/null +++ b/src/clj_commons/byte_streams/char_sequence.clj @@ -0,0 +1,122 @@ +(ns clj-commons.byte-streams.char-sequence + (:refer-clojure :exclude [flush]) + (:import + [java.util.concurrent.locks + ReentrantLock] + [java.io + ByteArrayOutputStream] + [java.nio + ByteBuffer + CharBuffer] 
+ [java.nio.charset + Charset + CharsetDecoder + CoderResult + CodingErrorAction])) + +(set! *unchecked-math* true) + +(defn coding-error-action "Maps :report, :ignore or :replace to the corresponding CodingErrorAction constant; any other value throws IllegalArgumentException (no matching case clause)." [action] + (case action + :report CodingErrorAction/REPORT + :ignore CodingErrorAction/IGNORE + :replace CodingErrorAction/REPLACE)) + +(defn parse-result [^CoderResult result] + (cond + (.isUnderflow result) :underflow + (.isOverflow result) :overflow + :else (throw (IllegalArgumentException. "Malformed byte-stream input to CharsetDecoder")))) + +(defn decode + [^CharsetDecoder decoder ^ByteBuffer in ^CharBuffer out] + (parse-result (.decode decoder in out false))) + +(defn flush + [^CharsetDecoder decoder ^ByteBuffer in ^CharBuffer out] + (parse-result (.decode decoder (or in (ByteBuffer/allocate 0)) out true)) + (parse-result (.flush decoder out))) + +(defn concat-bytes [^ByteBuffer a ^ByteBuffer b] + (let [buf (ByteBuffer/allocate (+ (.remaining a) (.remaining b)))] + (.put buf a) + (.put buf b) + (.flip buf))) + +(defn lazy-char-buffer-sequence "Lazily decodes the chunks returned by (byte-source chunk-size) into a seq of flipped CharBuffers, carrying undecoded bytes forward in extra-bytes; once byte-source returns nil the decoder is flushed and close-fn (if any) is called." + [^CharsetDecoder decoder + chunk-size + ^ByteBuffer extra-bytes + close-fn + byte-source] + (lazy-seq + (let [num-bytes (+ (long + (if extra-bytes + (.remaining extra-bytes) + 0)) + (long chunk-size)) + len (long + (Math/ceil + (/ num-bytes + (.averageCharsPerByte decoder)))) + out (CharBuffer/allocate len)] + + (if (and extra-bytes (= :overflow (decode decoder extra-bytes out))) + + ;; we didn't even exhaust the overflow bytes, try again + (cons + (.flip out) + (lazy-char-buffer-sequence decoder chunk-size extra-bytes close-fn byte-source)) + + (if-let [in (byte-source chunk-size)] + (let [in (if (and extra-bytes (.hasRemaining extra-bytes)) + (concat-bytes extra-bytes in) + in) + result (decode decoder in out)] + (cons + (.flip out) + (lazy-char-buffer-sequence + decoder + chunk-size + (when (.hasRemaining ^ByteBuffer in) in) + close-fn + byte-source))) + (do + (flush decoder extra-bytes out) + (when close-fn (close-fn)) + (.flip out) (when (.hasRemaining out) (list out)))))))) + +(defn decode-byte-source + [byte-source + 
close-fn + {:keys [chunk-size encoding on-encoding-error] + :or {chunk-size 1024 + on-encoding-error :replace + encoding "UTF-8"}}] + (let [action (coding-error-action on-encoding-error) + decoder (doto (.newDecoder (Charset/forName encoding)) + (.onMalformedInput action) + (.onUnmappableCharacter action)) + s (lazy-char-buffer-sequence decoder chunk-size nil close-fn byte-source)] + (reify + java.io.Closeable + (close [_] (when close-fn (close-fn))) + + CharSequence + (charAt [_ idx] + (loop [remaining idx, s s] + (if (empty? s) + (throw (IndexOutOfBoundsException. (str idx))) + (let [^CharBuffer buf (first s)] + (if (< remaining (.remaining buf)) + (.charAt buf remaining) + (recur (- remaining (.remaining buf)) (rest s))))))) + (length [_] + (reduce + (map #(.remaining ^CharBuffer %) s))) + #_(subSequence [_ start end] + ) + (toString [_] + (let [buf (StringBuffer.)] + (doseq [b s] + (.append buf b)) + (.toString buf)))))) diff --git a/src/clj_commons/byte_streams/graph.clj b/src/clj_commons/byte_streams/graph.clj new file mode 100644 index 0000000..1af9a72 --- /dev/null +++ b/src/clj_commons/byte_streams/graph.clj @@ -0,0 +1,312 @@ +(ns clj-commons.byte-streams.graph + (:refer-clojure :exclude [type]) + (:require + [manifold.stream :as s] + [clj-commons.byte-streams + [utils :refer [defprotocol+ defrecord+ deftype+]] + [protocols :as p]]) + (:import + [java.util + LinkedList + PriorityQueue])) + +(declare pprint-type) + +(deftype+ Conversion [f ^double cost] + Object + (equals [_ x] + (and + (instance? Conversion x) + (identical? f (.f ^Conversion x)) + (== cost (.cost ^Conversion x)))) + (hashCode [_] + (bit-xor (System/identityHashCode f) (unchecked-int cost)))) + +(deftype+ Type [wrapper type] + Object + (equals [_ x] + (and + (instance? 
Type x) + (= wrapper (.wrapper ^Type x)) + (= type (.type ^Type x)))) + (hashCode [_] + (bit-xor + (hash wrapper) + (hash type))) + (toString [this] + (pr-str (pprint-type this)))) + +(defn pprint-type [^Type x] + (if-let [wrapper (.wrapper x)] + (list (symbol (str wrapper "-of")) (.type x)) + (.type x))) + +(defn type + ([t] + (if (instance? Type t) + t + (type nil t))) + ([wrapper t] + (Type. wrapper + (if (var? t) + @t + t)))) + +(defn- protocol? [x] + (and (map? x) (contains? x :on-interface))) + +(defn canonicalize [x] + (if (protocol? x) + @(:var x) + x)) + +(defn- class-satisfies? [protocol ^Class c] + (boolean + (or + (.isAssignableFrom ^Class (:on-interface protocol) c) + (some + #(.isAssignableFrom ^Class % c) + (keys (:impls protocol)))))) + +(defn assignable? [^Type a ^Type b] + (and + (= (.wrapper a) (.wrapper b)) + (let [a (canonicalize (.type a)) + b (canonicalize (.type b))] + (cond + (and (class? a) (class? b)) + (.isAssignableFrom ^Class b a) + + (and (protocol? b) (class? a)) + (class-satisfies? b a) + + :else + (= a b))))) + +(defprotocol+ IConversionGraph + (assoc-conversion [_ src dst f cost]) + (equivalent-targets [_ dst]) + (possible-sources [_]) + (possible-targets [_]) + (possible-conversions [_ src]) + (conversion [_ src dst])) + +(defn implicit-conversions [^Type src] + (cond + + ;; vector -> seq + (= 'vector (.wrapper src)) + [[[src (Type. 'seq (.type src))] (Conversion. (fn [x _] (seq x)) 1)]] + + ;; seq -> stream + (= 'seq (.wrapper src)) + [[[src (Type. 'stream (.type src))] (Conversion. (fn [x _] (s/->source x)) 1)]] + + ;; stream -> seq + (= 'stream (.wrapper src)) + [[[src (Type. 'seq (.type src))] (Conversion. (fn [x _] (s/stream->seq x)) 1)]] + + :else + nil)) + +(deftype+ ConversionGraph [m] + IConversionGraph + (assoc-conversion [_ src dst f cost] + (let [m' (assoc-in m [src dst] (Conversion. f cost)) + m' (if (and + (nil? (.wrapper ^Type src)) + (nil? 
(.wrapper ^Type dst))) + (let [src (.type ^Type src) + dst (.type ^Type dst)] + (-> m' + (assoc-in [(Type. 'seq src) (Type. 'seq dst)] + (Conversion. (fn [x options] (map #(f % options) x)) cost)) + (assoc-in [(Type. 'stream src) (Type. 'stream dst)] + (Conversion. (fn [x options] (s/map #(f % options) x)) (+ cost 0.1))))) + m')] + (ConversionGraph. m'))) + (possible-sources [_] + (keys m)) + (possible-targets [_] + (->> m vals (mapcat keys))) + (equivalent-targets [_ dst] + (->> m + vals + (mapcat keys) + (filter #(assignable? % dst)))) + (possible-conversions [_ src] + (->> m + keys + (filter (partial assignable? src)) + (mapcat (fn [src] + (map + (fn [[k v]] + [[src k] v]) + (get m src)))) + (concat (implicit-conversions src)) + (into {})))) + +(defn conversion-graph [] + (ConversionGraph. {})) + +;;; + +(defrecord+ ConversionPath [path fns visited? cost] + Comparable + (compareTo [_ x] + (let [cmp (compare cost (.cost ^ConversionPath x))] + (if (zero? cmp) + (compare (count path) (count (.path ^ConversionPath x))) + cmp)))) + +(defn- conj-path [^ConversionPath p src dst ^Conversion c] + (ConversionPath. + (conj (.path p) [src dst]) + (conj (.fns p) (.f c)) + (conj (.visited? p) dst) + (+ (.cost p) (.cost c)))) + +(def conversion-path + (memoize + (fn [g src dst] + (let [path (ConversionPath. [] [] #{src} 0)] + (if (assignable? src dst) + path + (let [q (doto (PriorityQueue.) (.add path)) + dsts (equivalent-targets g dst)] + (loop [] + (when-let [^ConversionPath p (.poll q)] + (let [curr (or (-> p .path last second) src)] + (if (some #(assignable? curr %) dsts) + p + (do + (doseq [[[src dst] c] (->> curr + (possible-conversions g) + (remove (fn [[[src dst] c]] ((.visited? p) dst))))] + (.add q (conj-path p src dst c))) + (recur)))))))))))) + +;;; + +(defn closeable-seq [s exhaustible? close-fn] + (if (empty? s) + (when exhaustible? + (close-fn) + nil) + (reify + + clojure.lang.IPending + (isRealized [_] + (or + (not (instance? 
clojure.lang.IPending s)) + (realized? s))) + + Object + (finalize [_] + (close-fn)) + + java.io.Closeable + (close [_] + (close-fn)) + + clojure.lang.Sequential + clojure.lang.ISeq + clojure.lang.Seqable + (seq [this] this) + (cons [_ a] + (closeable-seq (cons a s) exhaustible? close-fn)) + (next [this] + (closeable-seq (next s) exhaustible? close-fn)) + (more [this] + (let [rst (next this)] + (if (empty? rst) + '() + rst))) + (first [_] + (first s)) + (equiv [a b] + (= s b))))) + +(defn conversion-fn [g src dst] + (when-let [path (conversion-path g src dst)] + (condp = (count (:path path)) + 0 (fn [x _] x) + + 1 (let [f (->> path :fns first)] + (if (p/closeable? src) + (fn [x options] + (let [x' (f x options)] + (when-not (p/closeable? x') + (p/close x)) + x')) + f)) + + ;; multiple stages + (let [fns (->> path :fns (apply vector))] + (fn [x options] + (let [close-fns (LinkedList.) + result (reduce + (fn [x f] + + ;; keep track of everything that needs to be closed once the bytes are exhausted + (when (p/closeable? x) + (.add close-fns #(p/close x))) + (f x options)) + x + fns)] + (if-let [close-fn (when-not (or (p/closeable? result) + (.isEmpty close-fns)) + #(loop [] + (when-let [f (.poll close-fns)] + (f) + (recur))))] + (cond + + (seq? result) + (closeable-seq result true close-fn) + + (s/source? result) + (do + (s/on-drained result close-fn) + result) + + :else + (do + ;; we assume that if the end-result is closeable, it will take care of all the intermediate + ;; objects beneath it. I think this is true as long as we're not doing multiple streaming + ;; reads, but this might need to be revisited. + (when-not (p/closeable? result) + (close-fn)) + result)) + result))))))) + +(defn seq-conversion-fn [g convert wrapper dst] + (let [path (->> g + possible-sources + (remove #(nil? (.wrapper ^Type %))) + (remove #(#{String CharSequence} (.type ^Type %))) + (map #(conversion-path g % dst)) + (remove nil?) 
+ (sort-by :cost) + first) + ^Type src (-> path :path first first)] + + (when src + (let [wrapper' (.wrapper src) + type' (.type src)] + (fn [x options] + (->> x + + ((condp = [wrapper wrapper'] + '[seq vector] vec + '[stream vector] (comp vec s/stream->seq) + '[seq stream] s/->source + '[stream seq] s/stream->seq + identity)) + + ((condp = wrapper' + 'vector (partial mapv #(convert % type' options)) + 'seq (partial map #(convert % type' options)) + 'stream (partial s/map #(convert % type' options)))) + + (#((conversion-fn g src (-> path :path last last)) % options)))))))) diff --git a/src/clj_commons/byte_streams/protocols.clj b/src/clj_commons/byte_streams/protocols.clj new file mode 100644 index 0000000..7f9f742 --- /dev/null +++ b/src/clj_commons/byte_streams/protocols.clj @@ -0,0 +1,34 @@ +(ns clj-commons.byte-streams.protocols + (:require + [clj-commons.byte-streams.utils :refer [defprotocol+]]) + (:import + [java.util.concurrent + ConcurrentHashMap])) + +(defprotocol+ Closeable + (close [_] "A protocol that is a superset of `java.io.Closeable`.")) + +(defprotocol+ ByteSource + (take-bytes! [_ n options] "Takes `n` bytes from the byte source.")) + +(defprotocol+ ByteSink + (send-bytes! [_ bytes options] "Puts `bytes` in the byte sink.")) + +(extend-protocol Closeable + + java.io.Closeable + (close [this] (.close this)) + + ) + +(let [m (ConcurrentHashMap.)] + (defn closeable? [x] + (if (nil? x) + false + (let [c (class x) + v (.get m c)] + (if (nil? v) + (let [v (satisfies? 
Closeable x)] + (.put m c v) + v) + v))))) diff --git a/src/clj_commons/byte_streams/pushback_stream.clj b/src/clj_commons/byte_streams/pushback_stream.clj new file mode 100644 index 0000000..d112633 --- /dev/null +++ b/src/clj_commons/byte_streams/pushback_stream.clj @@ -0,0 +1,306 @@ +(ns clj-commons.byte-streams.pushback-stream + (:refer-clojure :exclude [take]) + (:require + [clj-commons.primitive-math :as p] + [clj-commons.byte-streams.utils :refer [doit definterface+ deftype+]] + [manifold + [utils :as u] + [stream :as s] + [deferred :as d]] + [clojure.walk :as walk]) + (:import + [java.nio + ByteBuffer] + [clj_commons.byte_streams + InputStream + InputStream$Streamable] + [java.util + LinkedList + ArrayDeque])) + +(set! *unchecked-math* true) + +(definterface+ PushbackStream + (put [^bytes x ^int offset ^int length]) + (put [^java.nio.ByteBuffer buf]) + (pushback [^bytes ary ^int offset ^int length]) + (pushback [^java.nio.ByteBuffer buf]) + (take [^bytes ary ^int offset ^int length ^boolean eager?]) + (^void close [])) + +(deftype Consumption + [^ByteBuffer buf + deferred + ^boolean eager?]) + +(defn trigger [^Consumption c] + (let [^ByteBuffer buf (.buf c)] + (d/success! (.deferred c) (.position buf)))) + +(defn put [^ByteBuffer src ^ByteBuffer dst] + (let [l (.limit src)] + (.limit src (p/+ (.position src) (p/min (.remaining src) (.remaining dst)))) + (.put dst src) + (.limit src l))) + +(defn- expand-either [first? form] + (let [form' (->> form + (map + #(if (and (seq? %) (= 'either (first %))) + (nth % (if first? 1 2)) + [%])) + (apply concat))] + (with-meta + (if (seq? form) + form' + (into (empty form) form')) + (meta form)))) + +(defn walk + [inner outer form] + (let [form' (cond + (list? form) (outer (apply list (map inner form))) + (seq? form) (outer (doall (map inner form))) + (coll? form) (outer (into (empty form) (map inner form))) + :else (outer form))] + (if (instance? 
clojure.lang.IMeta form') + (with-meta form' (meta form)) + form'))) + +(defn prewalk + [f form] + (walk (partial prewalk f) identity (f form))) + +(defmacro ^:private both [body] + `(do + ~(prewalk + (fn [x] + (if (sequential? x) + (expand-either true x) + x)) + body) + ~(prewalk + (fn [x] + (if (sequential? x) + (expand-either false x) + x)) + body))) + +(both + (deftype+ (either [PushbackByteStream] [SynchronizedPushbackByteStream]) + [lock + ^LinkedList consumers + ^long buffer-capacity + ^:unsynchronized-mutable ^int buffer-size + ^:unsynchronized-mutable deferred + ^:unsynchronized-mutable closed? + ^LinkedList buffer] + + InputStream$Streamable + + (available [_] + buffer-size) + + (read [this] + (let [ary (byte-array 1) + len (long @(.take this ary 0 1 true))] + (if (zero? len) + -1 + (p/bit-and 0xFF (get ary 0))))) + + (read [this ary offset length] + (let [n (long @(.take this ary offset length true))] + (if (zero? n) + -1 + n))) + + (skip [this n] + @(.take this (byte-array n) 0 n true)) + + PushbackStream + + (put [_ buf] + + (let [[consumers d] + ((either + [do] + [u/with-lock* lock]) + + (if closed? + [nil + (d/success-deferred false)] + + [(loop [acc []] + (if-let [^Consumption c (.peek consumers)] + (let [^ByteBuffer out (.buf c)] + (put buf out) + (when (or (.eager? c) (not (.hasRemaining out))) + (.remove consumers) + (recur (conj acc c)))) + acc)) + + (do + (when (.hasRemaining buf) + (.add buffer buf) + (set! buffer-size (unchecked-int (p/+ buffer-size (.remaining buf))))) + + (cond + + deferred + deferred + + (p/<= buffer-size buffer-capacity) + (d/success-deferred true) + + :else + (set! 
deferred (d/deferred))))]))] + + (when consumers + (doit [c consumers] + (trigger c))) + + d)) + + (put [this ary offset length] + (.put this + (-> (ByteBuffer/wrap ary) + (.position offset) + (.limit (+ offset length))))) + + (pushback [_ buf] + (let [consumers + ((either + [do] + [u/with-lock* lock]) + (let [consumers + (loop [acc []] + (if-let [^Consumption c (.peek consumers)] + (let [^ByteBuffer out (.buf c)] + (put buf out) + (when (or (.eager? c) (not (.hasRemaining out))) + (.remove consumers) + (recur (conj acc c)))) + acc))] + + (when (.hasRemaining buf) + (.addLast buffer buf) + (set! buffer-size (unchecked-int (p/+ buffer-size (.remaining buf))))) + + consumers))] + + (doit [c consumers] + (trigger c)))) + + (pushback [this ary offset length] + (.pushback this + (-> (ByteBuffer/wrap ary) + (.position offset) + (.limit (+ offset length))))) + + (take [_ ary offset length eager?] + + (let [out (-> (ByteBuffer/wrap ary) + (.position offset) + ^ByteBuffer (.limit (+ offset length)) + .slice) + + [put take] + + ((either + [do] + [u/with-lock* lock]) + + (loop [] + (when-let [^ByteBuffer in (.peek buffer)] + (put in out) + (when-not (.hasRemaining in) + (.remove buffer)) + (when (.hasRemaining out) + (recur)))) + + (set! buffer-size (unchecked-int (p/- buffer-size (.position out)))) + + [(when (and (p/<= buffer-size buffer-capacity) deferred) + (let [d deferred] + (set! deferred nil) + d)) + + (if (or closed? + (and (pos? (.position out)) + (or eager? (not (.hasRemaining out))))) + (d/success-deferred (.position out)) + (let [d (d/deferred)] + (.add consumers (Consumption. out d eager?)) + d))])] + + (when put + (d/success! put true)) + + take)) + + (close [_] + (when ((either + [do] + [u/with-lock* lock]) + (when-not closed? + (set! closed? true) + true)) + (loop [] + (when-let [^Consumption c (.poll consumers)] + (let [^ByteBuffer buf (.buf c)] + (d/success! 
(.deferred c) (.position buf))) + (recur)))) + + true))) + +(defn pushback-stream [capacity] + (SynchronizedPushbackByteStream. + (u/mutex) + (LinkedList.) + capacity + 0 + nil + false + (LinkedList.))) + +(defn unsafe-pushback-stream [capacity] + (PushbackByteStream. + (u/mutex) + (LinkedList.) + capacity + 0 + nil + false + (LinkedList.))) + +(def classname "clj_commons.byte_streams.pushback_stream.PushbackStream") + +(definline put-array + [p ary offset length] + `(.put ~(with-meta p {:tag classname}) ~ary ~offset ~length)) + +(definline put-buffer + [p buf] + `(.put ~(with-meta p {:tag classname}) ~buf)) + +(definline close [p] + `(.close ~(with-meta p {:tag classname}))) + +(definline eager-take + [p ary offset length] + `(.take ~(with-meta p {:tag classname}) ~ary ~offset ~length true)) + +(definline take + [p ary offset length] + `(.take ~(with-meta p {:tag classname}) ~ary ~offset ~length false)) + +(definline pushback-array + [p ary offset length] + `(.pushback ~(with-meta p {:tag classname}) ~ary ~offset ~length)) + +(definline pushback-buffer + [p buf] + `(.pushback ~(with-meta p {:tag classname}) ~buf)) + +(defn ->input-stream [pushback-stream] + (InputStream. pushback-stream)) diff --git a/src/clj_commons/byte_streams/utils.clj b/src/clj_commons/byte_streams/utils.clj new file mode 100644 index 0000000..c942061 --- /dev/null +++ b/src/clj_commons/byte_streams/utils.clj @@ -0,0 +1,30 @@ +(ns clj-commons.byte-streams.utils) + +(defmacro defprotocol+ [name & body] + (when-not (resolve name) + `(defprotocol ~name ~@body))) + +(defmacro deftype+ [name & body] + (when-not (resolve name) + `(deftype ~name ~@body))) + +(defmacro defrecord+ [name & body] + (when-not (resolve name) + `(defrecord ~name ~@body))) + +(defmacro definterface+ [name & body] + (when-not (resolve name) + `(definterface ~name ~@body))) + +(defmacro doit + "A version of doseq that doesn't emit all that inline-destroying chunked-seq code." 
+ [[x it] & body] + (let [it-sym (gensym "iterable")] + `(let [~it-sym ~it + it# (.iterator ~(with-meta it-sym {:tag 'java.lang.Iterable}))] + (loop [] + (when (.hasNext it#) + (let [~x (.next it#)] + ~@body) + (recur)))))) + diff --git a/test/clj_commons/byte_streams_reload_test.clj b/test/clj_commons/byte_streams_reload_test.clj new file mode 100644 index 0000000..a7441a3 --- /dev/null +++ b/test/clj_commons/byte_streams_reload_test.clj @@ -0,0 +1,7 @@ +(ns clj-commons.byte-streams-reload-test + (:require + [clojure.test :refer :all])) + +#_(deftest test-reload-all + (dotimes [_ 5] + (require 'clj-commons.byte-streams :reload-all))) diff --git a/test/clj_commons/byte_streams_simple_check.clj b/test/clj_commons/byte_streams_simple_check.clj new file mode 100644 index 0000000..b8588e3 --- /dev/null +++ b/test/clj_commons/byte_streams_simple_check.clj @@ -0,0 +1,17 @@ +(ns clj-commons.byte-streams-simple-check + (:require + [clojure.test :refer :all] + [clj-commons.byte-streams :as bs] + [clojure.test.check.generators :as gen] + [clojure.test.check.properties :as prop] + [clojure.test.check.clojure-test :as ct :refer (defspec)])) + +(defn sign [x] + (cond + (zero? x) 0 + (neg? 
x) -1 + :else 1)) + +(defspec equivalent-comparison 10000 + (prop/for-all [a gen/string-ascii , b gen/string-ascii] + (= (sign (compare a b)) (sign (bs/compare-bytes a b))))) diff --git a/test/clj_commons/byte_streams_test.clj b/test/clj_commons/byte_streams_test.clj new file mode 100644 index 0000000..f264a4d --- /dev/null +++ b/test/clj_commons/byte_streams_test.clj @@ -0,0 +1,145 @@ +(ns clj-commons.byte-streams-test + (:require + [clj-commons.byte-streams :refer [bytes= compare-bytes conversion-path convert dev-null possible-conversions seq-of stream-of to-byte-array to-byte-buffer to-byte-buffers to-input-stream to-string transfer vector-of]] + [clojure.test :refer :all] + [clj-commons.byte-streams.char-sequence :as cs]) + (:refer-clojure + :exclude [vector-of]) + (:import + [java.nio.charset + Charset] + [java.io + ByteArrayInputStream + File] + [java.nio + ByteBuffer] + [java.util + Arrays])) + +(def ^String text + "The suburb of Saffron Park lay on the sunset side of London, as red and ragged as a cloud of sunset. It was built of a bright brick throughout; its sky-line was fantastic, and even its ground plan was wild. It had been the outburst of a speculative builder, faintly tinged with art, who called its architecture sometimes Elizabethan and sometimes Queen Anne, apparently under the impression that the two sovereigns were identical. It was described with some justice as an artistic colony, though it never in any definable way produced any art. But although its pretensions to be an intellectual centre were a little vague, its pretensions to be a pleasant place were quite indisputable. The stranger who looked for the first time at the quaint red houses could only think how very oddly shaped the people must be who could fit in to them. Nor when he met the people was he disappointed in this respect. The place was not only pleasant, but perfect, if once he could regard it not as a deception but rather as a dream. 
Even if the people were not \"artists,\" the whole was nevertheless artistic. That young man with the long, auburn hair and the impudent face—that young man was not really a poet; but surely he was a poem. That old gentleman with the wild, white beard and the wild, white hat—that venerable humbug was not really a philosopher; but at least he was the cause of philosophy in others. That scientific gentleman with the bald, egg-like head and the bare, bird-like neck had no real right to the airs of science that he assumed. He had not discovered anything new in biology; but what biological creature could he have discovered more singular than himself? Thus, and thus only, the whole place had properly to be regarded; it had to be considered not so much as a workshop for artists, but as a frail but finished work of art. A man who stepped into its social atmosphere felt as if he had stepped into a written comedy.") + +(defn eval' [x] + (if (sequential? x) + (condp = (first x) + 'vector-of (vector-of (second x)) + 'stream-of (stream-of (second x)) + 'seq-of (seq-of (second x))) + x)) + +(def ary + (byte-array (map byte (range -127 127)))) + +(deftest test-roundtrips + (let [pairwise-conversions (->> String + possible-conversions + (mapcat #(map list (repeat %) (possible-conversions %))) + distinct + (map (partial map eval')))] + (doseq [[src dst] pairwise-conversions] + + (is (= text + (-> text + (convert src) + (convert dst) + (convert String)) + (-> text + (convert src) + (convert dst {:source-type src}) + (convert String {:source-type dst}))) + (str (pr-str src) " -> " (pr-str dst))))) + + ;; make sure none of our intermediate representations are strings if our target isn't a string + (let [invalid-destinations (->> #{String CharSequence java.io.Reader} + (mapcat #(vector % (list 'seq-of %) (list 'stream-of %))) + set) + pairwise-conversions (->> (class ary) + possible-conversions + (remove invalid-destinations) + (mapcat #(map list (repeat %) (remove invalid-destinations 
(possible-conversions %)))) + distinct + (map (partial map eval')))] + (doseq [[src dst] pairwise-conversions] + + (is (= (seq ary) + (-> ary + (convert src) + (convert dst) + (convert (class ary)) + seq) + (-> ary + (convert src) + (convert dst {:source-type src}) + (convert (class ary) {:source-type dst}) + seq)) + (str src " -> " dst ": " + (pr-str + (concat + (conversion-path (class ary) src) + (conversion-path src dst) + (conversion-path dst (class ary))))))))) + +(defn temp-file [] + (doto (File/createTempFile "byte-streams" ".tmp") + (.deleteOnExit))) + +(deftest test-transfer + (doseq [dst (->> String + possible-conversions + (map eval'))] + + (let [file (temp-file) + file' (temp-file)] + (transfer (convert text dst) dev-null) + (transfer (convert text dst) file {:chunk-size 128}) + (is (= text (to-string file))) + (transfer (convert text dst) file {:chunk-size 128, :append? false}) + (is (= text (to-string file))) + (is (= text (to-string (to-byte-buffers file {:chunk-size 128})))) + + (transfer file file') + (is (= text (to-string file'))) + (is (= text (to-string (to-byte-buffers file' {:chunk-size 128}))))))) + +;;; + +(deftest test-byte-buffer + (let [arr (.getBytes ^String text) + pos 13 + buf (doto (ByteBuffer/wrap arr) (.position pos))] + (to-byte-array buf) + (to-byte-array (repeat 2 buf)) + (is (= pos (.position buf))))) + +(deftest test-seq-of-byte-buffer + (let [buf (doto ^ByteBuffer (to-byte-buffer "quick brown fox") + (.position 3) + (.limit 6)) + arr (to-byte-array buf)] + (doseq [chunk-size (range 1 (+ 1 (.capacity buf)))] + (is (Arrays/equals + (to-byte-array (convert buf (seq-of ByteBuffer) {:chunk-size chunk-size})) + arr))))) + +(deftest ^:stress test-large-chunked-stream + (let [text-seq (repeat 1e4 text)] + (is (bytes= + (to-byte-array text-seq) + (-> text-seq + to-input-stream + (convert (seq-of ByteBuffer) {:chunk-size 1}) + to-byte-array))))) + +(deftest test-unicode-decoding + (let [three-byte-char "丁" + text (apply str (repeat 1e4 
three-byte-char)) + text-bytes (to-byte-array text)] + (is (bytes= text-bytes (-> text-bytes to-string to-byte-array))) + (is (bytes= text-bytes (-> text-bytes to-input-stream to-string to-byte-array))) + (is (bytes= text-bytes (-> text-bytes (to-input-stream {:chunk-size 128}) to-string to-byte-array))))) + +(deftest compare-bytes-former-bug + (let [bx (convert (byte-array [0x00 0x00 0x00 0x01]) java.nio.ByteBuffer) + by (convert (byte-array [0x80 0x00 0x00 0x01]) java.nio.ByteBuffer)] + (is (= [bx by] (sort compare-bytes [bx by]))) + (is (= [bx by] (sort compare-bytes [by bx]))))) diff --git a/test/clj_commons/pushback_stream_test.clj b/test/clj_commons/pushback_stream_test.clj new file mode 100644 index 0000000..8a52a0a --- /dev/null +++ b/test/clj_commons/pushback_stream_test.clj @@ -0,0 +1,20 @@ +(ns clj-commons.pushback-stream-test + (:require + [clojure.test :refer :all] + [clj-commons.byte-streams.pushback-stream :as p])) + +(def in (byte-array (range 100))) + +(deftest test-pushback-stream + (let [p (p/pushback-stream 50) + x (p/put-array p in 0 100) + ary (byte-array 50)] + (is (= 50 @(p/take p ary 0 50))) + (is (= (range 50) (seq ary))) + (is (= true @x)) + (is (= 50 @(p/take p ary 0 50))) + (is (= (range 50 100) (seq ary))) + + (p/pushback-array p in 50 50) + (is (= 50 @(p/take p ary 0 50))) + (is (= (range 50 100) (seq ary)))))