diff --git a/_duckdb-stubs/__init__.pyi b/_duckdb-stubs/__init__.pyi index 3f2b35b2..a511c538 100644 --- a/_duckdb-stubs/__init__.pyi +++ b/_duckdb-stubs/__init__.pyi @@ -825,6 +825,8 @@ class DuckDBPyRelation: def type(self) -> str: ... @property def types(self) -> lst[sqltypes.DuckDBPyType]: ... + @property + def _has_relation(self) -> bool: ... class Error(Exception): ... class FatalException(DatabaseError): ... diff --git a/duckdb/polars_io.py b/duckdb/polars_io.py index b5bb6a4e..aca3299c 100644 --- a/duckdb/polars_io.py +++ b/duckdb/polars_io.py @@ -278,6 +278,10 @@ def _pl_tree_to_sql(tree: _ExpressionTree) -> str: def duckdb_source(relation: duckdb.DuckDBPyRelation, schema: pl.schema.Schema) -> pl.LazyFrame: """A polars IO plugin for DuckDB.""" + # Result-backed relations (from `con.execute(...)`) are one-shot iterators — + # they cannot replay project/filter/limit, so we stream raw batches and let + # polars apply the pushdown args itself. Memory stays bounded by batch_size. + can_pushdown = relation._has_relation def source_generator( with_columns: list[str] | None, @@ -285,6 +289,10 @@ def source_generator( n_rows: int | None, batch_size: int | None, ) -> Iterator[pl.DataFrame]: + if not can_pushdown: + yield from _streaming_source_generator(relation, with_columns, predicate, n_rows, batch_size) + return + duck_predicate = None relation_final = relation if with_columns is not None: @@ -309,3 +317,32 @@ def source_generator( yield pl.from_arrow(record_batch) # type: ignore[misc,unused-ignore] return register_io_source(source_generator, schema=schema) + + +def _streaming_source_generator( + relation: duckdb.DuckDBPyRelation, + with_columns: list[str] | None, + predicate: pl.Expr | None, + n_rows: int | None, + batch_size: int | None, +) -> Iterator[pl.DataFrame]: + """Streaming generator for one-shot result-backed relations. + + Applies polars-side projection/filter/limit per batch so memory stays bounded + by `batch_size`. No DuckDB-side pushdown — the underlying result is a single + Arrow C stream that cannot be filtered or projected after the fact. + """ + reader = relation.to_arrow_reader() if batch_size is None else relation.to_arrow_reader(batch_size) + rows_left = n_rows + for record_batch in iter(reader.read_next_batch, None): + df: pl.DataFrame = pl.from_arrow(record_batch) # type: ignore[assignment,misc,unused-ignore] + if with_columns is not None: + df = df.select(with_columns) + if predicate is not None: + df = df.filter(predicate) + if rows_left is not None: + if df.height >= rows_left: + yield df.head(rows_left) + return + rows_left -= df.height + yield df diff --git a/src/duckdb_py/include/duckdb_python/pyrelation.hpp b/src/duckdb_py/include/duckdb_python/pyrelation.hpp index 50f39b5f..c4ebdfc8 100644 --- a/src/duckdb_py/include/duckdb_python/pyrelation.hpp +++ b/src/duckdb_py/include/duckdb_python/pyrelation.hpp @@ -254,6 +254,10 @@ struct DuckDBPyRelation { static bool IsRelation(const py::object &object); + //! True if this object wraps a replayable Relation (supports project/filter/limit). + //! False for result-backed relations produced by `con.execute(...)`, which are one-shot. + bool HasRelation() const; + bool CanBeRegisteredBy(Connection &con); bool CanBeRegisteredBy(ClientContext &context); bool CanBeRegisteredBy(shared_ptr &context); diff --git a/src/duckdb_py/pyrelation.cpp b/src/duckdb_py/pyrelation.cpp index f15bbc16..ee743fec 100644 --- a/src/duckdb_py/pyrelation.cpp +++ b/src/duckdb_py/pyrelation.cpp @@ -1044,7 +1044,9 @@ PolarsDataFrame DuckDBPyRelation::ToPolars(idx_t batch_size, bool lazy) { duckdb::pyarrow::RecordBatchReader DuckDBPyRelation::ToRecordBatch(idx_t batch_size) { if (!result) { if (!rel) { - return py::none(); + throw InvalidInputException( + "This result-backed relation has already been consumed and cannot be read again. " + "Result-backed relations (from `con.execute(...)`) are one-shot."); } ExecuteOrThrow(true); } @@ -1073,6 +1075,10 @@ bool DuckDBPyRelation::ContainsColumnByName(const string &name) const { [&](const string &item) { return StringUtil::CIEquals(name, item); }) != names.end(); } +bool DuckDBPyRelation::HasRelation() const { + return rel != nullptr; +} + void DuckDBPyRelation::SetConnectionOwner(py::object owner) { connection_owner = std::move(owner); } diff --git a/src/duckdb_py/pyrelation/initialize.cpp b/src/duckdb_py/pyrelation/initialize.cpp index 4393889a..441dd680 100644 --- a/src/duckdb_py/pyrelation/initialize.cpp +++ b/src/duckdb_py/pyrelation/initialize.cpp @@ -23,7 +23,11 @@ static void InitializeReadOnlyProperties(py::class_ &m) { .def_property_readonly("description", &DuckDBPyRelation::Description, "Return the description of the result") .def_property_readonly("alias", &DuckDBPyRelation::GetAlias, "Get the name of the current alias") .def("__len__", &DuckDBPyRelation::Length, "Number of rows in relation.") - .def_property_readonly("shape", &DuckDBPyRelation::Shape, " Tuple of # of rows, # of columns in relation."); + .def_property_readonly("shape", &DuckDBPyRelation::Shape, " Tuple of # of rows, # of columns in relation.") + .def_property_readonly("_has_relation", &DuckDBPyRelation::HasRelation, + "Internal: True if this object wraps a replayable Relation (supports " + "project/filter/limit). False for one-shot result-backed relations from " + "con.execute(...). Used by the polars IO plugin."); } static void InitializeConsumers(py::class_ &m) { diff --git a/tests/fast/arrow/test_polars.py b/tests/fast/arrow/test_polars.py index f3d7e072..a1841c16 100644 --- a/tests/fast/arrow/test_polars.py +++ b/tests/fast/arrow/test_polars.py @@ -123,6 +123,43 @@ def test_polars_lazy_from_conn(self, duckdb_cursor): lazy_df = result.pl(lazy=True) assert lazy_df.collect().to_dicts() == [{"bla": 42}] + # Regression: https://github.com/duckdb/duckdb/issues/20094 + # `con.execute(...).pl(lazy=True)` returns a result-backed relation. Any polars + # operation that triggered with_columns / predicate / n_rows pushdown used to + # raise `AttributeError: 'NoneType' object has no attribute 'fetch_arrow_reader'` + # because project/filter/limit return None on result-backed relations. + def test_polars_lazy_from_conn_select_len(self, duckdb_cursor): + con = duckdb.connect() + lf = con.execute("SELECT 1 AS id, 'banana' AS fruit").pl(lazy=True) + assert lf.select(pl.len()).collect().item() == 1 + + def test_polars_lazy_from_conn_select_subset(self, duckdb_cursor): + con = duckdb.connect() + lf = con.execute("SELECT 1 AS id, 'banana' AS fruit").pl(lazy=True) + assert lf.select("id").collect().to_dicts() == [{"id": 1}] + + def test_polars_lazy_from_conn_filter(self, duckdb_cursor): + con = duckdb.connect() + lf = con.execute("SELECT * FROM (VALUES (1, 'a'), (2, 'b'), (3, 'c')) t(id, name)").pl(lazy=True) + assert lf.filter(pl.col("id") >= 2).collect().to_dicts() == [ + {"id": 2, "name": "b"}, + {"id": 3, "name": "c"}, + ] + + def test_polars_lazy_from_conn_limit(self, duckdb_cursor): + con = duckdb.connect() + lf = con.execute("SELECT * FROM range(100) t(i)").pl(lazy=True) + assert lf.head(3).collect().to_dicts() == [{"i": 0}, {"i": 1}, {"i": 2}] + + def test_polars_lazy_from_conn_consumed_once(self, duckdb_cursor): + # Result-backed relations are one-shot. The second collect should raise a + # clear error instead of silently returning None or AttributeError. + con = duckdb.connect() + lf = con.execute("SELECT 1 AS bla").pl(lazy=True) + assert lf.collect().to_dicts() == [{"bla": 1}] + with pytest.raises((duckdb.InvalidInputException, pl.exceptions.ComputeError)): + lf.collect() + def test_polars_lazy(self, duckdb_cursor): con = duckdb.connect() con.execute("Create table names (a varchar, b integer)")