polars_core/schema/
iceberg.rs

//! Iceberg schema support: maps Iceberg physical field IDs (from `PARQUET:field_id` arrow
//! metadata) to column names and types.
2//!
3//! This should ideally be moved to `polars-schema`, currently it cannot due to dependency on
4//! `polars_core::DataType`.
5use std::borrow::Cow;
6use std::sync::Arc;
7
8use arrow::datatypes::{ArrowDataType, ArrowSchema, Field as ArrowField};
9use polars_error::{PolarsResult, feature_gated, polars_bail, polars_err};
10use polars_utils::aliases::InitHashMaps;
11use polars_utils::pl_str::PlSmallStr;
12
13use crate::prelude::{DataType, Field, PlIndexMap};
14
/// Maps Iceberg physical IDs to columns.
///
/// Note: This doesn't use `Schema<D>` as the keys are u32's.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct IcebergSchema(PlIndexMap<u32, IcebergColumn>);

/// Shared, immutable handle to an [`IcebergSchema`].
pub type IcebergSchemaRef = Arc<IcebergSchema>;
23
24impl IcebergSchema {
25    /// Constructs a schema keyed by the physical ID stored in the arrow field metadata.
26    pub fn from_arrow_schema(schema: &ArrowSchema) -> PolarsResult<Self> {
27        Self::try_from_arrow_fields_iter(schema.iter_values())
28    }
29
30    pub fn try_from_arrow_fields_iter<'a, I>(iter: I) -> PolarsResult<Self>
31    where
32        I: IntoIterator<Item = &'a ArrowField>,
33    {
34        let iter = iter.into_iter();
35        let size_hint = iter.size_hint();
36
37        let mut out = PlIndexMap::with_capacity(size_hint.1.unwrap_or(size_hint.0));
38
39        for arrow_field in iter {
40            let col: IcebergColumn = arrow_field_to_iceberg_column_rec(arrow_field, None)?;
41            let existing = out.insert(col.physical_id, col);
42
43            if let Some(existing) = existing {
44                polars_bail!(
45                    Duplicate:
46                    "IcebergSchema: duplicate physical ID {:?}",
47                    existing,
48                )
49            }
50        }
51
52        Ok(Self(out))
53    }
54}
55
/// A single column of an [`IcebergSchema`], identified by its physical ID.
#[derive(Debug, Clone, Eq, Hash, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct IcebergColumn {
    /// Output name
    pub name: PlSmallStr,
    /// This is expected to map from 'PARQUET:field_id'
    pub physical_id: u32,
    /// Logical type of the column (possibly nested).
    pub type_: IcebergColumnType,
}
66
/// Logical type of an [`IcebergColumn`].
#[derive(Debug, Clone, Eq, Hash, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub enum IcebergColumnType {
    /// A non-nested column.
    Primitive {
        /// This must not be a nested data type.
        dtype: DataType,
    },
    /// Variable-length list; the inner column describes the element.
    List(Box<IcebergColumn>),
    /// (values, width)
    FixedSizeList(Box<IcebergColumn>, usize),
    /// Struct whose fields are themselves keyed by physical ID.
    Struct(IcebergSchema),
}
80
81impl IcebergColumnType {
82    pub fn to_polars_dtype(&self) -> DataType {
83        use IcebergColumnType::*;
84
85        match self {
86            Primitive { dtype } => dtype.clone(),
87            List(inner) => DataType::List(Box::new(inner.type_.to_polars_dtype())),
88            FixedSizeList(inner, width) => {
89                feature_gated!("dtype-array", {
90                    DataType::Array(Box::new(inner.type_.to_polars_dtype()), *width)
91                })
92            },
93            Struct(fields) => feature_gated!("dtype-struct", {
94                DataType::Struct(
95                    fields
96                        .values()
97                        .map(|col| Field::new(col.name.clone(), col.type_.to_polars_dtype()))
98                        .collect(),
99                )
100            }),
101        }
102    }
103
104    pub fn is_nested(&self) -> bool {
105        use IcebergColumnType::*;
106
107        match self {
108            List(_) | FixedSizeList(..) | Struct(_) => true,
109            Primitive { .. } => false,
110        }
111    }
112}
113
/// Recursively converts an arrow field into an [`IcebergColumn`].
///
/// The physical ID is taken from `field_id_override` when given, otherwise it is
/// parsed from the 'PARQUET:field_id' entry of the field's metadata.
///
/// # Errors
/// - `SchemaFieldNotFound` when no physical ID can be resolved for `field`.
/// - `ComputeError` for nested arrow types not handled below.
fn arrow_field_to_iceberg_column_rec(
    field: &ArrowField,
    field_id_override: Option<u32>,
) -> PolarsResult<IcebergColumn> {
    const PARQUET_FIELD_ID_KEY: &str = "PARQUET:field_id";
    // Fallback ID for synthetic fields that carry no field ID (see the Map branch below).
    const MAP_DEFAULT_ID: u32 = u32::MAX;

    // Resolve the physical ID: prefer the override, otherwise read and parse the
    // metadata entry. The `Cow<str>` error value carries the failure reason that
    // is embedded in the final error message.
    let physical_id: u32 = field_id_override.ok_or(Cow::Borrowed("")).or_else(|_| {
        field
            .metadata
            .as_deref()
            .ok_or(Cow::Borrowed("metadata was None"))
            .and_then(|md| {
                md.get(PARQUET_FIELD_ID_KEY)
                    .ok_or(Cow::Borrowed("key not found in metadata"))
            })
            .and_then(|x| {
                x.parse()
                    .map_err(|_| Cow::Owned(format!("could not parse value as u32: '{x}'")))
            })
            .map_err(|failed_reason: Cow<'_, str>| {
                polars_err!(
                    SchemaFieldNotFound:
                    "IcebergSchema: failed to load '{PARQUET_FIELD_ID_KEY}' for field {}: {}",
                    &field.name,
                    failed_reason,
                )
            })
    })?;

    // Prevent accidental re-use: shadow the parameter with an uninitialized unit
    // binding so any later read of `field_id_override` fails to compile.
    #[expect(unused)]
    let field_id_override: ();

    use ArrowDataType as ADT;

    let name = field.name.clone();

    let type_ = match &field.dtype {
        ADT::List(field) | ADT::LargeList(field) | ADT::Map(field, _) => {
            // The `field` directly under the `Map` type does not contain a physical ID, so we add one in here.
            // Note that this branch also catches `(Large)List` as the `Map` columns get loaded as that type
            // from Parquet (currently unsure if this is intended).
            let field_id_override = field
                .metadata
                .as_ref()
                .is_none_or(|x| !x.contains_key(PARQUET_FIELD_ID_KEY))
                .then_some(MAP_DEFAULT_ID);

            IcebergColumnType::List(Box::new(arrow_field_to_iceberg_column_rec(
                field,
                field_id_override,
            )?))
        },

        #[cfg(feature = "dtype-array")]
        ADT::FixedSizeList(field, width) => IcebergColumnType::FixedSizeList(
            Box::new(arrow_field_to_iceberg_column_rec(field, None)?),
            *width,
        ),

        #[cfg(feature = "dtype-struct")]
        ADT::Struct(fields) => {
            // Child fields resolve their own physical IDs from their metadata.
            IcebergColumnType::Struct(IcebergSchema::try_from_arrow_fields_iter(fields)?)
        },

        dtype => {
            // Any nested type not matched above is unsupported.
            if dtype.is_nested() {
                polars_bail!(
                    ComputeError:
                    "IcebergSchema: unsupported arrow type: {:?}",
                    dtype,
                )
            }

            // Wrap in a nullable arrow field so the standard dtype conversion applies.
            let dtype =
                DataType::from_arrow_field(&ArrowField::new(name.clone(), dtype.clone(), true));

            IcebergColumnType::Primitive { dtype }
        },
    };

    let out = IcebergColumn {
        name,
        physical_id,
        type_,
    };

    Ok(out)
}
204
205impl<T> FromIterator<T> for IcebergSchema
206where
207    PlIndexMap<u32, IcebergColumn>: FromIterator<T>,
208{
209    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
210        Self(PlIndexMap::<u32, IcebergColumn>::from_iter(iter))
211    }
212}
213
214impl std::hash::Hash for IcebergSchema {
215    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
216        for col in self.values() {
217            col.hash(state);
218        }
219    }
220}
221
// Deref to the inner map so callers can use `PlIndexMap` methods
// (`values`, `get`, iteration, ...) directly on the schema.
impl std::ops::Deref for IcebergSchema {
    type Target = PlIndexMap<u32, IcebergColumn>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
229
// Mutable counterpart: exposes the inner map for in-place edits.
impl std::ops::DerefMut for IcebergSchema {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}