Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 71 additions & 126 deletions src/duckdb_py/pyresult.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "duckdb_python/arrow/arrow_export_utils.hpp"
#include "duckdb/main/chunk_scan_state/query_result.hpp"
#include "duckdb/common/arrow/arrow_query_result.hpp"
#include "duckdb/common/arrow/nanoarrow/nanoarrow.hpp"

using namespace pybind11::literals;

Expand Down Expand Up @@ -428,25 +429,39 @@ duckdb::pyarrow::Table DuckDBPyResult::FetchArrowTable(idx_t rows_per_batch, boo
QueryResult::DeduplicateColumns(names);
}

if (!result) {
throw InvalidInputException("result closed");
}
auto pyarrow_lib_module = py::module::import("pyarrow").attr("lib");

// If the producing operator cached the Arrow schema (built while its own
// transaction was still active), reuse it instead of rebuilding it here.
// Rebuilding post-commit breaks arrow type extensions whose schema callback
// does a catalog lookup -- e.g. GeoArrow CRS resolution -- which asserts an
// active transaction. See duckdb-python#475 / duckdb-spatial#788.
ArrowQueryResult *cached_result = nullptr;
if (result->type == QueryResultType::ARROW_RESULT) {
auto &arrow_result = result->Cast<ArrowQueryResult>();
if (arrow_result.HasCachedSchema()) {
cached_result = &arrow_result;
}
}

py::list batches;
if (result->type == QueryResultType::ARROW_RESULT) {
auto &arrow_result = result->Cast<ArrowQueryResult>();
auto arrays = arrow_result.ConsumeArrays();
for (auto &array : arrays) {
ArrowSchema arrow_schema;
auto result_names = arrow_result.names;
if (to_polars) {
QueryResult::DeduplicateColumns(result_names);
}
ArrowArray data = array->arrow_array;
array->arrow_array.release = nullptr;
ArrowConverter::ToArrowSchema(&arrow_schema, arrow_result.types, result_names,
arrow_result.client_properties);
if (cached_result) {
cached_result->GetSchema(arrow_schema);
} else {
auto result_names = arrow_result.names;
if (to_polars) {
QueryResult::DeduplicateColumns(result_names);
}
ArrowConverter::ToArrowSchema(&arrow_schema, arrow_result.types, result_names,
arrow_result.client_properties);
}
TransformDuckToArrowChunk(arrow_schema, data, batches);
}
} else {
Expand Down Expand Up @@ -476,7 +491,33 @@ duckdb::pyarrow::Table DuckDBPyResult::FetchArrowTable(idx_t rows_per_batch, boo
}
}

return pyarrow::ToArrowTable(result->types, names, std::move(batches), result->client_properties);
if (!cached_result) {
return pyarrow::ToArrowTable(result->types, names, std::move(batches), result->client_properties);
}

// Assemble the table from the cached schema (avoids the ToArrowSchema call
// inside pyarrow::ToArrowTable, which would also assert post-commit).
auto from_batches_func = pyarrow_lib_module.attr("Table").attr("from_batches");
auto schema_import_func = pyarrow_lib_module.attr("Schema").attr("_import_from_c");
ArrowSchema final_schema;
cached_result->GetSchema(final_schema);
auto schema_obj = schema_import_func(reinterpret_cast<uint64_t>(&final_schema));
auto table = py::cast<duckdb::pyarrow::Table>(from_batches_func(batches, schema_obj));
if (to_polars) {
// The cached schema carries the original column names; polars needs them
// unique. Rename only when there are real duplicates, so unique columns
// (and their field metadata, e.g. geoarrow) are left untouched.
auto deduped = result->names;
QueryResult::DeduplicateColumns(deduped);
if (deduped != result->names) {
py::list renamed;
for (auto &n : deduped) {
renamed.append(n);
}
table = py::cast<duckdb::pyarrow::Table>(table.attr("rename_columns")(renamed));
}
}
return table;
}

