Skip to content

Commit 9acda9f

Browse files
committed
Allowing input stream pathway to avoid any native allocations.
1 parent effa353 commit 9acda9f

1 file changed

Lines changed: 119 additions & 29 deletions

File tree

src/tech/v3/libs/arrow.clj

Lines changed: 119 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@
109109
[ham-fisted.function :as hamf-fn]
110110
[ham-fisted.set :as set]
111111
[ham-fisted.protocols :as hamf-proto])
112-
(:import [ham_fisted ArrayLists]
112+
(:import [ham_fisted ArrayLists ArrayLists$ArrayOwner]
113113
[org.apache.arrow.vector.ipc.message MessageSerializer]
114114
[org.apache.arrow.flatbuf Message DictionaryBatch RecordBatch
115115
FieldNode Buffer BodyCompression BodyCompressionMethod Footer Block]
@@ -236,9 +236,12 @@
236236
(defn- create-zstd-decompressor
237237
[]
238238
(fn [compbuf resbuf]
239-
(Zstd/decompress ^ByteBuffer (nio-buffer/as-nio-buffer resbuf)
240-
^ByteBuffer (nio-buffer/as-nio-buffer compbuf))
241-
resbuf))
239+
(if (instance? NativeBuffer compbuf)
240+
(Zstd/decompress ^ByteBuffer (nio-buffer/as-nio-buffer resbuf)
241+
^ByteBuffer (nio-buffer/as-nio-buffer compbuf))
242+
(let [dst (byte-array (dtype/ecount resbuf))]
243+
(Zstd/decompress dst (dtype/->byte-array compbuf))
244+
(dtype/copy! dst resbuf)))))
242245

243246

