Skip to content

Prefect

Configure Polars Cloud authentication securely within Prefect workflows using native secret management patterns. This section details how to integrate Polars Cloud service account credentials with Prefect's configuration system.

Prefect implements secure credential handling through three standard approaches:

  1. Secret manager (recommended): pull the secret secret manager of your choice and use it in your workflow (see official docs; here is AWS' as an example). One can also use the AWS-specific Secret Block (see below; docs) to interact with the AWS Secret Manager.
  2. Environment variables: load your environment variables into your running instance (container or else).
  3. Block system (docs): Prefect defined a Block framework that can be used via the CLI (prefect block register -m prefect.blocks.system) or directly in the code (from prefect.blocks.system import Secret). A secret can be created via CLI (for instance): prefect block create secret polars-cloud-client-id and retrieved from the code as Secret.load("polars-cloud-client-id").get().

Some code snippets for solutions #1 and #2 described above:

# pull secrets from the aws secret manager
def service_account_from_aws(_):
    client = boto3.client("secretsmanager")
    return {
        "client_id": client.get_secret_value(SecretId="<SECRET>").get("SecretString"),
        "client_secret": client.get_secret_value(SecretId="<SECRET>").get("SecretString"),
    }
# fetch [securely injected!] secrets from environment
@resource
def service_account_from_env(_):
    return {
        "client_id": os.getenv("POLARS_CLOUD_CLIENT_ID"),
        "client_secret": os.getenv("POLARS_CLOUD_CLIENT_SECRET"),
    }

Below a few lines of pseudo-code to define a Prefect flow:

import os
import polars as pl

from polars_cloud import ComputeContext, authenticate, set_compute_context
from prefect import flow, task

# define two compute contexts (two instance sizes)
vm_small = ComputeContext(cpus=2, memory=4)
vm_large = ComputeContext(cpus=4, memory=16)

# queries will execute on the small vm by default
set_compute_context(vm_small)

@task
def prepare_dataset_1():
    pl.scan_csv(...).remote().sink_parquet(...)

@task
def prepare_dataset_2():
    pl.scan_ndjson(...).remote().sink_parquet(...)

# use a bigger machine for this operation
@task
@set_compute_context(vm_large)
def join_datasets():
    pl.scan_parquet(...).remote().sink_parquet(...)

@flow(name="Daily report")
def report():
    # authenticate to polars cloud with the secrets created above
    authenticate(**service_account_from_env())

    prepare_dataset_1()
    prepare_dataset_2()
    join_datasets()

# run the flow
report()

# stop the instances
vm_small.stop()
vm_large.stop()