Dagster

Integrate Polars Cloud authentication and per-asset compute sizing into a Dagster pipeline using typed ConfigurableResource subclasses.

Credentials are referenced through EnvVar so the raw values are resolved at run launch and never displayed in the Dagster UI. Populate POLARS_CLOUD_CLIENT_ID and POLARS_CLOUD_CLIENT_SECRET in the Dagster deployment environment, either directly or by exporting them from a secret manager (e.g. AWS Secrets Manager).
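
Because the values are only resolved at run launch, a missing variable surfaces late. As a minimal sketch, a deployment could run a check like the one below before starting Dagster. The helper name `missing_credentials` is hypothetical and is not part of Dagster or Polars Cloud; only the two variable names come from the text above.

```python
import os

# Variable names taken from the paragraph above.
REQUIRED_VARS = ("POLARS_CLOUD_CLIENT_ID", "POLARS_CLOUD_CLIENT_SECRET")


def missing_credentials(environ=None):
    """Return the required variable names that are unset or empty.

    Hypothetical helper for illustration. Only names are returned, so the
    secret values themselves never reach logs or error messages.
    """
    env = os.environ if environ is None else environ
    return [name for name in REQUIRED_VARS if not env.get(name)]
```

Calling `missing_credentials()` with no argument inspects the real process environment; passing a dictionary makes the check easy to exercise in tests.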

PolarsCloudCompute declares PolarsCloudAuth as a nested resource, so Dagster runs PolarsCloudAuth.setup_for_execution, which calls authenticate(...), before any compute resource is used. The compute resource owns the ComputeContext lifecycle through yield_for_execution: the VM is started before each asset that uses the resource runs and is stopped on exit, including on failure. Asset bodies therefore contain only Polars code.
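
The stop-on-failure guarantee comes from an ordinary context manager with a `finally` clause. The sketch below isolates that mechanism with a stand-in class, so it runs without Dagster or Polars Cloud. `FakeComputeContext`, `managed_compute`, and `run_failing_asset` are hypothetical names used only for illustration.

```python
from contextlib import contextmanager


class FakeComputeContext:
    """Stand-in for a VM handle; records lifecycle calls instead of provisioning."""

    def __init__(self, events):
        self.events = events

    def start(self):
        self.events.append("start")

    def stop(self):
        self.events.append("stop")


@contextmanager
def managed_compute(events):
    """Start the fake context, hand it to the caller, and always stop it."""
    ctx = FakeComputeContext(events)
    ctx.start()
    try:
        yield ctx
    finally:
        # Runs on normal exit and when the body of the `with` block raises.
        ctx.stop()


def run_failing_asset(events):
    """Simulate an asset body that raises while the compute context is active."""
    try:
        with managed_compute(events):
            events.append("asset body")
            raise RuntimeError("asset failed")
    except RuntimeError:
        events.append("error surfaced")
    return events
```

Running `run_failing_asset([])` records the stop call before the error reaches the caller, which is the ordering the resource relies on to release the VM when an asset raises.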

Per-asset compute sizing falls out of the same pattern: declare one PolarsCloudCompute instance per VM shape under distinct resource keys, and each asset takes the instance sized for it. Dagster manages the lifecycle of each instance independently, so the asset that uses the larger VM does not pay the cost of spinning it up while the smaller-VM assets are running.

from contextlib import contextmanager

import polars as pl
import polars_cloud as pc
from dagster import asset, ConfigurableResource, Definitions, EnvVar


class PolarsCloudAuth(ConfigurableResource):
    """Service account authentication for Polars Cloud."""

    client_id: str
    client_secret: str

    def setup_for_execution(self, context) -> None:
        pc.authenticate(client_id=self.client_id, client_secret=self.client_secret)


class PolarsCloudCompute(ConfigurableResource):
    """Polars Cloud compute context with auto-managed VM lifecycle.

    Declares `PolarsCloudAuth` as a nested resource so Dagster runs `authenticate(...)`
    before this resource is used. `yield_for_execution(...)` starts a `ComputeContext`
    of the configured shape and the `finally` clause guarantees the VM is released even
    when an asset raises.
    """

    auth: PolarsCloudAuth
    cpus: int
    memory: int

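    @contextmanager
    def yield_for_execution(self, context):
        # Start a VM of the configured shape before the asset body runs.
        ctx = pc.ComputeContext(cpus=self.cpus, memory=self.memory)
        ctx.start()
        try:
            # Make this context the default for subsequent remote queries.
            pc.set_compute_context(ctx)
            yield self
        finally:
            # Always release the VM, even when the asset raises.
            ctx.stop()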


@asset
def dataset_1(small_vm: PolarsCloudCompute):
    pl.scan_csv(...).remote().sink_parquet(...)


@asset
def dataset_2(small_vm: PolarsCloudCompute):
    pl.scan_ndjson(...).remote().sink_parquet(...)


# Use a bigger machine for the join.
@asset(deps=[dataset_1, dataset_2])
def joined(large_vm: PolarsCloudCompute):
    pl.scan_parquet(...).remote().sink_parquet(...)


# Share a single `PolarsCloudAuth` instance between the compute resources so the
# credentials are loaded once per run.
auth = PolarsCloudAuth(
    client_id=EnvVar("POLARS_CLOUD_CLIENT_ID"),
    client_secret=EnvVar("POLARS_CLOUD_CLIENT_SECRET"),
)

defs = Definitions(
    assets=[dataset_1, dataset_2, joined],
    resources={
        "small_vm": PolarsCloudCompute(auth=auth, cpus=2, memory=4),
        "large_vm": PolarsCloudCompute(auth=auth, cpus=4, memory=16),
    },
)

The Dagster resource key (the dictionary key in Definitions.resources) must match the parameter name in each asset signature. The type annotation is for editor and type-checker support; Dagster injects by key, not by type.
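
Since injection is by key, a renamed parameter is easy to miss: the type annotation still looks correct. The following sketch shows one way to compare parameter names against resource keys using only the standard library. `unmatched_parameters` is a hypothetical helper and not a Dagster API, and it assumes every parameter of the function is meant to be a resource (so it would also flag a `context` parameter).

```python
import inspect


def unmatched_parameters(fn, resource_keys):
    """Return parameter names of `fn` that have no matching resource key.

    Hypothetical helper for illustration; assumes every parameter is a resource.
    """
    params = inspect.signature(fn).parameters
    return [name for name in params if name not in resource_keys]


# Plain functions standing in for asset bodies (no Dagster decorator applied).
def dataset_1(small_vm):
    ...


def joined(big_vm):  # Deliberate mismatch: the configured key is "large_vm".
    ...


resource_keys = {"small_vm", "large_vm"}
```

Here `unmatched_parameters(dataset_1, resource_keys)` returns an empty list, while `unmatched_parameters(joined, resource_keys)` returns `["big_vm"]`, pointing at the parameter that would need to be renamed to `large_vm`.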