244247
#_(defn- create-apache-lz4-frame-compressor
@@ -543,7 +546,7 @@ Dependent block frames are not supported!!")
543546
body-len (.bodyLength new-msg)
544547
pad-body-len (pad body-len)
545548
nbuf (when-not (== 0 body-len)
546-
(dtype/make-container :native-heap :int8 pad-body-len))
549+
(dtype/make-container :int8 pad-body-len))
547550
read-chunk-size (* 1024 1024 100)
548551
read-buffer (byte-array (min body-len read-chunk-size))
549552
bytes-read (when nbuf
@@ -824,9 +827,12 @@ Dependent block frames are not supported!!")
824827
(throw (Exception. (format "Only buffer batch compression supported - got %d"
825828
(.method compression)))))
826829
(let [decompressor (create-decompressor (.codec compression))
830+
buf-type (if (instance? NativeBuffer (hamf/first buffers))
831+
:native-heap
832+
:jvm-heap)
827833
buffers (mapv (fn [buffer]
828834
(if (> (dtype/ecount buffer) 8)
829-
(let [orig-len (native-buffer/read-long buffer)]
835+
(let [orig-len (read-long buffer)]
830836
{:orig-len orig-len
831837
:buffer (dtype/sub-buffer buffer 8)})
832838
{:orig-len (dtype/ecount buffer)
@@ -837,7 +843,7 @@ Dependent block frames are not supported!!")
837843
(lznc/remove #(== -1 (long %)))
838844
(reduce + 0)
839845
(long))
840-
decomp-buf (dtype/make-container :native-heap :int8 decomp-buf-len)]
846+
decomp-buf (dtype/make-container buf-type :int8 decomp-buf-len)]
841847
(->> buffers
842848
(reduce (fn [[res decomp-buf] {:keys [orig-len buffer]}]
843849
;;-1 indicates the buffer isn't actually compressed.
@@ -913,6 +919,91 @@ Dependent block frames are not supported!!")
913919
(dtype/set-constant! c -1)
914920
c))
915921

922+
(defn- LE-wrap-data
923+
^java.nio.ByteBuffer [buffer]
924+
(let [^java.nio.ByteBuffer bbuf (nio-buffer/->nio-buffer buffer)]
925+
(.order bbuf java.nio.ByteOrder/LITTLE_ENDIAN)))
926+
927+
(defn- as-shorts
928+
^shorts [abuf]
929+
(let [bbuf (LE-wrap-data abuf)
930+
rv (short-array (quot (dtype/ecount abuf) 2))]
931+
(-> (.asShortBuffer bbuf)
932+
(.get rv))
933+
rv))
934+
935+
(defn- as-ints
936+
^ints [abuf]
937+
(let [bbuf (LE-wrap-data abuf)
938+
rv (int-array (quot (dtype/ecount abuf) 4))]
939+
(-> (.asIntBuffer bbuf)
940+
(.get rv))
941+
rv))
942+
943+
(defn- as-longs
944+
^longs [abuf]
945+
(let [bbuf (LE-wrap-data abuf)
946+
rv (long-array (quot (dtype/ecount abuf) 8))]
947+
(-> (.asLongBuffer bbuf)
948+
(.get rv))
949+
rv))
950+
951+
(defn- as-floats
952+
^floats [abuf]
953+
(let [bbuf (LE-wrap-data abuf)
954+
rv (float-array (quot (dtype/ecount abuf) 4))]
955+
(-> (.asFloatBuffer bbuf)
956+
(.get rv))
957+
rv))
958+
959+
(defn- as-doubles
960+
^doubles [abuf]
961+
(let [bbuf (LE-wrap-data abuf)
962+
rv (double-array (quot (dtype/ecount abuf) 8))]
963+
(-> (.asDoubleBuffer bbuf)
964+
(.get rv))
965+
rv))
966+
967+
(defn read-long
968+
^long [buffer]
969+
(if (instance? NativeBuffer buffer)
970+
(native-buffer/read-long buffer)
971+
(-> (LE-wrap-data buffer)
972+
(.getLong))))
973+
974+
975+
976+
977+
(defn- set-buffer-datatype
978+
[buffer dtype]
979+
(if (instance? tech.v3.datatype.native_buffer.NativeBuffer buffer)
980+
(native-buffer/set-native-datatype buffer dtype)
981+
(let [abuf (dtype/->array-buffer buffer)]
982+
(let [offset (.-offset abuf)
983+
n-elems (.-n-elems abuf)
984+
src-data (.-ary-data abuf)]
985+
(case (casting/host-flatten dtype)
986+
(:int8 :uint8) (ArrayBuffer. src-data offset n-elems dtype nil nil)
987+
(:int16 :uint16) (ArrayBuffer. (as-shorts abuf) 0 (quot n-elems 2) dtype nil nil)
988+
(:int32 :uint32) (ArrayBuffer. (as-ints abuf) 0 (quot n-elems 4) dtype nil nil)
989+
(:int64 :uint64) (ArrayBuffer. (as-longs abuf) 0 (quot n-elems 8) dtype nil nil)
990+
:float32 (ArrayBuffer. (as-floats abuf) 0 (quot n-elems 4) dtype nil nil)
991+
:float64 (ArrayBuffer. (as-doubles abuf) 0 (quot n-elems 8) dtype nil nil))))))
992+
993+
(defn- ->jvm-array
994+
[buffer ^long off ^long len]
995+
(if (instance? NativeBuffer buffer)
996+
(native-buffer/->jvm-array buffer off len)
997+
(let [abuf (dtype/->array-buffer buffer)
998+
ary-data (.-ary-data abuf)
999+
offset (.-offset abuf)
1000+
n-elems (.-n-elems abuf)]
1001+
(if (and (== (.-offset abuf) off)
1002+
(== (.-n-elems abuf) len))
1003+
(.-ary-data abuf)
1004+
(let [^ArrayLists$ArrayOwner owner (ArrayLists/toList ary-data)]
1005+
(.copyOfRange owner offset (+ offset n-elems)))))))
1006+
9161007
(defn- byte-length
9171008
^long [container]
9181009
(* (dtype/ecount container) (casting/numeric-byte-width
@@ -1262,21 +1353,21 @@ Dependent block frames are not supported!!")
12621353
^List [offsets data n-elems]
12631354
(let [n-elems (long n-elems)
12641355
offsets (dtype/->reader offsets)]
1265-
(if (instance? NativeBuffer data)
1266-
(dtype/make-reader-fn
1267-
:string :string n-elems
1268-
(if (instance? NativeBuffer data)
1269-
(fn [^long idx]
1270-
(let [start-off (long (offsets idx))
1271-
end-off (long (offsets (inc idx)))]
1272-
(native-buffer/native-buffer->string data start-off (- end-off start-off))))
1356+
(dtype/make-reader-fn
1357+
:string :string n-elems
1358+
(if (instance? NativeBuffer data)
1359+
(fn [^long idx]
1360+
(let [start-off (long (offsets idx))
1361+
end-off (long (offsets (inc idx)))]
1362+
(native-buffer/native-buffer->string data start-off (- end-off start-off))))
1363+
(let [abuf (dtype/->array-buffer data)
1364+
^bytes src-data (.-ary-data abuf)
1365+
data-off (.-offset abuf)
1366+
]
12731367
(fn [^long idx]
1274-
(let [start-off (long (offsets idx))
1275-
end-off (long (offsets (inc idx)))]
1276-
(-> (dtype/sub-buffer data start-off
1277-
(- end-off start-off))
1278-
(dtype/->byte-array)
1279-
(String.)))))))))
1368+
(let [start-off (.readLong offsets idx)
1369+
end-off (.readLong offsets (inc idx))]
1370+
(String. src-data (+ start-off data-off) (- end-off start-off)))))))))
12801371

12811372
(defn- offsets-data->bytedata-reader
12821373
^List [offsets data n-elems]
@@ -1303,9 +1394,9 @@ Dependent block frames are not supported!!")
13031394
node (first nodes)
13041395
[_bitwise offsets databuf] buffers
13051396
n-elems (long (:n-elems node))
1306-
offsets (-> (native-buffer/set-native-datatype offsets :int32)
1397+
offsets (-> (set-buffer-datatype offsets :int32)
13071398
(dtype/sub-buffer 0 (inc n-elems)))
1308-
data (native-buffer/set-native-datatype databuf :int8)
1399+
data (set-buffer-datatype databuf :int8)
13091400
str-data (dtype/make-list :string (offsets-data->string-reader offsets data n-elems))]
13101401
{:id id
13111402
:delta? delta?
@@ -1332,12 +1423,11 @@ Dependent block frames are not supported!!")
13321423
(get :strings))
13331424
nil
13341425
(-> (first buffers)
1335-
(native-buffer/set-native-datatype
1336-
(get-in encoding [:index-type :datatype]))
1337-
(native-buffer/->jvm-array 0 n-elems)
1426+
(set-buffer-datatype (get-in encoding [:index-type :datatype]))
1427+
(->jvm-array 0 n-elems)
13381428
(ArrayLists/toList)))
13391429
(let [[offsets varchar-data] buffers
1340-
str-rdr (offsets-data->string-reader (native-buffer/set-native-datatype
1430+
str-rdr (offsets-data->string-reader (set-buffer-datatype
13411431
offsets offset-buf-dtype)
13421432
varchar-data n-elems)]
13431433
(if-not (:text-as-strings? options)
@@ -1503,7 +1593,7 @@ Dependent block frames are not supported!!")
15031593
[validity-buf offset-buf] (decompressor buffers)
15041594
n-elems (long (:n-elems node))
15051595
;;For lists, int32 offsets with one extra if this works like the string buffers
1506-
offsets (-> (native-buffer/set-native-datatype
1596+
offsets (-> (set-buffer-datatype
15071597
offset-buf :int32)
15081598
(dtype/sub-buffer 0 (inc n-elems))
15091599
(dtype/->buffer))]
@@ -1565,7 +1655,7 @@ Dependent block frames are not supported!!")
15651655
(let [[validity-buf data-buf] (decompressor buffers)]
15661656
(col-impl/new-column
15671657
(:name field)
1568-
(-> (native-buffer/set-native-datatype data-buf field-dtype)
1658+
(-> (set-buffer-datatype data-buf field-dtype)
15691659
(dtype/sub-buffer 0 n-elems))
15701660
(field-metadata field)
15711661
(node-buf->missing node validity-buf))))))

0 commit comments

Comments
 (0)