
Streaming

One additional benefit of the lazy API is that it allows queries to be executed in a streaming manner. Instead of processing all the data at once, Polars can execute the query in batches, which allows you to process datasets that do not fit in memory.
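The batching idea can be illustrated without Polars. The following is a conceptual sketch in plain Python, not Polars's implementation. It computes the same result as the q1 query below (mean sepal_width per species for rows with sepal_length > 5). It does this by reading rows in fixed-size batches, reducing each batch to partial (sum, count) pairs, and merging those into running totals. The helper names (batches, partial_agg, streaming_group_mean) and the sample rows are made up for illustration.

```python
import csv
import io
from collections import defaultdict
from itertools import islice

# Made-up rows in the same column layout as iris.csv (illustration only).
SAMPLE_CSV = """sepal_length,sepal_width,petal_length,petal_width,species
5.1,3.5,1.4,0.2,Setosa
4.9,3.0,1.4,0.2,Setosa
5.4,3.9,1.7,0.4,Setosa
7.0,3.2,4.7,1.4,Versicolor
6.4,3.2,4.5,1.5,Versicolor
"""


def batches(rows, size):
    """Yield successive lists of at most `size` items from an iterable."""
    rows = iter(rows)
    while True:
        batch = list(islice(rows, size))
        if not batch:
            return
        yield batch


def partial_agg(batch):
    """Filter one batch and return per-species [sum, count] of sepal_width."""
    part = defaultdict(lambda: [0.0, 0])
    for row in batch:
        if float(row["sepal_length"]) > 5:  # the filter(sepal_length > 5) step
            acc = part[row["species"]]
            acc[0] += float(row["sepal_width"])
            acc[1] += 1
    return part


def streaming_group_mean(lines, batch_size=2):
    """Mean sepal_width by species over rows with sepal_length > 5.

    At most one batch of parsed rows plus a running [sum, count] per
    species is held at a time; the rows are never all materialized.
    """
    totals = defaultdict(lambda: [0.0, 0])
    for batch in batches(csv.DictReader(lines), batch_size):
        # Merge this batch's partial aggregates into the running totals.
        for species, (s, n) in partial_agg(batch).items():
            totals[species][0] += s
            totals[species][1] += n
    return {species: s / n for species, (s, n) in totals.items()}


print(streaming_group_mean(io.StringIO(SAMPLE_CSV)))
```

Passing an open file object instead of io.StringIO works the same way, because csv.DictReader pulls lines from it lazily. The sketch relies on the mean being decomposable into a sum and a count, so partial results from separate batches combine exactly regardless of the batch size.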

To tell Polars that we want to execute a query in streaming mode, we pass the streaming=True argument to collect. In Rust, the equivalent is to call with_streaming(true) on the LazyFrame before collect():

Python:

import polars as pl

q1 = (
    pl.scan_csv("docs/assets/data/iris.csv")
    .filter(pl.col("sepal_length") > 5)
    .group_by("species")
    .agg(pl.col("sepal_width").mean())
)
df = q1.collect(streaming=True)

Rust (requires the streaming feature):

let q1 = LazyCsvReader::new("docs/assets/data/iris.csv")
    .with_has_header(true)
    .finish()?
    .filter(col("sepal_length").gt(lit(5)))
    .group_by(vec![col("species")])
    .agg([col("sepal_width").mean()]);

let df = q1.clone().with_streaming(true).collect()?;
println!("{}", df);

When is streaming available?

Streaming is still under development. Any lazy query can be run with streaming enabled, but not every lazy operation supports streaming. When a query contains an operation that cannot stream, Polars runs that part of the query in non-streaming mode, while the parts of the plan that support streaming still stream (the second example below shows this).

Streaming is supported for many operations, including:

  • filter, slice, head, tail
  • with_columns, select
  • group_by
  • join
  • unique
  • sort
  • explode, unpivot
  • scan_csv, scan_parquet, scan_ipc

This list is not exhaustive. Polars is in active development, and support for more operations may be added without explicit notice.

Example with supported operations

To determine which parts of your query are streaming, use the explain method. The example below prints the query plan of q1; the parts of the plan that run in streaming mode are nested under a STREAMING: node. More information about the query plan can be found in the chapter on the Lazy API.

Python:

print(q1.explain(streaming=True))

Rust:

let query_plan = q1.with_streaming(true).explain(true)?;
println!("{}", query_plan);

Output:

STREAMING:
  AGGREGATE
    [col("sepal_width").mean()] BY [col("species")] FROM
    simple π 3/3 ["sepal_width", "species", ... 1 other column]
      Csv SCAN [docs/assets/data/iris.csv]
      PROJECT 3/5 COLUMNS
      SELECTION: [(col("sepal_length")) > (5.0)]

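Example with non-streaming operations

The window expression mean().over("species") used below does not support streaming. In the resulting plan, the STREAMING node wraps only the CSV scan, while the WITH_COLUMNS node that evaluates the window expression sits above it and runs in non-streaming mode.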

Python:

q2 = pl.scan_csv("docs/assets/data/iris.csv").with_columns(
    pl.col("sepal_length").mean().over("species")
)

print(q2.explain(streaming=True))

Rust:

let q2 = LazyCsvReader::new("docs/assets/data/iris.csv")
    .finish()?
    .with_columns(vec![col("sepal_length")
        .mean()
        .over(vec![col("species")])
        .alias("sepal_length_mean")]);

let query_plan = q2.with_streaming(true).explain(true)?;
println!("{}", query_plan);

Output:

 WITH_COLUMNS:
 [col("sepal_length").mean().over([col("species")])] 
  STREAMING:
    Csv SCAN [docs/assets/data/iris.csv]
    PROJECT */5 COLUMNS
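For larger queries it can help to check the plan programmatically. Below is a minimal sketch of a hypothetical helper, plan_streaming_status, that classifies the text returned by explain(streaming=True) as fully, partially, or not streaming. It assumes the plan format shown in the two examples above, where streamable subplans are nested under a STREAMING: node. That text format is not a stable API and may differ between Polars versions. The two plans are copied from the outputs above so the sketch runs without Polars installed.

```python
# Plan text copied from the two explain() outputs above.
Q1_PLAN = """STREAMING:
  AGGREGATE
    [col("sepal_width").mean()] BY [col("species")] FROM
    simple π 3/3 ["sepal_width", "species", ... 1 other column]
      Csv SCAN [docs/assets/data/iris.csv]
      PROJECT 3/5 COLUMNS
      SELECTION: [(col("sepal_length")) > (5.0)]
"""

Q2_PLAN = """ WITH_COLUMNS:
 [col("sepal_length").mean().over([col("species")])]
  STREAMING:
    Csv SCAN [docs/assets/data/iris.csv]
    PROJECT */5 COLUMNS
"""


def plan_streaming_status(plan: str) -> str:
    """Return 'full', 'partial', or 'none' for an explain() plan string.

    Hypothetical helper: assumes streamable parts of the plan are wrapped
    in a node printed as 'STREAMING:' (the format shown above).
    """
    nodes = [line.strip() for line in plan.splitlines() if line.strip()]
    if not nodes:
        return "none"
    if nodes[0] == "STREAMING:":
        # The root of the plan is a streaming node: the whole query streams.
        return "full"
    if "STREAMING:" in nodes:
        # Some subplan streams, but the root runs in non-streaming mode.
        return "partial"
    return "none"


print(plan_streaming_status(Q1_PLAN))  # full
print(plan_streaming_status(Q2_PLAN))  # partial
```

With Polars installed, the helper would be called on a live plan as plan_streaming_status(q2.explain(streaming=True)), since explain returns the plan as a string.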