diff --git a/src/byte_streams.clj b/src/byte_streams.clj index f771178..e95fc3e 100644 --- a/src/byte_streams.clj +++ b/src/byte_streams.clj @@ -333,277 +333,279 @@ ;;; conversion definitions -(def-conversion ^{:cost 0} [(stream-of bytes) InputStream] - [s options] - (let [ps (ps/pushback-stream (get options :buffer-size 1024))] - (d/loop [] - (d/chain (s/take! s ::none) - (fn [^bytes msg] - (if (identical? ::none msg) - (do - (ps/close ps) - false) - (ps/put-array ps msg 0 (alength msg)))) - (fn [result] - (when result - (d/recur))))) - (ps/->input-stream ps))) - -(def-conversion ^{:cost 0} [(stream-of ByteBuffer) InputStream] - [s options] - (let [ps (ps/pushback-stream (get options :buffer-size 1024))] - (d/loop [] - (d/chain (s/take! s ::none) - (fn [^ByteBuffer msg] - (if (identical? ::none msg) - (do - (ps/close ps) - false) - (ps/put-buffer ps (.duplicate msg)))) - (fn [result] - (when result - (d/recur))))) - (ps/->input-stream ps))) - -;; byte-array => byte-buffer -(def-conversion ^{:cost 0} [bytes ByteBuffer] - [ary {:keys [direct?] :or {direct? false}}] - (if direct? - (let [len (Array/getLength ary) - ^ByteBuffer buf (ByteBuffer/allocateDirect len)] - (.put buf ary 0 len) - (.position buf 0) - buf) - (ByteBuffer/wrap ary))) - -;; byte-array => input-stream -(def-conversion ^{:cost 0} [bytes InputStream] - [ary] - (ByteArrayInputStream. ary)) - -;; byte-buffer => input-stream -(def-conversion ^{:cost 0} [ByteBuffer InputStream] - [buf] - (ByteBufferInputStream. (.duplicate buf))) - -;; byte-buffer => byte-array -(def-conversion [ByteBuffer bytes] - [buf] - (if (.hasArray buf) - (if (== (alength (.array buf)) (.remaining buf)) - (.array buf) - (let [ary (Utils/byteArray (.remaining buf))] - (doto buf - .mark - (.get ary 0 (.remaining buf)) - .reset) - ary)) - (let [^bytes ary (Utils/byteArray (.remaining buf))] - (doto buf .mark (.get ary) .reset) - ary))) - -;; sequence of byte-buffers => byte-buffer -(def-conversion [(vector-of ByteBuffer) ByteBuffer] - [bufs {:keys [direct?] :or {direct? false}}] - (cond - (empty? bufs) - (ByteBuffer/allocate 0) +(defonce _conversions_ + (do + (def-conversion ^{:cost 0} [(stream-of bytes) InputStream] + [s options] + (let [ps (ps/pushback-stream (get options :buffer-size 1024))] + (d/loop [] + (d/chain (s/take! s ::none) + (fn [^bytes msg] + (if (identical? ::none msg) + (do + (ps/close ps) + false) + (ps/put-array ps msg 0 (alength msg)))) + (fn [result] + (when result + (d/recur))))) + (ps/->input-stream ps))) + + (def-conversion ^{:cost 0} [(stream-of ByteBuffer) InputStream] + [s options] + (let [ps (ps/pushback-stream (get options :buffer-size 1024))] + (d/loop [] + (d/chain (s/take! s ::none) + (fn [^ByteBuffer msg] + (if (identical? ::none msg) + (do + (ps/close ps) + false) + (ps/put-buffer ps (.duplicate msg)))) + (fn [result] + (when result + (d/recur))))) + (ps/->input-stream ps))) + + ;; byte-array => byte-buffer + (def-conversion ^{:cost 0} [bytes ByteBuffer] + [ary {:keys [direct?] :or {direct? false}}] + (if direct? + (let [len (Array/getLength ary) + ^ByteBuffer buf (ByteBuffer/allocateDirect len)] + (.put buf ary 0 len) + (.position buf 0) + buf) + (ByteBuffer/wrap ary))) + + ;; byte-array => input-stream + (def-conversion ^{:cost 0} [bytes InputStream] + [ary] + (ByteArrayInputStream. ary)) + + ;; byte-buffer => input-stream + (def-conversion ^{:cost 0} [ByteBuffer InputStream] + [buf] + (ByteBufferInputStream. (.duplicate buf))) + + ;; byte-buffer => byte-array + (def-conversion [ByteBuffer bytes] + [buf] + (if (.hasArray buf) + (if (== (alength (.array buf)) (.remaining buf)) + (.array buf) + (let [ary (Utils/byteArray (.remaining buf))] + (doto buf + .mark + (.get ary 0 (.remaining buf)) + .reset) + ary)) + (let [^bytes ary (Utils/byteArray (.remaining buf))] + (doto buf .mark (.get ary) .reset) + ary))) + + ;; sequence of byte-buffers => byte-buffer + (def-conversion [(vector-of ByteBuffer) ByteBuffer] + [bufs {:keys [direct?] :or {direct? false}}] + (cond + (empty? bufs) + (ByteBuffer/allocate 0) - (and (empty? (rest bufs)) (not (proto/closeable? bufs))) - (first bufs) + (and (empty? (rest bufs)) (not (proto/closeable? bufs))) + (first bufs) - :else - (let [len (reduce + (map #(.remaining ^ByteBuffer %) bufs)) - buf (if direct? - (ByteBuffer/allocateDirect len) - (ByteBuffer/allocate len))] - (doseq [^ByteBuffer b bufs] - (.mark b) - (.put buf b) - (.reset b)) - (when (proto/closeable? bufs) - (proto/close bufs)) - (.flip buf)))) - -;; byte-buffer => sequence of byte-buffers -(def-conversion ^{:cost 0} [ByteBuffer (vector-of ByteBuffer)] - [buf {:keys [chunk-size]}] - (if chunk-size - (let [lim (.limit buf) - indices (range (.position buf) lim chunk-size)] - (mapv - #(-> buf - .duplicate - (.position %) - ^ByteBuffer (.limit (min lim (+ % chunk-size))) - .slice) - indices)) - [buf])) - -;; channel => input-stream -(def-conversion ^{:cost 0} [ReadableByteChannel InputStream] - [channel] - (Channels/newInputStream channel)) - -;; channel => lazy-seq of byte-buffers -(def-conversion [ReadableByteChannel (seq-of ByteBuffer)] - [channel {:keys [chunk-size direct?] :or {chunk-size 4096, direct? false} :as options}] - (lazy-seq - (when-let [b (proto/take-bytes! channel chunk-size options)] - (cons b (convert channel (seq-of ByteBuffer) options))))) - -;; input-stream => channel -(def-conversion ^{:cost 0} [InputStream ReadableByteChannel] - [input-stream] - (Channels/newChannel input-stream)) - -;; string => byte-array -(def-conversion ^{:cost 2} [String byte-array] - [s {:keys [encoding] :or {encoding "UTF-8"}}] - (.getBytes s ^String (name encoding))) - -;; byte-array => string -(def-conversion ^{:cost 2} [bytes String] - [ary {:keys [encoding] :or {encoding "UTF-8"}}] - (String. ^bytes ary (name encoding))) - -;; lazy-seq of byte-buffers => channel -(def-conversion ^{:cost 1.5} [(seq-of ByteBuffer) ReadableByteChannel] - [bufs] - (let [pipe (Pipe/open) - ^WritableByteChannel sink (.sink pipe) - source (doto ^AbstractSelectableChannel (.source pipe) - (.configureBlocking true))] - (future - (try - (loop [s bufs] - (when (and (not (empty? s)) (.isOpen sink)) - (let [buf (.duplicate ^ByteBuffer (first s))] - (.write sink buf) - (recur (rest s))))) - (finally - (.close sink)))) - source)) - -(def-conversion ^{:cost 1.5} [(seq-of #'proto/ByteSource) InputStream] - [srcs options] - (let [chunk-size (get options :chunk-size 65536) - out (PipedOutputStream.) - in (PipedInputStream. out chunk-size)] - (future - (try - (loop [s srcs] - (when-not (empty? s) - (transfer (first s) out) - (recur (rest s)))) - (finally - (.close out)))) - in)) - -(def-conversion ^{:cost 1.5} [InputStream byte-array] - [in options] - (let [out (ByteArrayOutputStream. (p/max 64 (.available in))) - buf (Utils/byteArray 16384)] - (loop [] - (let [len (.read in buf 0 16384)] - (when-not (neg? len) - (.write out buf 0 len) - (recur)))) - (.toByteArray out))) - -#_(let [ary (Utils/byteArray 0)] - (def-conversion ^{:cost 0} [::nil byte-array] - [src options] - ary)) - -(def-conversion ^{:cost 2} [#'proto/ByteSource byte-array] - [src options] - (let [os (ByteArrayOutputStream.)] - (transfer src os) - (.toByteArray os))) - -;; generic byte-source => lazy char-sequence -(def-conversion ^{:cost 2} [#'proto/ByteSource CharSequence] - [source options] - (cs/decode-byte-source - #(when-let [bytes (proto/take-bytes! source % options)] - (convert bytes ByteBuffer options)) - #(when (proto/closeable? source) - (proto/close source)) - options)) - -;; input-stream => reader -(def-conversion ^{:cost 1.5} [InputStream Reader] - [input-stream {:keys [encoding] :or {encoding "UTF-8"}}] - (BufferedReader. (InputStreamReader. input-stream ^String encoding))) - -;; reader => char-sequence -(def-conversion ^{:cost 1.5} [Reader CharSequence] - [reader {:keys [chunk-size] :or {chunk-size 2048}}] - (let [ary (char-array chunk-size) - sb (StringBuilder.)] - (loop [] - (let [n (.read reader ary 0 chunk-size)] - (if (pos? n) - (do - (.append sb ary 0 n) - (recur)) - (.toString sb)))))) - -;; char-sequence => string -(def-conversion [CharSequence String] - [char-sequence] - (.toString char-sequence)) - -(def-conversion [(vector-of String) String] - [strings] - (let [sb (StringBuilder.)] - (doseq [s strings] - (.append sb s)) - (.toString sb))) - -;; file => readable-channel -(def-conversion ^{:cost 0} [File ReadableByteChannel] - [file] - (.getChannel (FileInputStream. file))) - -;; file => writable-channel -(def-conversion ^{:cost 0} [File WritableByteChannel] - [file {:keys [append?] :or {append? true}}] - (.getChannel (FileOutputStream. file (boolean append?)))) - -(def-conversion ^{:cost 0} [File (seq-of ByteBuffer)] - [file {:keys [chunk-size writable?] :or {chunk-size (int 2e9), writable? false}}] - (let [^RandomAccessFile raf (RandomAccessFile. file (if writable? "rw" "r")) - ^FileChannel fc (.getChannel raf) - buf-seq (fn buf-seq [offset] - (when-not (<= (.size fc) offset) - (let [remaining (- (.size fc) offset)] - (lazy-seq - (cons - (.map fc - (if writable? - FileChannel$MapMode/READ_WRITE - FileChannel$MapMode/READ_ONLY) - offset - (min remaining chunk-size)) - (buf-seq (+ offset chunk-size)))))))] - (g/closeable-seq - (buf-seq 0) - false - #(do - (.close raf) - (.close fc))))) - -;; output-stream => writable-channel -(def-conversion ^{:cost 0} [OutputStream WritableByteChannel] - [output-stream] - (Channels/newChannel output-stream)) - -;; writable-channel => output-stream -(def-conversion ^{:cost 0} [WritableByteChannel OutputStream] - [channel] - (Channels/newOutputStream channel)) + :else + (let [len (reduce + (map #(.remaining ^ByteBuffer %) bufs)) + buf (if direct? + (ByteBuffer/allocateDirect len) + (ByteBuffer/allocate len))] + (doseq [^ByteBuffer b bufs] + (.mark b) + (.put buf b) + (.reset b)) + (when (proto/closeable? bufs) + (proto/close bufs)) + (.flip buf)))) + + ;; byte-buffer => sequence of byte-buffers + (def-conversion ^{:cost 0} [ByteBuffer (vector-of ByteBuffer)] + [buf {:keys [chunk-size]}] + (if chunk-size + (let [lim (.limit buf) + indices (range (.position buf) lim chunk-size)] + (mapv + #(-> buf + .duplicate + (.position %) + ^ByteBuffer (.limit (min lim (+ % chunk-size))) + .slice) + indices)) + [buf])) + + ;; channel => input-stream + (def-conversion ^{:cost 0} [ReadableByteChannel InputStream] + [channel] + (Channels/newInputStream channel)) + + ;; channel => lazy-seq of byte-buffers + (def-conversion [ReadableByteChannel (seq-of ByteBuffer)] + [channel {:keys [chunk-size direct?] :or {chunk-size 4096, direct? false} :as options}] + (lazy-seq + (when-let [b (proto/take-bytes! channel chunk-size options)] + (cons b (convert channel (seq-of ByteBuffer) options))))) + + ;; input-stream => channel + (def-conversion ^{:cost 0} [InputStream ReadableByteChannel] + [input-stream] + (Channels/newChannel input-stream)) + + ;; string => byte-array + (def-conversion ^{:cost 2} [String byte-array] + [s {:keys [encoding] :or {encoding "UTF-8"}}] + (.getBytes s ^String (name encoding))) + + ;; byte-array => string + (def-conversion ^{:cost 2} [bytes String] + [ary {:keys [encoding] :or {encoding "UTF-8"}}] + (String. ^bytes ary (name encoding))) + + ;; lazy-seq of byte-buffers => channel + (def-conversion ^{:cost 1.5} [(seq-of ByteBuffer) ReadableByteChannel] + [bufs] + (let [pipe (Pipe/open) + ^WritableByteChannel sink (.sink pipe) + source (doto ^AbstractSelectableChannel (.source pipe) + (.configureBlocking true))] + (future + (try + (loop [s bufs] + (when (and (not (empty? s)) (.isOpen sink)) + (let [buf (.duplicate ^ByteBuffer (first s))] + (.write sink buf) + (recur (rest s))))) + (finally + (.close sink)))) + source)) + + (def-conversion ^{:cost 1.5} [(seq-of #'proto/ByteSource) InputStream] + [srcs options] + (let [chunk-size (get options :chunk-size 65536) + out (PipedOutputStream.) + in (PipedInputStream. out chunk-size)] + (future + (try + (loop [s srcs] + (when-not (empty? s) + (transfer (first s) out) + (recur (rest s)))) + (finally + (.close out)))) + in)) + + (def-conversion ^{:cost 1.5} [InputStream byte-array] + [in options] + (let [out (ByteArrayOutputStream. (p/max 64 (.available in))) + buf (Utils/byteArray 16384)] + (loop [] + (let [len (.read in buf 0 16384)] + (when-not (neg? len) + (.write out buf 0 len) + (recur)))) + (.toByteArray out))) + + #_(let [ary (Utils/byteArray 0)] + (def-conversion ^{:cost 0} [::nil byte-array] + [src options] + ary)) + + (def-conversion ^{:cost 2} [#'proto/ByteSource byte-array] + [src options] + (let [os (ByteArrayOutputStream.)] + (transfer src os) + (.toByteArray os))) + + ;; generic byte-source => lazy char-sequence + (def-conversion ^{:cost 2} [#'proto/ByteSource CharSequence] + [source options] + (cs/decode-byte-source + #(when-let [bytes (proto/take-bytes! source % options)] + (convert bytes ByteBuffer options)) + #(when (proto/closeable? source) + (proto/close source)) + options)) + + ;; input-stream => reader + (def-conversion ^{:cost 1.5} [InputStream Reader] + [input-stream {:keys [encoding] :or {encoding "UTF-8"}}] + (BufferedReader. (InputStreamReader. input-stream ^String encoding))) + + ;; reader => char-sequence + (def-conversion ^{:cost 1.5} [Reader CharSequence] + [reader {:keys [chunk-size] :or {chunk-size 2048}}] + (let [ary (char-array chunk-size) + sb (StringBuilder.)] + (loop [] + (let [n (.read reader ary 0 chunk-size)] + (if (pos? n) + (do + (.append sb ary 0 n) + (recur)) + (.toString sb)))))) + + ;; char-sequence => string + (def-conversion [CharSequence String] + [char-sequence] + (.toString char-sequence)) + + (def-conversion [(vector-of String) String] + [strings] + (let [sb (StringBuilder.)] + (doseq [s strings] + (.append sb s)) + (.toString sb))) + + ;; file => readable-channel + (def-conversion ^{:cost 0} [File ReadableByteChannel] + [file] + (.getChannel (FileInputStream. file))) + + ;; file => writable-channel + (def-conversion ^{:cost 0} [File WritableByteChannel] + [file {:keys [append?] :or {append? true}}] + (.getChannel (FileOutputStream. file (boolean append?)))) + + (def-conversion ^{:cost 0} [File (seq-of ByteBuffer)] + [file {:keys [chunk-size writable?] :or {chunk-size (int 2e9), writable? false}}] + (let [^RandomAccessFile raf (RandomAccessFile. file (if writable? "rw" "r")) + ^FileChannel fc (.getChannel raf) + buf-seq (fn buf-seq [offset] + (when-not (<= (.size fc) offset) + (let [remaining (- (.size fc) offset)] + (lazy-seq + (cons + (.map fc + (if writable? + FileChannel$MapMode/READ_WRITE + FileChannel$MapMode/READ_ONLY) + offset + (min remaining chunk-size)) + (buf-seq (+ offset chunk-size)))))))] + (g/closeable-seq + (buf-seq 0) + false + #(do + (.close raf) + (.close fc))))) + + ;; output-stream => writable-channel + (def-conversion ^{:cost 0} [OutputStream WritableByteChannel] + [output-stream] + (Channels/newChannel output-stream)) + + ;; writable-channel => output-stream + (def-conversion ^{:cost 0} [WritableByteChannel OutputStream] + [channel] + (Channels/newOutputStream channel)))) ;;; def-transfers diff --git a/test/reload_test.clj b/test/reload_test.clj new file mode 100644 index 0000000..9d6afe6 --- /dev/null +++ b/test/reload_test.clj @@ -0,0 +1,7 @@ +(ns reload_test + (:require [byte-streams] + [clojure.test :refer :all])) + +(deftest test-reload + (dotimes [_ 5] + (use 'byte-streams :reload-all)))