Streaming
One additional benefit of the lazy API is that it allows queries to be executed in a streaming manner. Instead of processing all the data at once, Polars can execute the query in batches, allowing you to process datasets that do not fit in memory.
To tell Polars we want to execute a query in streaming mode, we pass the streaming=True argument to collect:
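import polars as pl

# Build the lazy query; nothing is read until collect is called.
q1 = (
    pl.scan_csv("docs/assets/data/iris.csv")
    .filter(pl.col("sepal_length") > 5)
    .group_by("species")
    .agg(pl.col("sepal_width").mean())
)
# Execute the query in streaming mode.
df = q1.collect(streaming=True)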
collect (Rust) · Available on feature streaming
let q1 = LazyCsvReader::new("docs/assets/data/iris.csv")
    .with_has_header(true)
    .finish()?
    .filter(col("sepal_length").gt(lit(5)))
    .group_by(vec![col("species")])
    .agg([col("sepal_width").mean()]);
let df = q1.clone().with_streaming(true).collect()?;
println!("{}", df);
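The batching idea behind the query above can be illustrated without Polars. The following is a plain-Python sketch, not Polars' actual implementation: it applies the same filter and group-by mean while holding only one small batch of rows plus the running per-group totals in memory. The helper names read_batches and streaming_group_mean, the batch size, and the inline sample rows are all hypothetical and exist only for illustration.

```python
import csv
import io
from collections import defaultdict
from itertools import islice


def read_batches(rows, batch_size):
    """Yield lists of at most batch_size items from an iterable."""
    it = iter(rows)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


def streaming_group_mean(rows, batch_size=2):
    """Mean sepal_width per species for rows with sepal_length > 5.

    Only the current batch and the running sums/counts are kept,
    so memory use does not grow with the number of input rows.
    """
    sums = defaultdict(float)
    counts = defaultdict(int)
    for batch in read_batches(rows, batch_size):
        for row in batch:
            if float(row["sepal_length"]) > 5:
                sums[row["species"]] += float(row["sepal_width"])
                counts[row["species"]] += 1
    return {species: sums[species] / counts[species] for species in sums}


# Small inline sample standing in for a large CSV file (illustrative only).
SAMPLE = """sepal_length,sepal_width,species
5.1,3.5,setosa
4.9,3.0,setosa
5.4,3.9,setosa
7.0,3.2,versicolor
6.4,3.0,versicolor
"""

result = streaming_group_mean(csv.DictReader(io.StringIO(SAMPLE)))
print(result)
```

A mean works well in this style because it can be rebuilt from a running sum and count, so no batch needs to be revisited once it has been processed.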
When is streaming available?
Streaming is still in development. We can ask Polars to execute any lazy query in streaming mode. However, not all lazy operations support streaming. If there is an operation for which streaming is not supported, Polars will run the query in non-streaming mode.
Streaming is supported for many operations including:
filter, slice, head, tail
with_columns, select
group_by
join
unique
sort
explode, unpivot
scan_csv, scan_parquet, scan_ipc
This list is not exhaustive. Polars is in active development, and more operations can be added without explicit notice.
Example with supported operations
To determine which parts of your query are streaming, use the explain method. The example below shows the plan printed by print(q1.explain(streaming=True)) for the query defined earlier; the nodes listed under STREAMING run in streaming mode. More information about the query plan can be found in the chapter on the Lazy API.
STREAMING:
  AGGREGATE
    [col("sepal_width").mean()] BY [col("species")] FROM
    simple π 3/3 ["sepal_width", "species", ... 1 other column]
      Csv SCAN [docs/assets/data/iris.csv]
      PROJECT 3/5 COLUMNS
      SELECTION: [(col("sepal_length")) > (5.0)]
Example with non-streaming operations
q2 = pl.scan_csv("docs/assets/data/iris.csv").with_columns(
    pl.col("sepal_length").mean().over("species")
)
print(q2.explain(streaming=True))
let q2 = LazyCsvReader::new("docs/assets/data/iris.csv")
    .finish()?
    .with_columns(vec![col("sepal_length")
        .mean()
        .over(vec![col("species")])
        .alias("sepal_length_mean")]);
let query_plan = q2.with_streaming(true).explain(true)?;
println!("{}", query_plan);
WITH_COLUMNS:
 [col("sepal_length").mean().over([col("species")])]
  STREAMING:
    Csv SCAN [docs/assets/data/iris.csv]
    PROJECT */5 COLUMNS
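The two plans in this section differ in where the STREAMING node sits: at the root in the first example, and below WITH_COLUMNS in the second, where only the CSV scan streams. The sketch below checks this on the plan text. It assumes the plan layout shown on this page, which may change between Polars versions; classify_plan is a hypothetical helper, not part of the Polars API, and the plan strings are copied from the examples above.

```python
def classify_plan(plan: str) -> str:
    """Return 'full', 'partial', or 'none' for a plan string.

    Assumption: a streaming subtree is introduced by a line that reads
    exactly "STREAMING:", as in the plans shown in this section.
    'full' means that line is the root of the plan.
    """
    lines = [line.strip() for line in plan.splitlines() if line.strip()]
    if "STREAMING:" not in lines:
        return "none"
    return "full" if lines[0] == "STREAMING:" else "partial"


# Plan text copied from the first example (supported operations).
PLAN_Q1 = '''STREAMING:
  AGGREGATE
    [col("sepal_width").mean()] BY [col("species")] FROM
    simple π 3/3 ["sepal_width", "species", ... 1 other column]
      Csv SCAN [docs/assets/data/iris.csv]
      PROJECT 3/5 COLUMNS
      SELECTION: [(col("sepal_length")) > (5.0)]
'''

# Plan text copied from the second example (window function via over).
PLAN_Q2 = '''WITH_COLUMNS:
 [col("sepal_length").mean().over([col("species")])]
  STREAMING:
    Csv SCAN [docs/assets/data/iris.csv]
    PROJECT */5 COLUMNS
'''

q1_kind = classify_plan(PLAN_Q1)
q2_kind = classify_plan(PLAN_Q2)
print(q1_kind, q2_kind)
```

With Polars installed, the string returned by q1.explain(streaming=True) or q2.explain(streaming=True) could be passed to the helper in place of the copied text.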