# Airflow
Execute Polars Cloud queries remotely from Airflow workflows while keeping credentials secure. This section explains how to configure Airflow to submit and monitor Polars Cloud workloads using Airflow's built-in security mechanisms, keeping service account credentials isolated from DAG code while retaining full control over the workflow.
There are several ways to make the service account credentials available to Airflow:

1. Secret manager (docs): the Airflow-recommended way to handle secrets. It involves setting up a secrets backend (many providers are maintained by the community) in `airflow.cfg` and letting Airflow workers pull the given secrets via the `airflow.models.Variable` API as `Variable.get("<SECRET NAME>")`; see the configuration sketch after this list. Note that Airflow will store the secret in its own metastore; if this is not desirable, interacting with the cloud provider's secret manager (or any other vault accessible via API) can simply be performed as a task of your DAG; see the relevant official docs (here is AWS' as an example).
2. Environment variables (docs): load your environment variables into your containers after prefixing them with `AIRFLOW_VAR_`, for instance `AIRFLOW_VAR_POLARS_CLOUD_CLIENT_ID` and `AIRFLOW_VAR_POLARS_CLOUD_CLIENT_SECRET`. They will then be available through the `airflow.models.Variable` API as `Variable.get("POLARS_CLOUD_CLIENT_ID")`.
3. Airflow Variables (docs): in the Airflow UI, under Admin > Variables, one can add/edit key: value pairs provided to Airflow, which will make them accessible through the `airflow.models.Variable` API. Note these objects can also be defined using the Airflow CLI (if accessible): `airflow variables set POLARS_CLOUD_CLIENT_ID "<SECRET>"`.
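For solution #1, a minimal `airflow.cfg` sketch, assuming the AWS Secrets Manager backend shipped with the `apache-airflow-providers-amazon` package (the prefixes below are illustrative; adjust the backend class and kwargs to your provider):

```ini
[secrets]
backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
backend_kwargs = {"variables_prefix": "airflow/variables", "connections_prefix": "airflow/connections"}
```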
Some code snippets for solutions #1 and #2 described above:
```python
import boto3
from airflow.models import Variable
from airflow.sdk import task


# pull secrets from the AWS Secrets Manager as a regular task of the DAG
@task()
def service_account_from_aws():
    client = boto3.client("secretsmanager")
    # each credential is stored under its own secret name
    Variable.set(
        "client_id",
        client.get_secret_value(SecretId="<SECRET NAME>").get("SecretString"),
    )
    Variable.set(
        "client_secret",
        client.get_secret_value(SecretId="<SECRET NAME>").get("SecretString"),
    )
```
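If both credentials are kept in a single secret stored as a JSON document, the `SecretString` must be parsed first. A minimal sketch, assuming a hypothetical secret named `polars-cloud/service-account` holding both keys:

```python
import json

import boto3
from airflow.models import Variable
from airflow.sdk import task


@task()
def service_account_from_aws_json():
    # hypothetical secret storing {"client_id": "...", "client_secret": "..."}
    client = boto3.client("secretsmanager")
    secret = json.loads(
        client.get_secret_value(SecretId="polars-cloud/service-account")["SecretString"]
    )
    Variable.set("client_id", secret["client_id"])
    Variable.set("client_secret", secret["client_secret"])
```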
```python
import os

from airflow.models import Variable
from airflow.sdk import task


# fetch [securely injected!] secrets from the environment
@task()
def service_account_from_env():
    Variable.set("client_id", os.getenv("POLARS_CLOUD_CLIENT_ID"))
    Variable.set("client_secret", os.getenv("POLARS_CLOUD_CLIENT_SECRET"))
```
Below are a few lines of pseudo-code using Airflow's TaskFlow API:
```python
import polars as pl
from airflow.models import Variable
from airflow.sdk import dag, task
from polars_cloud import ComputeContext, authenticate, set_compute_context

# define two compute contexts (two instance sizes)
vm_small = ComputeContext(cpus=2, memory=4)
vm_large = ComputeContext(cpus=4, memory=16)

# queries will execute on the small vm by default
set_compute_context(vm_small)


@dag(...)
def taskflow():
    @task()
    def prepare_dataset_1():
        pl.scan_csv(...).remote().sink_parquet(...)

    @task()
    def prepare_dataset_2():
        pl.scan_ndjson(...).remote().sink_parquet(...)

    # use a bigger machine for this operation
    @task()
    @set_compute_context(vm_large)
    def join_datasets():
        pl.scan_parquet(...).remote().sink_parquet(...)

    # authenticate to polars cloud with the secrets created above
    authenticate(
        client_id=Variable.get("client_id"),
        client_secret=Variable.get("client_secret"),
    )

    # run the join only once both datasets have been prepared
    [prepare_dataset_1(), prepare_dataset_2()] >> join_datasets()


taskflow()

# stop the instances
vm_small.stop()
vm_large.stop()
```
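To combine this DAG with solution #1 or #2, the credential-fetching task can be wired upstream of the queries. A minimal sketch reusing the `service_account_from_env()` task defined earlier; calling `authenticate` inside a task (rather than at module level) ensures it runs on the worker instead of at DAG-parse time:

```python
@dag(...)
def taskflow_with_credentials():
    credentials = service_account_from_env()

    @task()
    def prepare_dataset():
        # authenticate on the worker before running the remote query
        authenticate(
            client_id=Variable.get("client_id"),
            client_secret=Variable.get("client_secret"),
        )
        pl.scan_csv(...).remote().sink_parquet(...)

    # fetch the credentials before any query runs
    credentials >> prepare_dataset()
```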