|
43 | 43 |
|
44 | 44 | (defprotocol ByteSource
|
45 | 45 | (take-bytes! [_ n options] "Takes `n` bytes from the byte source."))
|
46 |
| - |
| 46 | + |
47 | 47 | (defprotocol ByteSink
|
48 | 48 | (send-bytes! [_ bytes options] "Puts `bytes` in the byte sink."))
|
49 | 49 |
|
|
146 | 146 | (cond
|
147 | 147 | (and (class? a) (class? b))
|
148 | 148 | (.isAssignableFrom ^Class b a)
|
149 |
| - |
| 149 | + |
150 | 150 | (and (protocol? b) (class? a))
|
151 | 151 | (class-satisfies? b a)
|
152 |
| - |
| 152 | + |
153 | 153 | (and (seq-of? a) (seq-of? b))
|
154 | 154 | (assignable? (second a) (second b))
|
155 |
| - |
| 155 | + |
156 | 156 | :else
|
157 | 157 | (= a b))))
|
158 | 158 |
|
|
221 | 221 | java.io.Closeable
|
222 | 222 | (close [_]
|
223 | 223 | (close-fn))
|
224 |
| - |
| 224 | + |
225 | 225 | clojure.lang.Sequential
|
226 | 226 | clojure.lang.ISeq
|
227 | 227 | clojure.lang.Seqable
|
|
273 | 273 | (fn [[a b :as a+b]]
|
274 | 274 | (if-let [f (get-in @src->dst->conversion a+b)]
|
275 | 275 | f
|
276 |
| - |
| 276 | + |
277 | 277 | ;; implicit (seq-of a) -> (seq-of b) conversion
|
278 | 278 | (if-let [f (when (every? seq-of? a+b)
|
279 | 279 | (get-in @src->dst->conversion (map second a+b)))]
|
280 | 280 | (fn [x options]
|
281 | 281 | (map #(f % options) x))
|
282 |
| - |
| 282 | + |
283 | 283 | ;; this shouldn't ever happen, but let's have a decent error message all the same
|
284 | 284 | (throw
|
285 | 285 | (IllegalStateException.
|
|
289 | 289 | (let [close-fns (atom (tuple))
|
290 | 290 | result (reduce
|
291 | 291 | (fn [x f]
|
292 |
| - |
| 292 | + |
293 | 293 | ;; keep track of everything that needs to be closed once the bytes are exhausted
|
294 | 294 | (when (closeable? x)
|
295 | 295 | (swap! close-fns conj #(close x)))
|
|
298 | 298 | fns)]
|
299 | 299 | (if-let [close-fn (when-let [fns (seq @close-fns)]
|
300 | 300 | #(doseq [f fns]
|
301 |
| - (f)))] |
| 301 | + (f)))] |
302 | 302 | (if (sequential? result)
|
303 | 303 | (closeable-seq result true close-fn)
|
304 | 304 | (do
|
|
392 | 392 | ^long [x dst]
|
393 | 393 | (memoized-cost (type-descriptor x) dst)))
|
394 | 394 |
|
| 395 | +(defn precache-conversions |
| 396 | + "Walk the graph of conversions, making all subsequent conversions reliably fast." |
| 397 | + [] |
| 398 | + (->> @src->dst->conversion |
| 399 | + (mapcat #(map list (repeat %) (possible-conversions %))) |
| 400 | + distinct |
| 401 | + (map #(apply conversion-path %)) |
| 402 | + dorun)) |
| 403 | + |
395 | 404 | ;;; transfer
|
396 | 405 |
|
397 | 406 | (defn- default-transfer [source sink {:keys [chunk-size] :or {chunk-size 1024} :as options}]
|
|
416 | 425 | first
|
417 | 426 | second)]
|
418 | 427 | (cond
|
419 |
| - |
| 428 | + |
420 | 429 | (and src' dst')
|
421 | 430 | (let [f (get-in @src->dst->transfer [src' dst'])]
|
422 | 431 | (fn [source sink options]
|
423 |
| - (let [source' (convert source src') |
424 |
| - sink' (convert sink dst')] |
| 432 | + (let [source' (convert source src' options) |
| 433 | + sink' (convert sink dst' options)] |
425 | 434 | (f source' sink' options)
|
426 | 435 | (doseq [x [source sink source' sink']]
|
427 | 436 | (when (closeable? x)
|
428 | 437 | (close x))))))
|
429 |
| - |
| 438 | + |
430 | 439 | (and
|
431 | 440 | (conversion-path src ByteSource)
|
432 | 441 | (conversion-path dst ByteSink))
|
433 | 442 | (fn [source sink options]
|
434 |
| - (let [source' (convert source ByteSource) |
435 |
| - sink' (convert sink ByteSink)] |
| 443 | + (let [source' (convert source ByteSource options) |
| 444 | + sink' (convert sink ByteSink options)] |
436 | 445 | (default-transfer source' sink' options)
|
437 | 446 | (doseq [x [source sink source' sink']]
|
438 | 447 | (when (closeable? x)
|
439 | 448 | (close x)))))
|
440 |
| - |
| 449 | + |
441 | 450 | :else
|
442 | 451 | nil)))))
|
443 | 452 |
|
|
589 | 598 | (let [^ByteBuffer buf (first s)]
|
590 | 599 | (.mark buf)
|
591 | 600 | (.write sink buf)
|
592 |
| - (.reset buf) |
| 601 | + (.reset buf) |
593 | 602 | (recur (rest s)))))
|
594 | 603 | (.close sink))
|
595 | 604 | source))
|
|
604 | 613 | (close source))
|
605 | 614 | options))
|
606 | 615 |
|
607 |
| -;; input-stream => reader |
| 616 | +;; input-stream => reader |
608 | 617 | (def-conversion [InputStream Reader]
|
609 | 618 | [input-stream {:keys [encoding] :or {encoding "utf-8"}}]
|
610 | 619 | (BufferedReader. (InputStreamReader. input-stream ^String encoding)))
|
|
654 | 663 | (let [remaining (- (.size fc) offset)]
|
655 | 664 | (lazy-seq
|
656 | 665 | (cons
|
657 |
| - (.map fc |
| 666 | + (.map fc |
658 | 667 | (if writable?
|
659 | 668 | FileChannel$MapMode/READ_WRITE
|
660 | 669 | FileChannel$MapMode/READ_ONLY)
|
|
696 | 705 | (let [^FileChannel fc (convert file ReadableByteChannel options)]
|
697 | 706 | (try
|
698 | 707 | (loop [idx 0]
|
699 |
| - (let [n (.transferTo fc channel idx chunk-size)] |
| 708 | + (let [n (.transferTo fc idx chunk-size channel)] |
700 | 709 | (when (pos? n)
|
701 | 710 | (recur (+ idx n)))))
|
702 | 711 | (finally
|
|
705 | 714 | (def-transfer [InputStream OutputStream]
|
706 | 715 | [input-stream output-stream {:keys [chunk-size] :or {chunk-size 4096} :as options}]
|
707 | 716 | (let [ary (clojure.core/byte-array chunk-size)]
|
708 |
| - (loop [] |
709 |
| - (let [n (.read ^InputStream input-stream ary)] |
710 |
| - (when (pos? n) |
711 |
| - (.write ^OutputStream output-stream ary 0 n) |
712 |
| - (.flush ^OutputStream output-stream) |
713 |
| - (recur)))))) |
| 717 | + (try |
| 718 | + (loop [] |
| 719 | + (let [n (.read ^InputStream input-stream ary)] |
| 720 | + (when (pos? n) |
| 721 | + (.write ^OutputStream output-stream ary 0 n) |
| 722 | + (.flush ^OutputStream output-stream) |
| 723 | + (recur)))) |
| 724 | + (finally |
| 725 | + (.close ^OutputStream output-stream))))) |
714 | 726 |
|
715 | 727 | ;;; protocol extensions
|
716 | 728 |
|
|
0 commit comments