Skip to content

Commit d76f996

Browse files
committed
Add Cython BinaryEncoder for Avro block encoding
Mirrors the existing CythonBinaryDecoder. The pure-Python BinaryEncoder emits each varint byte via bytes([x]) and a stream write per primitive; the Cython implementation writes into a growable char* buffer with inlined zigzag encoding and memcpy, then materialises once via getvalue(). AvroOutputFile.write_block now uses new_memory_encoder() which returns the Cython implementation when the extension is built and falls back to a MemoryBinaryEncoder wrapper otherwise (same pattern as new_decoder()). Encoding 50k realistic ManifestEntry records (14 columns with full stats) goes from 1.64s to 0.36s (4.5x), byte-identical output. Tests are parametrised over both implementations and include int64-boundary round-trips and a byte-equivalence check.
1 parent e6d5129 commit d76f996

8 files changed

Lines changed: 251 additions & 58 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,5 +45,6 @@ htmlcov
4545
.ipynb_checkpoints/
4646

4747
pyiceberg/avro/decoder_fast.c
48+
pyiceberg/avro/encoder_fast.c
4849
pyiceberg/avro/*.html
4950
pyiceberg/avro/*.so

MANIFEST.in

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ recursive-include pyiceberg *.pyx *.c
2020

2121
# Exclude generated Cython C file
2222
exclude pyiceberg/avro/decoder_fast.c
23+
exclude pyiceberg/avro/encoder_fast.c
2324

2425
# Include test files in sdist
2526
recursive-include tests *

pyiceberg/avro/encoder.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,3 +78,29 @@ def write_uuid(self, uuid: UUID) -> None:
7878

7979
def write_unknown(self, _: Any) -> None:
8080
"""Nulls are written as 0 bytes in avro, so we do nothing."""
81+
82+
83+
class MemoryBinaryEncoder(BinaryEncoder):
    """BinaryEncoder whose output stream is a private ``io.BytesIO``.

    The encoder creates and keeps its own buffer, so callers can read the
    encoded bytes back with :meth:`getvalue` instead of supplying a stream.
    """

    def __init__(self) -> None:
        """Create the backing buffer and register it as the output stream."""
        from io import BytesIO

        sink = BytesIO()
        # Keep a handle on the buffer so getvalue() can read it back later.
        self._buffer = sink
        super().__init__(sink)

    def getvalue(self) -> bytes:
        """Return everything written to the internal buffer so far."""
        encoded: bytes = self._buffer.getvalue()
        return encoded
94+
95+
96+
def new_memory_encoder() -> "CythonBinaryEncoder | MemoryBinaryEncoder":  # type: ignore[name-defined]  # noqa: F821
    """Create an in-memory Avro encoder, preferring the compiled implementation.

    Returns:
        A ``CythonBinaryEncoder`` when ``pyiceberg.avro.encoder_fast`` can be
        imported, otherwise a ``MemoryBinaryEncoder``. Both expose the
        ``write_*`` methods and ``getvalue()``.

    Warns:
        UserWarning: when the compiled extension is missing and the pure
            Python fallback is used.
    """
    # Guard only the optional import: a missing extension is the one
    # condition that should trigger the fallback, so nothing else is
    # allowed to be swallowed by the except clause.
    try:
        from pyiceberg.avro.encoder_fast import CythonBinaryEncoder
    except ModuleNotFoundError:
        import warnings

        warnings.warn("Falling back to pure Python Avro encoder, missing Cython implementation", stacklevel=2)

        return MemoryBinaryEncoder()

    return CythonBinaryEncoder()

pyiceberg/avro/encoder_fast.pyi

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
from typing import Any
18+
from uuid import UUID
19+
20+
class CythonBinaryEncoder:
    # Construction and retrieval of the encoded payload.
    def __init__(self) -> None: ...
    def getvalue(self) -> bytes: ...

    # Raw append: the bytes are copied as-is, without a length prefix.
    def write(self, b: bytes) -> None: ...

    # Primitive writers.
    def write_boolean(self, v: bool) -> None: ...
    def write_int(self, v: int) -> None: ...
    def write_float(self, v: float) -> None: ...
    def write_double(self, v: float) -> None: ...

    # Variable-length and fixed-size payloads.
    def write_bytes(self, b: bytes) -> None: ...
    def write_utf8(self, s: str) -> None: ...
    def write_uuid(self, uuid: UUID) -> None: ...

    # Writes nothing.
    def write_unknown(self, _: Any) -> None: ...

pyiceberg/avro/encoder_fast.pyx

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
import cython
18+
from cpython.bytes cimport PyBytes_FromStringAndSize, PyBytes_AS_STRING, PyBytes_GET_SIZE
19+
from cpython.mem cimport PyMem_Malloc, PyMem_Realloc, PyMem_Free
20+
from libc.string cimport memcpy
21+
from libc.stdint cimport uint8_t, uint64_t, int64_t
22+
23+
from pyiceberg.avro import STRUCT_DOUBLE, STRUCT_FLOAT
24+
25+
cdef uint64_t _INITIAL_CAPACITY = 1024
26+
27+
28+
@cython.final
cdef class CythonBinaryEncoder:
    """In-memory Avro binary encoder that writes into a growable C buffer.

    Exposes the same write_* methods as the pure-Python BinaryEncoder, plus
    getvalue(), which copies the bytes accumulated so far into a ``bytes``
    object. The buffer starts at ``_INITIAL_CAPACITY`` bytes and is doubled
    until a pending write fits.
    """

    # Heap buffer owned by this instance (allocated in __cinit__, freed in __dealloc__).
    cdef unsigned char *_data
    # Number of bytes written so far.
    cdef uint64_t _size
    # Number of bytes currently allocated for _data.
    cdef uint64_t _capacity

    def __cinit__(self):
        self._data = <unsigned char *> PyMem_Malloc(_INITIAL_CAPACITY)
        if not self._data:
            raise MemoryError()
        self._size = 0
        self._capacity = _INITIAL_CAPACITY

    def __dealloc__(self):
        PyMem_Free(self._data)

    # Make room for n more bytes, doubling the capacity until they fit.
    # Raises MemoryError when the reallocation fails; the existing buffer is
    # left untouched in that case.
    cdef inline int _ensure(self, uint64_t n) except -1:
        cdef uint64_t need = self._size + n
        if need <= self._capacity:
            return 0
        cdef uint64_t cap = self._capacity
        while cap < need:
            cap <<= 1
        cdef unsigned char *grown = <unsigned char *> PyMem_Realloc(self._data, cap)
        if not grown:
            raise MemoryError()
        self._data = grown
        self._capacity = cap
        return 0

    # Copy n raw bytes from src to the end of the buffer, growing it first.
    # Shared by write(), write_bytes() and write_uuid().
    cdef inline int _append(self, const char *src, Py_ssize_t n) except -1:
        self._ensure(n)
        memcpy(self._data + self._size, src, n)
        self._size += n
        return 0

    cpdef bytes getvalue(self):
        """Return a copy of all bytes written so far."""
        return PyBytes_FromStringAndSize(<char *> self._data, self._size)

    cpdef void write(self, bytes b):
        """Append the raw bytes ``b`` without a length prefix.

        Raises:
            TypeError: if ``b`` is None.
        """
        # A parameter typed ``bytes`` still accepts None, and the PyBytes_*
        # macros do no type checking, so None must be rejected before use.
        if b is None:
            raise TypeError("write() argument must be bytes, not None")
        self._append(PyBytes_AS_STRING(b), PyBytes_GET_SIZE(b))

    cpdef void write_boolean(self, bint v):
        """Append a single byte: 1 for a true value, 0 otherwise."""
        self._ensure(1)
        self._data[self._size] = 1 if v else 0
        self._size += 1

    cpdef void write_int(self, int64_t v):
        """Append ``v`` as a zigzag-encoded base-128 varint."""
        # zigzag then base-128 varint; a 64-bit value needs at most 10 bytes
        self._ensure(10)
        cdef uint64_t uv = <uint64_t> v
        # Zigzag: shift left one bit and XOR with the sign bit spread across
        # all 64 bits, so small magnitudes of either sign encode compactly.
        cdef uint64_t datum = (uv << 1) ^ (0 - (uv >> 63))
        cdef unsigned char *p = self._data + self._size
        # Emit 7 bits per byte, low bits first; the high bit marks continuation.
        while datum & <uint64_t> ~0x7F:
            p[0] = <uint8_t> ((datum & 0x7F) | 0x80)
            p += 1
            datum >>= 7
        p[0] = <uint8_t> datum
        p += 1
        self._size = p - self._data

    def write_float(self, v: float) -> None:
        """Append ``v`` packed with STRUCT_FLOAT."""
        self.write(STRUCT_FLOAT.pack(v))

    def write_double(self, v: float) -> None:
        """Append ``v`` packed with STRUCT_DOUBLE."""
        self.write(STRUCT_DOUBLE.pack(v))

    cpdef void write_bytes(self, b):
        """Append ``b`` as a varint length followed by the raw bytes.

        Anything that is not exactly ``bytes`` is converted with ``bytes(b)``
        first.
        """
        cdef bytes bb = bytes(b) if type(b) is not bytes else b
        cdef Py_ssize_t n = PyBytes_GET_SIZE(bb)
        self.write_int(n)
        self._append(PyBytes_AS_STRING(bb), n)

    def write_utf8(self, s) -> None:
        """Append ``s`` UTF-8 encoded, prefixed with its byte length."""
        self.write_bytes(s.encode("utf-8"))

    def write_uuid(self, uuid) -> None:
        """Append the 16 raw bytes of ``uuid.bytes``.

        Raises:
            ValueError: if ``uuid.bytes`` is None or not exactly 16 bytes long.
        """
        cdef bytes b = uuid.bytes
        # The typed assignment lets None through, so test for it before
        # handing the object to the unchecked PyBytes_* macros.
        if b is None or PyBytes_GET_SIZE(b) != 16:
            raise ValueError(f"Expected UUID to have 16 bytes, got: len({b!r})")
        self._append(PyBytes_AS_STRING(b), 16)

    def write_unknown(self, _) -> None:
        """Write nothing and ignore the argument."""
        pass

pyiceberg/avro/file.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
from __future__ import annotations
2121

22-
import io
2322
import json
2423
import os
2524
from collections.abc import Callable
@@ -34,7 +33,7 @@
3433
from pyiceberg.avro.codecs import AVRO_CODEC_KEY, CODEC_MAPPING_ICEBERG_TO_AVRO, KNOWN_CODECS
3534
from pyiceberg.avro.codecs.codec import Codec
3635
from pyiceberg.avro.decoder import BinaryDecoder, new_decoder
37-
from pyiceberg.avro.encoder import BinaryEncoder
36+
from pyiceberg.avro.encoder import BinaryEncoder, new_memory_encoder
3837
from pyiceberg.avro.reader import Reader
3938
from pyiceberg.avro.resolver import construct_reader, construct_writer, resolve_reader, resolve_writer
4039
from pyiceberg.avro.writer import Writer
@@ -300,11 +299,10 @@ def compression_codec(self) -> type[Codec] | None:
300299
return KNOWN_CODECS[codec_name] # type: ignore
301300

302301
def write_block(self, objects: list[D]) -> None:
303-
in_memory = io.BytesIO()
304-
block_content_encoder = BinaryEncoder(output_stream=in_memory)
302+
block_content_encoder = new_memory_encoder()
305303
for obj in objects:
306304
self.writer.write(block_content_encoder, obj)
307-
block_content = in_memory.getvalue()
305+
block_content = block_content_encoder.getvalue()
308306

309307
self.encoder.write_int(len(objects))
310308

setup.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,13 @@ def make_release_tree(self, base_dir: str, files: list[str]) -> None:
5959
[os.path.join(package_path, "avro", "decoder_fast.pyx")],
6060
extra_compile_args=extra_compile_args,
6161
language="c",
62-
)
62+
),
63+
Extension(
64+
"pyiceberg.avro.encoder_fast",
65+
[os.path.join(package_path, "avro", "encoder_fast.pyx")],
66+
extra_compile_args=extra_compile_args,
67+
language="c",
68+
),
6369
]
6470

6571
ext_modules = cythonize(

0 commit comments

Comments
 (0)