Distributed queries

With the introduction of Polars Cloud, we also introduced the distributed engine. This engine enables users to horizontally scale workloads across multiple machines.

Polars has always been optimized for fast and efficient performance on a single machine. However, when querying large datasets from cloud storage, performance is often constrained by the I/O limitations of a single node. By scaling horizontally, these download limitations can be significantly reduced, allowing users to process data at scale.

The distributed engine is at an early stage

The distributed engine is in alpha and some operations are not supported yet.

Find out which operations are currently supported in the distributed engine.

Using the distributed engine

To execute a query with the distributed engine, call the distributed() method on the remote query before collecting or sinking the result.

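import polars as pl

lf: pl.LazyFrame  # placeholder: any existing LazyFrame

result = (
    lf.remote()       # run on Polars Cloud instead of locally
    .distributed()    # use the distributed engine
    .collect()
)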

Example

import polars as pl
import polars_cloud as pc
from datetime import date

query = (
    pl.scan_parquet("s3://dataset/")
    .filter(pl.col("l_shipdate") <= date(1998, 9, 2))
    .group_by("l_returnflag", "l_linestatus")
    .agg(
        avg_price=pl.mean("l_extendedprice"),
        avg_disc=pl.mean("l_discount"),
        count_order=pl.len(),
    )
)

result = (
    query.remote(pc.ComputeContext(cpus=16, memory=64, cluster_size=32))
    .distributed()
    .sink_parquet("s3://output/result.parquet")
)

Working with large datasets in the distributed engine

The distributed engine can only partition reads across nodes for sources loaded with direct scan_ methods such as scan_parquet and scan_csv. Open table formats read through methods like scan_iceberg are not yet supported in a distributed fashion; queries that use them run on a single node.