Streaming API
One additional benefit of the lazy API is that it allows queries to be executed in a streaming manner. Instead of processing the data all at once, Polars can execute the query in batches, allowing you to process datasets that are larger than memory.
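To make the idea of batch-wise execution concrete, here is a minimal conceptual sketch in plain Python. It is not how Polars implements streaming; the helper names `iter_batches` and `streaming_group_mean`, the batch size, and the sample values are all hypothetical. It only illustrates why a grouped mean needs no more memory than one batch plus a running total per group.

```python
import csv
import io
from collections import defaultdict
from itertools import islice


def iter_batches(rows, batch_size):
    """Yield lists of at most `batch_size` rows from any row iterator."""
    rows = iter(rows)
    while batch := list(islice(rows, batch_size)):
        yield batch


def streaming_group_mean(rows, key, value, batch_size=2):
    """Mean of `value` per `key`, keeping only one batch and running totals."""
    sums = defaultdict(float)
    counts = defaultdict(int)
    for batch in iter_batches(rows, batch_size):
        # Only this batch is held in memory; earlier batches are already
        # folded into the per-group running sums and counts.
        for row in batch:
            sums[row[key]] += float(row[value])
            counts[row[key]] += 1
    return {group: sums[group] / counts[group] for group in sums}


# Tiny in-memory stand-in for a CSV file on disk (made-up values).
SAMPLE = io.StringIO(
    "species,sepal_width\n"
    "setosa,3.0\n"
    "setosa,4.0\n"
    "virginica,2.0\n"
    "virginica,3.0\n"
    "virginica,4.0\n"
)

means = streaming_group_mean(csv.DictReader(SAMPLE), "species", "sepal_width")
print(means)
```

The same reasoning hints at why some operations are harder to stream: an aggregation can be folded into running totals, whereas an operation that needs every row of a group at once cannot be reduced this way.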
To tell Polars we want to execute a query in streaming mode, we pass the streaming=True argument to collect:
import polars as pl

q1 = (
    pl.scan_csv("docs/data/iris.csv")
    .filter(pl.col("sepal_length") > 5)
    .group_by("species")
    .agg(pl.col("sepal_width").mean())
)
df = q1.collect(streaming=True)
collect · Available on feature streaming
let q1 = LazyCsvReader::new("docs/data/iris.csv")
    .with_has_header(true)
    .finish()?
    .filter(col("sepal_length").gt(lit(5)))
    .group_by(vec![col("species")])
    .agg([col("sepal_width").mean()]);
let df = q1.clone().with_streaming(true).collect()?;
println!("{}", df);
When is streaming available?
Streaming is still in development. We can ask Polars to execute any lazy query in streaming mode, but not all lazy operations support streaming. If there is an operation for which streaming is not supported, Polars will run the query in non-streaming mode.
Streaming is supported for many operations, including:
filter, slice, head, tail
with_columns, select
group_by
join
unique
sort
explode, unpivot
scan_csv, scan_parquet, scan_ipc
This list is not exhaustive. Polars is in active development, and more operations can be added without explicit notice.
Example with supported operations
To determine which parts of your query are streaming, use the explain method. Below is an example that demonstrates how to inspect the query plan. More information about the query plan can be found in the chapter on the Lazy API.
print(q1.explain(streaming=True))
STREAMING:
  AGGREGATE
    [col("sepal_width").mean()] BY [col("species")] FROM
    Csv SCAN [docs/data/iris.csv]
    PROJECT 3/5 COLUMNS
    SELECTION: [(col("sepal_length")) > (5.0)]
Example with non-streaming operations
q2 = pl.scan_csv("docs/data/iris.csv").with_columns(
    pl.col("sepal_length").mean().over("species")
)
print(q2.explain(streaming=True))
let q2 = LazyCsvReader::new("docs/data/iris.csv")
    .finish()?
    .with_columns(vec![col("sepal_length")
        .mean()
        .over(vec![col("species")])
        .alias("sepal_length_mean")]);
let query_plan = q2.with_streaming(true).explain(true)?;
println!("{}", query_plan);
WITH_COLUMNS:
  [col("sepal_length").mean().over([col("species")])]
  STREAMING:
    Csv SCAN [docs/data/iris.csv]
    PROJECT */5 COLUMNS
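The two plans above differ in where the STREAMING: marker sits: in the first it is the top line, so the whole query streams; in the second it is nested under WITH_COLUMNS, so only the scan streams. A rough way to check this programmatically is sketched below. The helper `streaming_summary` is hypothetical and not part of Polars; it assumes the plan text marks streaming sections with a line reading `STREAMING:`, as in the output shown here, and that format may change between Polars versions.

```python
def streaming_summary(plan: str) -> str:
    """Classify an explain() string as 'full', 'partial', or 'none'.

    Assumption: a streaming section is introduced by a line that reads
    'STREAMING:', as in the plans printed in this section.
    """
    lines = [line.strip() for line in plan.splitlines() if line.strip()]
    if "STREAMING:" not in lines:
        return "none"
    # If the marker is the first line, the entire plan sits under it.
    return "full" if lines[0] == "STREAMING:" else "partial"


# Plan text copied from the two examples in this section.
PLAN_Q1 = '''STREAMING:
  AGGREGATE
    [col("sepal_width").mean()] BY [col("species")] FROM
    Csv SCAN [docs/data/iris.csv]
    PROJECT 3/5 COLUMNS
    SELECTION: [(col("sepal_length")) > (5.0)]
'''

PLAN_Q2 = '''WITH_COLUMNS:
  [col("sepal_length").mean().over([col("species")])]
  STREAMING:
    Csv SCAN [docs/data/iris.csv]
    PROJECT */5 COLUMNS
'''

print(streaming_summary(PLAN_Q1))  # full
print(streaming_summary(PLAN_Q2))  # partial
```

In practice the string would come from `q.explain(streaming=True)`, as in the examples above, rather than from a hard-coded literal.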