From 440c64a0a02886706f071bc96b6ddbafb4a989f8 Mon Sep 17 00:00:00 2001 From: Evert Lammerts Date: Tue, 9 Jun 2026 17:44:45 +0200 Subject: [PATCH] Cache Arrow schema --- src/duckdb_py/pyresult.cpp | 197 +++++++----------- .../fast/arrow/test_475_geometry_crs_arrow.py | 63 ++++++ 2 files changed, 134 insertions(+), 126 deletions(-) create mode 100644 tests/fast/arrow/test_475_geometry_crs_arrow.py diff --git a/src/duckdb_py/pyresult.cpp b/src/duckdb_py/pyresult.cpp index d67ba420..3ae4e115 100644 --- a/src/duckdb_py/pyresult.cpp +++ b/src/duckdb_py/pyresult.cpp @@ -22,6 +22,7 @@ #include "duckdb_python/arrow/arrow_export_utils.hpp" #include "duckdb/main/chunk_scan_state/query_result.hpp" #include "duckdb/common/arrow/arrow_query_result.hpp" +#include "duckdb/common/arrow/nanoarrow/nanoarrow.hpp" using namespace pybind11::literals; @@ -428,25 +429,39 @@ duckdb::pyarrow::Table DuckDBPyResult::FetchArrowTable(idx_t rows_per_batch, boo QueryResult::DeduplicateColumns(names); } - if (!result) { - throw InvalidInputException("result closed"); - } auto pyarrow_lib_module = py::module::import("pyarrow").attr("lib"); + // If the producing operator cached the Arrow schema (built while its own + // transaction was still active), reuse it instead of rebuilding it here. + // Rebuilding post-commit breaks arrow type extensions whose schema callback + // does a catalog lookup -- e.g. GeoArrow CRS resolution -- which asserts an + // active transaction. See duckdb-python#475 / duckdb-spatial#788. + ArrowQueryResult *cached_result = nullptr; + if (result->type == QueryResultType::ARROW_RESULT) { + auto &arrow_result = result->Cast(); + if (arrow_result.HasCachedSchema()) { + cached_result = &arrow_result; + } + } + py::list batches; if (result->type == QueryResultType::ARROW_RESULT) { auto &arrow_result = result->Cast(); auto arrays = arrow_result.ConsumeArrays(); for (auto &array : arrays) { ArrowSchema arrow_schema; - auto result_names = arrow_result.names; - if (to_polars) { - QueryResult::DeduplicateColumns(result_names); - } ArrowArray data = array->arrow_array; array->arrow_array.release = nullptr; - ArrowConverter::ToArrowSchema(&arrow_schema, arrow_result.types, result_names, - arrow_result.client_properties); + if (cached_result) { + cached_result->GetSchema(arrow_schema); + } else { + auto result_names = arrow_result.names; + if (to_polars) { + QueryResult::DeduplicateColumns(result_names); + } + ArrowConverter::ToArrowSchema(&arrow_schema, arrow_result.types, result_names, + arrow_result.client_properties); + } TransformDuckToArrowChunk(arrow_schema, data, batches); } } else { @@ -476,7 +491,33 @@ duckdb::pyarrow::Table DuckDBPyResult::FetchArrowTable(idx_t rows_per_batch, boo } } - return pyarrow::ToArrowTable(result->types, names, std::move(batches), result->client_properties); + if (!cached_result) { + return pyarrow::ToArrowTable(result->types, names, std::move(batches), result->client_properties); + } + + // Assemble the table from the cached schema (avoids the ToArrowSchema call + // inside pyarrow::ToArrowTable, which would also assert post-commit). + auto from_batches_func = pyarrow_lib_module.attr("Table").attr("from_batches"); + auto schema_import_func = pyarrow_lib_module.attr("Schema").attr("_import_from_c"); + ArrowSchema final_schema; + cached_result->GetSchema(final_schema); + auto schema_obj = schema_import_func(reinterpret_cast(&final_schema)); + auto table = py::cast(from_batches_func(batches, schema_obj)); + if (to_polars) { + // The cached schema carries the original column names; polars needs them + // unique. Rename only when there are real duplicates, so unique columns + // (and their field metadata, e.g. geoarrow) are left untouched. + auto deduped = result->names; + QueryResult::DeduplicateColumns(deduped); + if (deduped != result->names) { + py::list renamed; + for (auto &n : deduped) { + renamed.append(n); + } + table = py::cast(table.attr("rename_columns")(renamed)); + } + } + return table; } ArrowArrayStream DuckDBPyResult::FetchArrowArrayStream(idx_t rows_per_batch) { @@ -501,119 +542,6 @@ duckdb::pyarrow::RecordBatchReader DuckDBPyResult::FetchRecordBatchReader(idx_t return py::cast(record_batch_reader); } -// Holds owned copies of the string data for a deep-copied ArrowSchema node. -struct ArrowSchemaCopyData { - string format; - string name; - string metadata; -}; - -static void ReleaseCopiedArrowSchema(ArrowSchema *schema) { - if (!schema || !schema->release) { - return; - } - for (int64_t i = 0; i < schema->n_children; i++) { - if (schema->children[i]->release) { - schema->children[i]->release(schema->children[i]); - } - delete schema->children[i]; - } - delete[] schema->children; - if (schema->dictionary) { - if (schema->dictionary->release) { - schema->dictionary->release(schema->dictionary); - } - delete schema->dictionary; - } - delete reinterpret_cast(schema->private_data); - schema->release = nullptr; -} - -static idx_t ArrowMetadataSize(const char *metadata) { - if (!metadata) { - return 0; - } - // Arrow metadata format: int32 num_entries, then for each entry: - // int32 key_len, key_bytes, int32 value_len, value_bytes - auto ptr = metadata; - int32_t num_entries; - memcpy(&num_entries, ptr, sizeof(int32_t)); - ptr += sizeof(int32_t); - for (int32_t i = 0; i < num_entries; i++) { - int32_t len; - memcpy(&len, ptr, sizeof(int32_t)); - ptr += sizeof(int32_t) + len; - memcpy(&len, ptr, sizeof(int32_t)); - ptr += sizeof(int32_t) + len; - } - return ptr - metadata; -} - -// Deep-copy an ArrowSchema. The Arrow C Data Interface specifies that get_schema -// transfers ownership to the caller, so each call must produce an independent copy. -// Each node owns its string data via an ArrowSchemaCopyData in private_data. -static int ArrowSchemaDeepCopy(const ArrowSchema &source, ArrowSchema *out, string &error) { - out->release = nullptr; - try { - auto data = new ArrowSchemaCopyData(); - data->format = source.format ? source.format : ""; - data->name = source.name ? source.name : ""; - if (source.metadata) { - auto metadata_size = ArrowMetadataSize(source.metadata); - data->metadata.assign(source.metadata, metadata_size); - } - - out->format = data->format.c_str(); - out->name = data->name.c_str(); - out->metadata = source.metadata ? data->metadata.data() : nullptr; - out->flags = source.flags; - out->n_children = source.n_children; - out->dictionary = nullptr; - out->private_data = data; - out->release = ReleaseCopiedArrowSchema; - - if (source.n_children > 0) { - out->children = new ArrowSchema *[source.n_children]; - for (int64_t i = 0; i < source.n_children; i++) { - out->children[i] = new ArrowSchema(); - auto rc = ArrowSchemaDeepCopy(*source.children[i], out->children[i], error); - if (rc != 0) { - for (int64_t j = 0; j <= i; j++) { - if (out->children[j]->release) { - out->children[j]->release(out->children[j]); - } - delete out->children[j]; - } - delete[] out->children; - out->children = nullptr; - out->n_children = 0; - // Release the partially constructed node - delete data; - out->private_data = nullptr; - out->release = nullptr; - return rc; - } - } - } else { - out->children = nullptr; - } - - if (source.dictionary) { - out->dictionary = new ArrowSchema(); - auto rc = ArrowSchemaDeepCopy(*source.dictionary, out->dictionary, error); - if (rc != 0) { - delete out->dictionary; - out->dictionary = nullptr; - return rc; - } - } - } catch (std::exception &e) { - error = e.what(); - return -1; - } - return 0; -} - // Wraps pre-built Arrow arrays from an ArrowQueryResult into an ArrowArrayStream. // This avoids the double-materialization that happens when using ResultArrowArrayStreamWrapper // with an ArrowQueryResult (which throws NotImplementedException from FetchInternal). @@ -628,7 +556,16 @@ struct ArrowQueryResultStreamWrapper { arrays = arrow_result.ConsumeArrays(); cached_schema.release = nullptr; - ArrowConverter::ToArrowSchema(&cached_schema, result->types, result->names, result->client_properties); + if (arrow_result.HasCachedSchema()) { + // Reuse the schema the collector built under the producing transaction. + // Rebuilding it here (post-commit) would assert for arrow type + // extensions that do a catalog lookup, e.g. GeoArrow CRS -- this is the + // capsule / Arrow C Stream form of #475 (pa.table(rel), pl.DataFrame(rel), + // ADBC). See duckdb-python#475 / duckdb-spatial#788. + arrow_result.GetSchema(cached_schema); + } else { + ArrowConverter::ToArrowSchema(&cached_schema, result->types, result->names, result->client_properties); + } stream.private_data = this; stream.get_schema = GetSchema; @@ -648,7 +585,11 @@ struct ArrowQueryResultStreamWrapper { return -1; } auto self = reinterpret_cast(stream->private_data); - return ArrowSchemaDeepCopy(self->cached_schema, out, self->last_error); + auto rc = duckdb_nanoarrow::ArrowSchemaDeepCopy(&self->cached_schema, out); + if (rc != NANOARROW_OK) { + self->last_error = "failed to copy cached Arrow schema"; + } + return rc; } static int GetNext(ArrowArrayStream *stream, ArrowArray *out) { @@ -731,7 +672,11 @@ struct SchemaCachingStreamWrapper { if (!self->schema_ok) { return -1; } - return ArrowSchemaDeepCopy(self->cached_schema, out, self->schema_error); + auto rc = duckdb_nanoarrow::ArrowSchemaDeepCopy(&self->cached_schema, out); + if (rc != NANOARROW_OK) { + self->schema_error = "failed to copy cached Arrow schema"; + } + return rc; } static int GetNext(ArrowArrayStream *stream, ArrowArray *out) { diff --git a/tests/fast/arrow/test_475_geometry_crs_arrow.py b/tests/fast/arrow/test_475_geometry_crs_arrow.py new file mode 100644 index 00000000..6ceb3736 --- /dev/null +++ b/tests/fast/arrow/test_475_geometry_crs_arrow.py @@ -0,0 +1,63 @@ +"""Regression test for duckdb-python#475 / duckdb-spatial#788. + +Converting a GEOMETRY column that carries a CRS to Arrow used to raise +``InternalException: TransactionContext::ActiveTransaction called without +active transaction``. The GeoArrow schema callback does a catalog lookup to +resolve the CRS, which needs an active transaction -- but the Arrow schema was +rebuilt at consume time, after the producing (auto-commit) transaction had +already closed. + +The fix builds and caches the schema on ArrowQueryResult while the producing +transaction is still active, and the consumers below reuse it. Each test +exercises one of those consumers. The geometry-with-CRS value is built with a +pure-core cast, so no spatial extension is required. +""" + +from __future__ import annotations + +import pytest + +import duckdb + +pa = pytest.importorskip("pyarrow") + +# An authority-code CRS forces the catalog lookup that used to require an open +# transaction. No spatial extension needed -- the cast and geoarrow.wkb mapping +# are both in core. +GEOM_SQL = "SELECT 'POINT(0 1)'::GEOMETRY('OGC:CRS84') AS g" + + +def _assert_geoarrow_with_crs(field: pa.Field) -> None: + metadata = field.metadata or {} + assert metadata.get(b"ARROW:extension:name") == b"geoarrow.wkb" + assert b"crs" in metadata.get(b"ARROW:extension:metadata", b"") + + +def test_475_to_arrow_table_geometry_with_crs(): + con = duckdb.connect() + table = con.sql(GEOM_SQL).to_arrow_table() + assert table.num_rows == 1 + _assert_geoarrow_with_crs(table.schema.field("g")) + + +def test_475_arrow_capsule_geometry_with_crs(): + # pa.table(rel) consumes via __arrow_c_stream__ (the capsule / ADBC path). + con = duckdb.connect() + table = pa.table(con.sql(GEOM_SQL)) + assert table.num_rows == 1 + _assert_geoarrow_with_crs(table.schema.field("g")) + + +def test_475_record_batch_reader_geometry_with_crs(): + con = duckdb.connect() + table = con.sql(GEOM_SQL).to_arrow_reader().read_all() + assert table.num_rows == 1 + _assert_geoarrow_with_crs(table.schema.field("g")) + + +def test_475_polars_geometry_with_crs(): + pl = pytest.importorskip("polars") + con = duckdb.connect() + # polars.DataFrame(rel) pulls the relation's Arrow C stream directly. + df = pl.DataFrame(con.sql(GEOM_SQL)) + assert df.height == 1