Cloud storage
Polars can read from and write to AWS S3, Azure Blob Storage and Google Cloud Storage. The API is the same for all three storage providers.
To read from cloud storage, additional dependencies may be needed depending on the use case and cloud storage provider:
$ pip install fsspec s3fs adlfs gcsfs
$ cargo add aws_sdk_s3 aws_config tokio --features tokio/full
Reading from cloud storage
Polars supports reading Parquet, CSV, IPC and NDJSON files from cloud storage:
read_parquet · read_csv · read_ipc
import polars as pl
source = "s3://bucket/*.parquet"
df = pl.read_parquet(source)
ParquetReader · CsvReader · IpcReader
Available on features: csv · ipc · parquet
use aws_config::BehaviorVersion;
use polars::prelude::*;

#[tokio::main]
async fn main() {
    let bucket = "<YOUR_BUCKET>";
    let path = "<YOUR_PATH>";

    let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
    let client = aws_sdk_s3::Client::new(&config);

    let object = client
        .get_object()
        .bucket(bucket)
        .key(path)
        .send()
        .await
        .unwrap();

    let bytes = object.body.collect().await.unwrap().into_bytes();
    let cursor = std::io::Cursor::new(bytes);

    let df = CsvReader::new(cursor).finish().unwrap();
    println!("{:?}", df);
}
Scanning from cloud storage with query optimisation
Using pl.scan_* functions to read from cloud storage can benefit from predicate and projection pushdowns, where the query optimizer applies them before the file is downloaded. This can significantly reduce the amount of data that needs to be downloaded. Query evaluation is triggered by calling collect.
import polars as pl
source = "s3://bucket/*.parquet"
df = pl.scan_parquet(source).filter(pl.col("id") < 100).select("id","value").collect()
Cloud authentication
Polars is able to automatically load default credential configurations for some cloud providers. For cases when this does not happen, it is possible to manually configure the credentials for Polars to use for authentication. This can be done in a few ways:
Using storage_options:
- Credentials can be passed as configuration keys in a dict with the storage_options parameter:
import polars as pl
source = "s3://bucket/*.parquet"
storage_options = {
    "aws_access_key_id": "<secret>",
    "aws_secret_access_key": "<secret>",
    "aws_region": "us-east-1",
}
df = pl.scan_parquet(source, storage_options=storage_options).collect()
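To avoid hardcoding secrets in source code, the storage_options dict can be assembled at runtime. The sketch below is one possible approach, not part of the Polars API: the helper name storage_options_from_env is hypothetical, and it assumes the credentials are exported under the environment variable names AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_REGION.

```python
import os


def storage_options_from_env() -> dict[str, str]:
    """Build a storage_options dict from environment variables.

    Assumption: credentials are exported under the AWS-style variable
    names listed in `mapping`; adjust them to match your environment.
    """
    # Keys are the configuration keys used above; values are env var names.
    mapping = {
        "aws_access_key_id": "AWS_ACCESS_KEY_ID",
        "aws_secret_access_key": "AWS_SECRET_ACCESS_KEY",
        "aws_region": "AWS_REGION",
    }
    # Fail early with a clear message instead of passing partial credentials.
    missing = [env for env in mapping.values() if env not in os.environ]
    if missing:
        raise KeyError(f"missing environment variables: {', '.join(missing)}")
    return {key: os.environ[env] for key, env in mapping.items()}
```

The result can then be passed in the same way as the literal dict, for example pl.scan_parquet(source, storage_options=storage_options_from_env()).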
Using one of the available CredentialProvider* utility classes
- There may be a utility class pl.CredentialProvider* that provides the required authentication functionality. For example, pl.CredentialProviderAWS supports selecting AWS profiles, as well as assuming an IAM role:
lf = pl.scan_parquet(
    "s3://.../...",
    credential_provider=pl.CredentialProviderAWS(
        profile_name="...",
        assume_role={
            "RoleArn": f"...",
            "RoleSessionName": "...",
        }
    ),
)
df = lf.collect()
Using a custom credential_provider function
- Some environments may require custom authentication logic (e.g. AWS IAM role-chaining). For these cases, a Python function can be provided for Polars to use to retrieve credentials:
def get_credentials() -> pl.CredentialProviderFunctionReturn:
    expiry = None

    return {
        "aws_access_key_id": "...",
        "aws_secret_access_key": "...",
        "aws_session_token": "...",
    }, expiry

lf = pl.scan_parquet(
    "s3://.../...",
    credential_provider=get_credentials,
)
df = lf.collect()
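When the credentials are temporary, the function can return an expiry instead of None. The following is a minimal sketch under two assumptions: that the expiry is given as an integer Unix timestamp in seconds (check the definition of pl.CredentialProviderFunctionReturn in your Polars version), and that the credentials are valid for one hour. The helper fetch_temporary_credentials is hypothetical and stands in for whatever token service the environment uses.

```python
import time
from typing import Dict, Optional, Tuple


def fetch_temporary_credentials() -> Dict[str, str]:
    """Hypothetical stand-in for a request to a token service."""
    return {
        "aws_access_key_id": "...",
        "aws_secret_access_key": "...",
        "aws_session_token": "...",
    }


def get_credentials() -> Tuple[Dict[str, Optional[str]], Optional[int]]:
    credentials = fetch_temporary_credentials()
    # Assumption: the credentials are valid for one hour, and the expiry is
    # expressed as whole seconds since the Unix epoch.
    expiry = int(time.time()) + 3600
    return credentials, expiry
```

As with the version above, this function would be passed as credential_provider=get_credentials.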
Scanning with PyArrow
We can also scan from cloud storage using PyArrow. This is particularly useful for partitioned datasets, such as those that use Hive partitioning.
We first create a PyArrow dataset and then create a LazyFrame from the dataset.
import polars as pl
import pyarrow.dataset as ds
dset = ds.dataset("s3://my-partitioned-folder/", format="parquet")
(
    pl.scan_pyarrow_dataset(dset)
    .filter(pl.col("foo") == "a")
    .select(["foo", "bar"])
    .collect()
)
Writing to cloud storage
We can write a DataFrame to cloud storage in Python using s3fs for S3, adlfs for Azure Blob Storage and gcsfs for Google Cloud Storage. In this example, we write a Parquet file to S3.
import polars as pl
import s3fs
df = pl.DataFrame({
    "foo": ["a", "b", "c", "d", "d"],
    "bar": [1, 2, 3, 4, 5],
})

fs = s3fs.S3FileSystem()
destination = "s3://bucket/my_file.parquet"

# write parquet
with fs.open(destination, mode='wb') as f:
    df.write_parquet(f)