polars.LazyFrame.sink_delta#
- LazyFrame.sink_delta(
- target: str | Path | deltalake.DeltaTable,
- *,
- mode: Literal['error', 'append', 'overwrite', 'ignore', 'merge'] = 'error',
- storage_options: StorageOptionsDict | None = None,
- credential_provider: CredentialProviderFunction | Literal['auto'] | None = 'auto',
- delta_write_options: dict[str, Any] | None = None,
- delta_merge_options: dict[str, Any] | None = None,
- optimizations: QueryOptFlags = (),
Sink DataFrame as delta table.
Warning
This functionality is considered unstable. It may be changed at any point without it being considered a breaking change.
- Parameters:
- target
URI of a table or a DeltaTable object.
- mode{‘error’, ‘append’, ‘overwrite’, ‘ignore’, ‘merge’}
How to handle existing data.
If ‘error’, throw an error if the table already exists (default).
If ‘append’, will add new data.
If ‘overwrite’, will replace table with new data.
If ‘ignore’, will not write anything if table already exists.
If ‘merge’, return a
TableMergerobject to merge data from the DataFrame with the existing data.
- storage_options
Extra options for the storage backends supported by
deltalake. For cloud storages, this may include configurations for authentication etc.- credential_provider
Provide a function that can be called to provide cloud storage credentials. The function is expected to return a dictionary of credential keys along with an optional credential expiry time.
Warning
This functionality is considered unstable. It may be changed at any point without it being considered a breaking change.
- delta_write_options
Additional keyword arguments while writing a Delta lake Table. See a list of supported write options here.
- delta_merge_options
Keyword arguments which are required to
MERGEa Delta lake Table. See a list of supported merge options here.- engine
Select the engine used to process the query, optional. At the moment, if set to
"auto"(default), the query is run using the polars streaming engine. Polars will also attempt to use the engine set by thePOLARS_ENGINE_AFFINITYenvironment variable. If it cannot run the query using the selected engine, the query is run using the polars streaming engine.- optimizations
The optimization passes done during query optimization.
Warning
This functionality is considered unstable. It may be changed at any point without it being considered a breaking change.
- Raises:
- TypeError
If the DataFrame contains unsupported data types.
- ArrowInvalidError
If the DataFrame contains data types that could not be cast to their primitive type.
- TableNotFoundError
If the delta table doesn’t exist and MERGE action is triggered
Notes
The Polars data types
NullandTimeare not supported by the delta protocol specification and will raise a TypeError. Columns using TheCategoricaldata type will be converted to normal (non-categorical) strings when written.Polars columns are always nullable. To write data to a delta table with non-nullable columns, a custom pyarrow schema has to be passed to the
delta_write_options. See the last example below.Examples
Sink a large than fits into memory dataset to a Delta Lake table.
>>> lf = pl.scan_parquet( ... "/path/to/my_larger_than_ram_file.parquet" ... ) >>> table_path = "/path/to/delta-table/" >>> lf.sink_delta(table_path)
Sink a dataframe to the local filesystem as a Delta Lake table.
>>> df = pl.DataFrame( ... { ... "foo": [1, 2, 3, 4, 5], ... "bar": [6, 7, 8, 9, 10], ... "ham": ["a", "b", "c", "d", "e"], ... } ... ) >>> table_path = "/path/to/delta-table/" >>> df.lazy().sink_delta(table_path)
Append data to an existing Delta Lake table on the local filesystem. Note that this will fail if the schema of the new data does not match the schema of the existing table.
>>> df.lazy().sink_delta(table_path, mode="append")
Overwrite a Delta Lake table as a new version. If the schemas of the new and old data are the same, specifying the
schema_modeis not required.>>> existing_table_path = "/path/to/delta-table/" >>> df.lazy().sink_delta( ... existing_table_path, ... mode="overwrite", ... delta_write_options={"schema_mode": "overwrite"}, ... )
Sink a DataFrame as a Delta Lake table to a cloud object store like S3.
>>> table_path = "s3://bucket/prefix/to/delta-table/" >>> df.lazy().sink_delta( ... table_path, ... storage_options={ ... "AWS_REGION": "THE_AWS_REGION", ... "AWS_ACCESS_KEY_ID": "THE_AWS_ACCESS_KEY_ID", ... "AWS_SECRET_ACCESS_KEY": "THE_AWS_SECRET_ACCESS_KEY", ... }, ... )
Sink DataFrame as a Delta Lake table with non-nullable columns.
>>> import pyarrow as pa >>> existing_table_path = "/path/to/delta-table/" >>> df.lazy().sink_delta( ... existing_table_path, ... delta_write_options={ ... "schema": pa.schema([pa.field("foo", pa.int64(), nullable=False)]) ... }, ... )
Sink DataFrame as a Delta Lake table with zstd compression. For all
delta_write_optionskeyword arguments, check the deltalake docs here, and for Writer Properties in particular here.>>> import deltalake >>> df.lazy().sink_delta( ... table_path, ... delta_write_options={ ... "writer_properties": deltalake.WriterProperties(compression="zstd"), ... }, ... )
Merge the DataFrame with an existing Delta Lake table. For all
TableMergermethods, check the deltalake docs here.>>> df = pl.DataFrame( ... { ... "foo": [1, 2, 3, 4, 5], ... "bar": [6, 7, 8, 9, 10], ... "ham": ["a", "b", "c", "d", "e"], ... } ... ) >>> table_path = "/path/to/delta-table/" >>> ( ... df.lazy() ... .sink_delta( ... "table_path", ... mode="merge", ... delta_merge_options={ ... "predicate": "s.foo = t.foo", ... "source_alias": "s", ... "target_alias": "t", ... }, ... ) ... .when_matched_update_all() ... .when_not_matched_insert_all() ... .execute() ... )