Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions _duckdb-stubs/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,8 @@ class DuckDBPyRelation:
def type(self) -> str: ...
@property
def types(self) -> lst[sqltypes.DuckDBPyType]: ...
@property
def _has_relation(self) -> bool: ...

class Error(Exception): ...
class FatalException(DatabaseError): ...
Expand Down
37 changes: 37 additions & 0 deletions duckdb/polars_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,13 +278,21 @@ def _pl_tree_to_sql(tree: _ExpressionTree) -> str:

def duckdb_source(relation: duckdb.DuckDBPyRelation, schema: pl.schema.Schema) -> pl.LazyFrame:
"""A polars IO plugin for DuckDB."""
# Result-backed relations (from `con.execute(...)`) are one-shot iterators —
# they cannot replay project/filter/limit, so we stream raw batches and let
# polars apply the pushdown args itself. Memory stays bounded by batch_size.
can_pushdown = relation._has_relation

def source_generator(
with_columns: list[str] | None,
predicate: pl.Expr | None,
n_rows: int | None,
batch_size: int | None,
) -> Iterator[pl.DataFrame]:
if not can_pushdown:
yield from _streaming_source_generator(relation, with_columns, predicate, n_rows, batch_size)
return

duck_predicate = None
relation_final = relation
if with_columns is not None:
Expand All @@ -309,3 +317,32 @@ def source_generator(
yield pl.from_arrow(record_batch) # type: ignore[misc,unused-ignore]

return register_io_source(source_generator, schema=schema)


def _streaming_source_generator(
    relation: duckdb.DuckDBPyRelation,
    with_columns: list[str] | None,
    predicate: pl.Expr | None,
    n_rows: int | None,
    batch_size: int | None,
) -> Iterator[pl.DataFrame]:
    """Yield polars frames from a relation, applying the pushdown args in polars.

    Intended for one-shot, result-backed relations where projection, filtering
    and limiting cannot be delegated to DuckDB. Each Arrow record batch is
    converted and processed on its own, so only one batch is handled at a time.

    Args:
        relation: Relation to read; only `to_arrow_reader` is called on it.
        with_columns: Columns to keep in each frame, or None to keep all.
        predicate: Row filter applied to each frame, or None for no filter.
        n_rows: Maximum number of rows to yield in total (counted after the
            projection and filter), or None for no limit.
        batch_size: Forwarded to `to_arrow_reader` when given; otherwise the
            reader is opened with its default batch size.

    Yields:
        One `pl.DataFrame` per record batch read. The generator stops as soon
        as the `n_rows` budget has been used up.
    """
    if batch_size is None:
        batch_reader = relation.to_arrow_reader()
    else:
        batch_reader = relation.to_arrow_reader(batch_size)

    remaining = n_rows
    # Two-argument `iter` keeps calling `read_next_batch` until the reader is
    # exhausted (or it hands back the `None` sentinel).
    for arrow_batch in iter(batch_reader.read_next_batch, None):
        frame: pl.DataFrame = pl.from_arrow(arrow_batch)  # type: ignore[assignment,misc,unused-ignore]
        if with_columns is not None:
            frame = frame.select(with_columns)
        if predicate is not None:
            frame = frame.filter(predicate)

        if remaining is None:
            # No row limit requested: pass every processed batch through.
            yield frame
            continue

        if frame.height >= remaining:
            # This batch covers the rest of the budget; trim it and stop.
            yield frame.head(remaining)
            return

        remaining -= frame.height
        yield frame
4 changes: 4 additions & 0 deletions src/duckdb_py/include/duckdb_python/pyrelation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,10 @@ struct DuckDBPyRelation {

static bool IsRelation(const py::object &object);

//! True if this object wraps a replayable Relation (supports project/filter/limit).
//! False for result-backed relations produced by `con.execute(...)`, which are one-shot.
bool HasRelation() const;

bool CanBeRegisteredBy(Connection &con);
bool CanBeRegisteredBy(ClientContext &context);
bool CanBeRegisteredBy(shared_ptr<ClientContext> &context);
Expand Down
8 changes: 7 additions & 1 deletion src/duckdb_py/pyrelation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1044,7 +1044,9 @@ PolarsDataFrame DuckDBPyRelation::ToPolars(idx_t batch_size, bool lazy) {
duckdb::pyarrow::RecordBatchReader DuckDBPyRelation::ToRecordBatch(idx_t batch_size) {
if (!result) {
if (!rel) {
return py::none();
throw InvalidInputException(
"This result-backed relation has already been consumed and cannot be read again. "
"Result-backed relations (from `con.execute(...)`) are one-shot.");
}
ExecuteOrThrow(true);
}
Expand Down Expand Up @@ -1073,6 +1075,10 @@ bool DuckDBPyRelation::ContainsColumnByName(const string &name) const {
[&](const string &item) { return StringUtil::CIEquals(name, item); }) != names.end();
}

//! Reports whether the `rel` pointer is set on this object.
bool DuckDBPyRelation::HasRelation() const {
	// Same null test as `rel != nullptr`, spelled as an explicit bool conversion.
	const bool relation_present = static_cast<bool>(rel);
	return relation_present;
}

void DuckDBPyRelation::SetConnectionOwner(py::object owner) {
connection_owner = std::move(owner);
}
Expand Down
6 changes: 5 additions & 1 deletion src/duckdb_py/pyrelation/initialize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ static void InitializeReadOnlyProperties(py::class_<DuckDBPyRelation> &m) {
.def_property_readonly("description", &DuckDBPyRelation::Description, "Return the description of the result")
.def_property_readonly("alias", &DuckDBPyRelation::GetAlias, "Get the name of the current alias")
.def("__len__", &DuckDBPyRelation::Length, "Number of rows in relation.")
.def_property_readonly("shape", &DuckDBPyRelation::Shape, " Tuple of # of rows, # of columns in relation.");
.def_property_readonly("shape", &DuckDBPyRelation::Shape, " Tuple of # of rows, # of columns in relation.")
.def_property_readonly("_has_relation", &DuckDBPyRelation::HasRelation,
"Internal: True if this object wraps a replayable Relation (supports "
"project/filter/limit). False for one-shot result-backed relations from "
"con.execute(...). Used by the polars IO plugin.");
}

static void InitializeConsumers(py::class_<DuckDBPyRelation> &m) {
Expand Down
37 changes: 37 additions & 0 deletions tests/fast/arrow/test_polars.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,43 @@ def test_polars_lazy_from_conn(self, duckdb_cursor):
lazy_df = result.pl(lazy=True)
assert lazy_df.collect().to_dicts() == [{"bla": 42}]

# Regression: https://github.com/duckdb/duckdb/issues/20094
# `con.execute(...).pl(lazy=True)` returns a result-backed relation. Any polars
# operation that triggered with_columns / predicate / n_rows pushdown used to
# raise `AttributeError: 'NoneType' object has no attribute 'fetch_arrow_reader'`
# because project/filter/limit return None on result-backed relations.
def test_polars_lazy_from_conn_select_len(self, duckdb_cursor):
    """Counting rows on a lazy frame built from `con.execute(...)` gives 1."""
    connection = duckdb.connect()
    executed = connection.execute("SELECT 1 AS id, 'banana' AS fruit")
    lazy_frame = executed.pl(lazy=True)
    row_count = lazy_frame.select(pl.len()).collect().item()
    assert row_count == 1

def test_polars_lazy_from_conn_select_subset(self, duckdb_cursor):
    """Selecting one column from the lazy frame returns only that column."""
    connection = duckdb.connect()
    executed = connection.execute("SELECT 1 AS id, 'banana' AS fruit")
    lazy_frame = executed.pl(lazy=True)
    rows = lazy_frame.select("id").collect().to_dicts()
    assert rows == [{"id": 1}]

def test_polars_lazy_from_conn_filter(self, duckdb_cursor):
    """Filtering the lazy frame on `id >= 2` keeps the last two of three rows."""
    connection = duckdb.connect()
    executed = connection.execute("SELECT * FROM (VALUES (1, 'a'), (2, 'b'), (3, 'c')) t(id, name)")
    lazy_frame = executed.pl(lazy=True)
    keep_from_two = pl.col("id") >= 2
    rows = lazy_frame.filter(keep_from_two).collect().to_dicts()
    expected = [{"id": 2, "name": "b"}, {"id": 3, "name": "c"}]
    assert rows == expected

def test_polars_lazy_from_conn_limit(self, duckdb_cursor):
    """Taking `head(3)` of a 100-row lazy frame returns the first three rows."""
    connection = duckdb.connect()
    executed = connection.execute("SELECT * FROM range(100) t(i)")
    lazy_frame = executed.pl(lazy=True)
    first_rows = lazy_frame.head(3).collect().to_dicts()
    expected = [{"i": value} for value in range(3)]
    assert first_rows == expected

def test_polars_lazy_from_conn_consumed_once(self, duckdb_cursor):
    """A second `collect()` on the same lazy frame must raise."""
    # Result-backed relations are one-shot: the first collect succeeds, and the
    # second is expected to fail with a clear error rather than silently
    # returning None or raising an AttributeError.
    connection = duckdb.connect()
    lazy_frame = connection.execute("SELECT 1 AS bla").pl(lazy=True)

    first_rows = lazy_frame.collect().to_dicts()
    assert first_rows == [{"bla": 1}]

    accepted_errors = (duckdb.InvalidInputException, pl.exceptions.ComputeError)
    with pytest.raises(accepted_errors):
        lazy_frame.collect()

def test_polars_lazy(self, duckdb_cursor):
con = duckdb.connect()
con.execute("Create table names (a varchar, b integer)")
Expand Down