|
| 1 | +(ns tech.v3.libs.arrow.jpnz-lz4 |
| 2 | + (:require [tech.v3.datatype :as dtype] |
| 3 | + [clojure.tools.logging :as log]) |
| 4 | + (:import [tech.v3.datatype.array_buffer ArrayBuffer] |
| 5 | + [java.io ByteArrayOutputStream ByteArrayInputStream])) |
| 6 | + |
| 7 | +(defn- ensure-bytes-array-buffer |
| 8 | + ^ArrayBuffer [data] |
| 9 | + (if-let [ary-buf (dtype/as-array-buffer data)] |
| 10 | + (if (= :int8 (dtype/elemwise-datatype ary-buf)) |
| 11 | + ary-buf |
| 12 | + (dtype/make-container :int8 data)))) |
| 13 | + |
| 14 | +(defn create-jpnz-lz4-frame-compressor |
| 15 | + [comp-map] |
| 16 | + (assert (= :lz4 (get comp-map :compression-type))) |
| 17 | + (fn [compbuf dstbuf] |
| 18 | + (let [^ByteArrayOutputStream dstbuf (or dstbuf (java.io.ByteArrayOutputStream.)) |
| 19 | + os (net.jpountz.lz4.LZ4FrameOutputStream. dstbuf) |
| 20 | + srcbuf (ensure-bytes-array-buffer compbuf) |
| 21 | + ^bytes src-data (.ary-data srcbuf)] |
| 22 | + (.write os src-data (unchecked-int (.offset srcbuf)) (unchecked-int (.n-elems srcbuf))) |
| 23 | + (.close os) |
| 24 | + (let [final-bytes (.toByteArray dstbuf)] |
| 25 | + (.reset dstbuf) |
| 26 | + {:writer-cache dstbuf |
| 27 | + :dst-buffer final-bytes})))) |
| 28 | + |
| 29 | +(defn create-jpnz-lz4-decompressor |
| 30 | + [] |
| 31 | + (log/warn "Unable to load native lz4 library, falling back to jpountz. |
| 32 | +Dependent block frames are not supported!!") |
| 33 | + (fn [srcbuf dstbuf] |
| 34 | + (let [src-byte-data (dtype/->byte-array srcbuf) |
| 35 | + bis (ByteArrayInputStream. src-byte-data) |
| 36 | + is (net.jpountz.lz4.LZ4FrameInputStream. bis) |
| 37 | + temp-dstbuf (byte-array (dtype/ecount dstbuf))] |
| 38 | + (.read is temp-dstbuf) |
| 39 | + (dtype/copy! temp-dstbuf dstbuf)))) |
0 commit comments