
Distributed query execution

With the introduction of Polars Cloud, we also introduced the distributed engine. This engine enables users to horizontally scale workloads across multiple machines.

Polars has always been optimized for fast and efficient performance on a single machine. However, when querying large datasets from cloud storage, performance is often constrained by the I/O limitations of a single node. By scaling horizontally across multiple nodes, these download limitations can be significantly reduced, allowing users to process data at scale.
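As a rough illustration of why horizontal scaling helps I/O-bound queries, the sketch below estimates download time under idealized assumptions: the data is split evenly across nodes, each node downloads its share at a fixed per-node bandwidth, and the object store itself is never the bottleneck. The dataset size and bandwidth figures are hypothetical, not Polars Cloud measurements.

```python
def estimated_download_seconds(dataset_gb: float, node_gbps: float, nodes: int) -> float:
    """Idealized download time when the data is split evenly across nodes.

    Hypothetical model: perfect linear scaling, every node reads
    dataset_gb / nodes gigabytes at node_gbps gigabits per second in parallel.
    """
    if nodes < 1:
        raise ValueError("nodes must be at least 1")
    gigabits_per_node = dataset_gb * 8 / nodes  # convert GB to Gb, then split evenly
    return gigabits_per_node / node_gbps


# Hypothetical figures: a 1000 GB dataset and 10 Gbit/s of bandwidth per node.
for n in (1, 8, 32):
    print(f"{n} node(s): {estimated_download_seconds(1000, 10, n):.1f} s")
```

In practice, scaling is rarely perfectly linear, so treat numbers like these as an upper bound on the achievable speedup rather than a prediction.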

The distributed engine is at an early stage

The distributed engine is at a very early stage of development. It can currently run all PDS-H benchmarks, and major performance improvements will be introduced in the near future. When an operation is not available in a distributed manner, Polars Cloud runs that operation on a single node.
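The fallback rule can be pictured as a per-operation decision. The sketch below is a toy model, not Polars Cloud's query planner: the operation names and the DISTRIBUTED_OPS set are hypothetical placeholders, and the real engine makes this decision on its own internal query plan.

```python
# Hypothetical set of operations this toy planner can distribute.
# This is NOT the actual list of operations supported by Polars Cloud.
DISTRIBUTED_OPS = {"scan_parquet", "filter", "group_by"}


def assign_execution(plan: list[str]) -> list[tuple[str, str]]:
    """Tag each step of a linear plan with where it would run.

    Steps in DISTRIBUTED_OPS run across all workers; any other step
    falls back to a single node, mirroring the rule described above.
    """
    return [
        (op, "distributed" if op in DISTRIBUTED_OPS else "single node")
        for op in plan
    ]


for op, where in assign_execution(["scan_parquet", "filter", "hypothetical_op"]):
    print(f"{op}: {where}")
```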

Using the distributed engine

To execute a query using the distributed engine, call distributed() on the remote query returned by remote(), before collecting or sinking the result.

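import polars as pl

lf: pl.LazyFrame  # any lazy query, e.g. one built with pl.scan_parquet(...)

result = (
    lf.remote()  # send the query to Polars Cloud
    .distributed()  # execute it with the distributed engine
    .collect()
)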

Example

import polars as pl
import polars_cloud as pc
from datetime import date

query = (
    pl.scan_parquet("s3://dataset/")
    .filter(pl.col("l_shipdate") <= date(1998, 9, 2))
    .group_by("l_returnflag", "l_linestatus")
    .agg(
        avg_price=pl.mean("l_extendedprice"),
        avg_disc=pl.mean("l_discount"),
        count_order=pl.len(),
    )
)

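# Run on a cluster of 32 machines with 16 CPUs and 64 GB of memory each,
# and write the result directly to cloud storage
result = (
    query.remote(pc.ComputeContext(cpus=16, memory=64, cluster_size=32))
    .distributed()
    .sink_parquet("s3://output/result.parquet")
)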
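A grouped mean like the one in the example can be split across machines because it decomposes into per-group sums and counts that merge exactly. The sketch below shows this general two-phase (partial, then final) aggregation technique in plain Python. The in-memory worker partitions are hypothetical stand-ins for the files each worker would read, and the code illustrates the idea rather than the distributed engine's actual implementation.

```python
from collections import defaultdict

# (l_returnflag, l_linestatus, l_extendedprice), mirroring columns in the example
Row = tuple[str, str, float]


def partial_agg(rows: list[Row]) -> dict[tuple[str, str], tuple[float, int]]:
    """Phase 1, run independently on each worker: per-group (sum, count)."""
    acc = defaultdict(lambda: [0.0, 0])
    for flag, status, price in rows:
        acc[(flag, status)][0] += price
        acc[(flag, status)][1] += 1
    return {key: (s, c) for key, (s, c) in acc.items()}


def final_agg(partials: list[dict[tuple[str, str], tuple[float, int]]]) -> dict:
    """Phase 2, run on one node: merge sums and counts, then divide."""
    totals = defaultdict(lambda: [0.0, 0])
    for part in partials:
        for key, (s, c) in part.items():
            totals[key][0] += s
            totals[key][1] += c
    return {
        key: {"avg_price": s / c, "count_order": c}
        for key, (s, c) in totals.items()
    }


# Hypothetical partitions, one per worker.
worker_1 = [("A", "F", 10.0), ("N", "O", 4.0)]
worker_2 = [("A", "F", 20.0), ("A", "F", 30.0)]

result = final_agg([partial_agg(worker_1), partial_agg(worker_2)])
print(result)
```

The partial state carries sums and counts rather than means: averaging the per-worker means for group ("A", "F") would give (10.0 + 25.0) / 2 = 17.5 instead of the correct 20.0.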