
Prefect

Integrate Polars Cloud authentication and per-task compute sizing into a Prefect flow using Secret blocks for credentials.

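Credentials are stored as Prefect Secret blocks so raw values are never hard-coded in flow code. Create two Secret blocks once, named polars-cloud-client-id and polars-cloud-client-secret. With the Prefect CLI, register the system block types, then run prefect block create secret. That command prints a link to the Prefect UI form where you enter each block's name and value:

prefect block register -m prefect.blocks.system
prefect block create secret

You can also save the same blocks from Python, for example Secret(value="...").save("polars-cloud-client-id").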

Authentication is performed once at the top of the flow so every task in the run shares the same session. Each task receives its ComputeContext directly via .remote(ctx), and the flow owns the VM lifecycle: start() is called before any task runs and stop() is guaranteed by the finally clause.
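You can exercise the lifecycle guarantee locally without any cloud resources. The sketch below is a toy model, not Polars Cloud or Prefect code. FakeContext is a hypothetical stand-in for pc.ComputeContext that only records start() and stop() calls, and a standard-library thread pool stands in for Prefect's task runner. The sketch uses contextlib.ExitStack instead of explicit try/finally blocks. Both give the same guarantee: every started context is stopped, in reverse order, even when a task raises. It also shows why the flow has to wait on submitted work before the contexts are stopped.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from contextlib import ExitStack, contextmanager

events = []  # ordered log of lifecycle and task events


class FakeContext:
    """Hypothetical stand-in for pc.ComputeContext; it only records calls."""

    def __init__(self, name):
        self.name = name

    def start(self):
        events.append(f"start {self.name}")

    def stop(self):
        events.append(f"stop {self.name}")


@contextmanager
def running(ctx):
    """Start ctx on entry and always stop it on exit, even after an error."""
    ctx.start()
    try:
        yield ctx
    finally:
        ctx.stop()


def fake_task(name, fail=False):
    time.sleep(0.05)  # simulate remote work
    events.append(f"done {name}")
    if fail:
        raise RuntimeError(f"{name} failed")


def fake_flow(fail_join=False):
    small, large = FakeContext("small"), FakeContext("large")
    # The pool is entered first, so it exits last; ExitStack stops the
    # contexts in reverse start order (large, then small).
    with ThreadPoolExecutor() as pool, ExitStack() as stack:
        stack.enter_context(running(small))
        stack.enter_context(running(large))
        f1 = pool.submit(fake_task, "dataset_1")
        f2 = pool.submit(fake_task, "dataset_2")
        f1.result()
        f2.result()
        # Block on the last task; without .result() the contexts would be
        # stopped while it is still running.
        pool.submit(fake_task, "joined", fail_join).result()


fake_flow()
print(events[-3:])  # ['done joined', 'stop large', 'stop small']
```

Calling fake_flow(fail_join=True) raises RuntimeError, and the log still ends with both stop events.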

dataset_1 and dataset_2 share a single instance. Prefect submits the two tasks concurrently, but Polars Cloud runs their queries on that instance one after the other. joined runs on a larger VM, which has its own start() and stop() calls.
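A toy model helps picture this scheduling. Instance is a hypothetical class, not part of polars_cloud. It serializes calls with a lock to mimic one VM working through queries one at a time. A two-worker thread pool plays the role of Prefect submitting both tasks at once. This illustrates the behavior described above and is not how Polars Cloud is implemented.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class Instance:
    """Hypothetical toy model of one compute instance: one query at a time."""

    def __init__(self):
        self._lock = threading.Lock()
        self.intervals = []  # (start, end) of each query, in execution order

    def run(self, seconds):
        with self._lock:  # a second caller blocks here until the first finishes
            start = time.monotonic()
            time.sleep(seconds)  # simulate query execution
            self.intervals.append((start, time.monotonic()))


small = Instance()
with ThreadPoolExecutor(max_workers=2) as pool:
    # Both "tasks" are submitted at once, like the two concurrent Prefect submits.
    futures = [pool.submit(small.run, 0.05) for _ in range(2)]
    for f in futures:
        f.result()

(first_start, first_end), (second_start, second_end) = small.intervals
print(f"overlap: {second_start < first_end}")  # prints "overlap: False"
```

The submissions overlap in time, but the recorded execution intervals do not. This matches the claim that the shared instance handles the two tasks sequentially.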

import polars as pl
import polars_cloud as pc
from prefect import flow, task
from prefect.blocks.system import Secret

SMALL_CTX = pc.ComputeContext(cpus=2, memory=4)
LARGE_CTX = pc.ComputeContext(cpus=4, memory=16)


@task
def dataset_1():
    pl.scan_csv(...).remote(SMALL_CTX).sink_parquet(...)


@task
def dataset_2():
    pl.scan_ndjson(...).remote(SMALL_CTX).sink_parquet(...)


# Use a bigger machine for the join.
@task
def joined():
    pl.scan_parquet(...).remote(LARGE_CTX).sink_parquet(...)


@flow(name="Report")
def report():
    pc.authenticate(
        client_id=Secret.load("polars-cloud-client-id").get(),
        client_secret=Secret.load("polars-cloud-client-secret").get(),
    )

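    # Start each VM before any task runs. The nested finally clauses stop
    # every VM that was started, even if a later start() or a task fails.
    SMALL_CTX.start()
    try:
        LARGE_CTX.start()
        try:
            f1 = dataset_1.submit()
            f2 = dataset_2.submit()
            # .submit() returns immediately, so block on the final task;
            # otherwise the VMs would be stopped while tasks are still running.
            joined.submit(wait_for=[f1, f2]).result()
        finally:
            LARGE_CTX.stop()
    finally:
        SMALL_CTX.stop()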


if __name__ == "__main__":
    report()