ArrowArrayStream DuckDBPyResult::FetchArrowArrayStream(idx_t rows_per_batch) {
Expand All @@ -501,119 +542,6 @@ duckdb::pyarrow::RecordBatchReader DuckDBPyResult::FetchRecordBatchReader(idx_t
return py::cast<duckdb::pyarrow::RecordBatchReader>(record_batch_reader);
}

// Holds owned copies of the string data for a deep-copied ArrowSchema node.
struct ArrowSchemaCopyData {
string format;
string name;
string metadata;
};

static void ReleaseCopiedArrowSchema(ArrowSchema *schema) {
if (!schema || !schema->release) {
return;
}
for (int64_t i = 0; i < schema->n_children; i++) {
if (schema->children[i]->release) {
schema->children[i]->release(schema->children[i]);
}
delete schema->children[i];
}
delete[] schema->children;
if (schema->dictionary) {
if (schema->dictionary->release) {
schema->dictionary->release(schema->dictionary);
}
delete schema->dictionary;
}
delete reinterpret_cast<ArrowSchemaCopyData *>(schema->private_data);
schema->release = nullptr;
}

static idx_t ArrowMetadataSize(const char *metadata) {
if (!metadata) {
return 0;
}
// Arrow metadata format: int32 num_entries, then for each entry:
// int32 key_len, key_bytes, int32 value_len, value_bytes
auto ptr = metadata;
int32_t num_entries;
memcpy(&num_entries, ptr, sizeof(int32_t));
ptr += sizeof(int32_t);
for (int32_t i = 0; i < num_entries; i++) {
int32_t len;
memcpy(&len, ptr, sizeof(int32_t));
ptr += sizeof(int32_t) + len;
memcpy(&len, ptr, sizeof(int32_t));
ptr += sizeof(int32_t) + len;
}
return ptr - metadata;
}

// Deep-copy an ArrowSchema. The Arrow C Data Interface specifies that get_schema
// transfers ownership to the caller, so each call must produce an independent copy.
// Each node owns its string data via an ArrowSchemaCopyData in private_data.
static int ArrowSchemaDeepCopy(const ArrowSchema &source, ArrowSchema *out, string &error) {
out->release = nullptr;
try {
auto data = new ArrowSchemaCopyData();
data->format = source.format ? source.format : "";
data->name = source.name ? source.name : "";
if (source.metadata) {
auto metadata_size = ArrowMetadataSize(source.metadata);
data->metadata.assign(source.metadata, metadata_size);
}

out->format = data->format.c_str();
out->name = data->name.c_str();
out->metadata = source.metadata ? data->metadata.data() : nullptr;
out->flags = source.flags;
out->n_children = source.n_children;
out->dictionary = nullptr;
out->private_data = data;
out->release = ReleaseCopiedArrowSchema;

if (source.n_children > 0) {
out->children = new ArrowSchema *[source.n_children];
for (int64_t i = 0; i < source.n_children; i++) {
out->children[i] = new ArrowSchema();
auto rc = ArrowSchemaDeepCopy(*source.children[i], out->children[i], error);
if (rc != 0) {
for (int64_t j = 0; j <= i; j++) {
if (out->children[j]->release) {
out->children[j]->release(out->children[j]);
}
delete out->children[j];
}
delete[] out->children;
out->children = nullptr;
out->n_children = 0;
// Release the partially constructed node
delete data;
out->private_data = nullptr;
out->release = nullptr;
return rc;
}
}
} else {
out->children = nullptr;
}

if (source.dictionary) {
out->dictionary = new ArrowSchema();
auto rc = ArrowSchemaDeepCopy(*source.dictionary, out->dictionary, error);
if (rc != 0) {
delete out->dictionary;
out->dictionary = nullptr;
return rc;
}
}
} catch (std::exception &e) {
error = e.what();
return -1;
}
return 0;
}

// Wraps pre-built Arrow arrays from an ArrowQueryResult into an ArrowArrayStream.
// This avoids the double-materialization that happens when using ResultArrowArrayStreamWrapper
// with an ArrowQueryResult (which throws NotImplementedException from FetchInternal).
Expand All @@ -628,7 +556,16 @@ struct ArrowQueryResultStreamWrapper {
arrays = arrow_result.ConsumeArrays();

cached_schema.release = nullptr;
ArrowConverter::ToArrowSchema(&cached_schema, result->types, result->names, result->client_properties);
if (arrow_result.HasCachedSchema()) {
// Reuse the schema the collector built under the producing transaction.
// Rebuilding it here (post-commit) would assert for arrow type
// extensions that do a catalog lookup, e.g. GeoArrow CRS -- this is the
// capsule / Arrow C Stream form of #475 (pa.table(rel), pl.DataFrame(rel),
// ADBC). See duckdb-python#475 / duckdb-spatial#788.
arrow_result.GetSchema(cached_schema);
} else {
ArrowConverter::ToArrowSchema(&cached_schema, result->types, result->names, result->client_properties);
}

stream.private_data = this;
stream.get_schema = GetSchema;
Expand All @@ -648,7 +585,11 @@ struct ArrowQueryResultStreamWrapper {
return -1;
}
auto self = reinterpret_cast<ArrowQueryResultStreamWrapper *>(stream->private_data);
return ArrowSchemaDeepCopy(self->cached_schema, out, self->last_error);
auto rc = duckdb_nanoarrow::ArrowSchemaDeepCopy(&self->cached_schema, out);
if (rc != NANOARROW_OK) {
self->last_error = "failed to copy cached Arrow schema";
}
return rc;
}

static int GetNext(ArrowArrayStream *stream, ArrowArray *out) {
Expand Down Expand Up @@ -731,7 +672,11 @@ struct SchemaCachingStreamWrapper {
if (!self->schema_ok) {
return -1;
}
return ArrowSchemaDeepCopy(self->cached_schema, out, self->schema_error);
auto rc = duckdb_nanoarrow::ArrowSchemaDeepCopy(&self->cached_schema, out);
if (rc != NANOARROW_OK) {
self->schema_error = "failed to copy cached Arrow schema";
}
return rc;
}

static int GetNext(ArrowArrayStream *stream, ArrowArray *out) {
Expand Down
63 changes: 63 additions & 0 deletions tests/fast/arrow/test_475_geometry_crs_arrow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
"""Regression test for duckdb-python#475 / duckdb-spatial#788.

Converting a GEOMETRY column that carries a CRS to Arrow used to raise
``InternalException: TransactionContext::ActiveTransaction called without
active transaction``. The GeoArrow schema callback does a catalog lookup to
resolve the CRS, which needs an active transaction -- but the Arrow schema was
rebuilt at consume time, after the producing (auto-commit) transaction had
already closed.

The fix builds and caches the schema on ArrowQueryResult while the producing
transaction is still active, and the consumers below reuse it. Each test
exercises one of those consumers. The geometry-with-CRS value is built with a
pure-core cast, so no spatial extension is required.
"""

from __future__ import annotations

import pytest

import duckdb

pa = pytest.importorskip("pyarrow")

# An authority-code CRS forces the catalog lookup that used to require an open
# transaction. No spatial extension needed -- the cast and geoarrow.wkb mapping
# are both in core.
GEOM_SQL = "SELECT 'POINT(0 1)'::GEOMETRY('OGC:CRS84') AS g"


def _assert_geoarrow_with_crs(field: pa.Field) -> None:
metadata = field.metadata or {}
assert metadata.get(b"ARROW:extension:name") == b"geoarrow.wkb"
assert b"crs" in metadata.get(b"ARROW:extension:metadata", b"")


def test_475_to_arrow_table_geometry_with_crs():
con = duckdb.connect()
table = con.sql(GEOM_SQL).to_arrow_table()
assert table.num_rows == 1
_assert_geoarrow_with_crs(table.schema.field("g"))


def test_475_arrow_capsule_geometry_with_crs():
# pa.table(rel) consumes via __arrow_c_stream__ (the capsule / ADBC path).
con = duckdb.connect()
table = pa.table(con.sql(GEOM_SQL))
assert table.num_rows == 1
_assert_geoarrow_with_crs(table.schema.field("g"))


def test_475_record_batch_reader_geometry_with_crs():
con = duckdb.connect()
table = con.sql(GEOM_SQL).to_arrow_reader().read_all()
assert table.num_rows == 1
_assert_geoarrow_with_crs(table.schema.field("g"))


def test_475_polars_geometry_with_crs():
pl = pytest.importorskip("polars")
con = duckdb.connect()
# polars.DataFrame(rel) pulls the relation's Arrow C stream directly.
df = pl.DataFrame(con.sql(GEOM_SQL))
assert df.height == 1
Loading