// polars_core/schema/iceberg.rs

//! TODO
//!
//! This should ideally be moved to `polars-schema`; currently it cannot be, due to its dependency
//! on `polars_core::DataType`.
5use std::borrow::Cow;
6use std::sync::Arc;
7
8use arrow::datatypes::{ArrowDataType, ArrowSchema, Field as ArrowField};
9use polars_error::{PolarsResult, feature_gated, polars_bail, polars_err};
10use polars_utils::aliases::InitHashMaps;
11use polars_utils::pl_str::PlSmallStr;
12
13use crate::prelude::{DataType, Field, PlIndexMap};
14
/// Physical ID assigned to the element field of a `List` / `LargeList` / `Map` Arrow type when
/// that element field's metadata has no `PARQUET:field_id` entry.
///
/// The field directly under a `Map` type does not carry a physical ID of its own, so this
/// sentinel (`u32::MAX`) is substituted for it when converting Arrow fields to Iceberg columns.
pub const LIST_ELEMENT_DEFAULT_ID: u32 = u32::MAX;
16
17/// Maps Iceberg physical IDs to columns.
18///
19/// Note: This doesn't use `Schema<D>` as the keys are u32's.
20#[derive(Debug, Clone, Eq, PartialEq)]
21#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
22#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
23pub struct IcebergSchema(PlIndexMap<u32, IcebergColumn>);
24pub type IcebergSchemaRef = Arc<IcebergSchema>;
25
26impl IcebergSchema {
27    /// Constructs a schema keyed by the physical ID stored in the arrow field metadata.
28    pub fn from_arrow_schema(schema: &ArrowSchema) -> PolarsResult<Self> {
29        Self::try_from_arrow_fields_iter(schema.iter_values())
30    }
31
32    pub fn try_from_arrow_fields_iter<'a, I>(iter: I) -> PolarsResult<Self>
33    where
34        I: IntoIterator<Item = &'a ArrowField>,
35    {
36        let iter = iter.into_iter();
37
38        let mut out = PlIndexMap::with_capacity(iter.size_hint().0);
39
40        for arrow_field in iter {
41            let col: IcebergColumn = arrow_field_to_iceberg_column_rec(arrow_field, None)?;
42            let existing = out.insert(col.physical_id, col);
43
44            if let Some(existing) = existing {
45                polars_bail!(
46                    Duplicate:
47                    "IcebergSchema: duplicate physical ID {:?}",
48                    existing,
49                )
50            }
51        }
52
53        Ok(Self(out))
54    }
55}
56
57#[derive(Debug, Clone, Eq, Hash, PartialEq)]
58#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
59#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
60pub struct IcebergColumn {
61    /// Output name
62    pub name: PlSmallStr,
63    /// This is expected to map from 'PARQUET:field_id'
64    pub physical_id: u32,
65    pub type_: IcebergColumnType,
66}
67
68#[derive(Debug, Clone, Eq, Hash, PartialEq)]
69#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
70#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
71pub enum IcebergColumnType {
72    Primitive {
73        /// This must not be a nested data type.
74        dtype: DataType,
75    },
76    List(Box<IcebergColumn>),
77    /// (values, width)
78    FixedSizeList(Box<IcebergColumn>, usize),
79    Struct(IcebergSchema),
80}
81
82impl IcebergColumnType {
83    pub fn to_polars_dtype(&self) -> DataType {
84        use IcebergColumnType::*;
85
86        match self {
87            Primitive { dtype } => dtype.clone(),
88            List(inner) => DataType::List(Box::new(inner.type_.to_polars_dtype())),
89            FixedSizeList(inner, width) => {
90                feature_gated!("dtype-array", {
91                    DataType::Array(Box::new(inner.type_.to_polars_dtype()), *width)
92                })
93            },
94            Struct(fields) => feature_gated!("dtype-struct", {
95                DataType::Struct(
96                    fields
97                        .values()
98                        .map(|col| Field::new(col.name.clone(), col.type_.to_polars_dtype()))
99                        .collect(),
100                )
101            }),
102        }
103    }
104
105    pub fn is_nested(&self) -> bool {
106        use IcebergColumnType::*;
107
108        match self {
109            List(_) | FixedSizeList(..) | Struct(_) => true,
110            Primitive { .. } => false,
111        }
112    }
113}
114
115fn arrow_field_to_iceberg_column_rec(
116    field: &ArrowField,
117    field_id_override: Option<u32>,
118) -> PolarsResult<IcebergColumn> {
119    const PARQUET_FIELD_ID_KEY: &str = "PARQUET:field_id";
120
121    let physical_id: u32 = field_id_override.ok_or(Cow::Borrowed("")).or_else(|_| {
122        field
123            .metadata
124            .as_deref()
125            .ok_or(Cow::Borrowed("metadata was None"))
126            .and_then(|md| {
127                md.get(PARQUET_FIELD_ID_KEY)
128                    .ok_or(Cow::Borrowed("key not found in metadata"))
129            })
130            .and_then(|x| {
131                x.parse()
132                    .map_err(|_| Cow::Owned(format!("could not parse value as u32: '{x}'")))
133            })
134            .map_err(|failed_reason: Cow<'_, str>| {
135                polars_err!(
136                    SchemaFieldNotFound:
137                    "IcebergSchema: failed to load '{PARQUET_FIELD_ID_KEY}' for field {}: {}",
138                    &field.name,
139                    failed_reason,
140                )
141            })
142    })?;
143
144    // Prevent accidental re-use.
145    #[expect(unused)]
146    let field_id_override: ();
147
148    use ArrowDataType as ADT;
149
150    let name = field.name.clone();
151
152    let type_ = match &field.dtype {
153        ADT::List(field) | ADT::LargeList(field) | ADT::Map(field, _) => {
154            // The `field` directly under the `Map` type does not contain a physical ID, so we add one in here.
155            // Note that this branch also catches `(Large)List` as the `Map` columns get loaded as that type
156            // from Parquet (currently unsure if this is intended).
157            let field_id_override = field
158                .metadata
159                .as_ref()
160                .is_none_or(|x| !x.contains_key(PARQUET_FIELD_ID_KEY))
161                .then_some(LIST_ELEMENT_DEFAULT_ID);
162
163            IcebergColumnType::List(Box::new(arrow_field_to_iceberg_column_rec(
164                field,
165                field_id_override,
166            )?))
167        },
168
169        #[cfg(feature = "dtype-array")]
170        ADT::FixedSizeList(field, width) => IcebergColumnType::FixedSizeList(
171            Box::new(arrow_field_to_iceberg_column_rec(field, None)?),
172            *width,
173        ),
174
175        #[cfg(feature = "dtype-struct")]
176        ADT::Struct(fields) => {
177            IcebergColumnType::Struct(IcebergSchema::try_from_arrow_fields_iter(fields)?)
178        },
179
180        dtype => {
181            if let ADT::Dictionary(_key_type, value_type, _is_sorted) = dtype
182                && !value_type.is_nested()
183            {
184                let dtype =
185                    DataType::from_arrow_field(&ArrowField::new(name.clone(), dtype.clone(), true));
186
187                IcebergColumnType::Primitive { dtype }
188            } else if dtype.is_nested() {
189                polars_bail!(
190                    ComputeError:
191                    "IcebergSchema: unsupported arrow type: {:?}",
192                    dtype,
193                )
194            } else {
195                let dtype =
196                    DataType::from_arrow_field(&ArrowField::new(name.clone(), dtype.clone(), true));
197
198                IcebergColumnType::Primitive { dtype }
199            }
200        },
201    };
202
203    let out = IcebergColumn {
204        name,
205        physical_id,
206        type_,
207    };
208
209    Ok(out)
210}
211
212impl<T> FromIterator<T> for IcebergSchema
213where
214    PlIndexMap<u32, IcebergColumn>: FromIterator<T>,
215{
216    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
217        Self(PlIndexMap::<u32, IcebergColumn>::from_iter(iter))
218    }
219}
220
221impl std::hash::Hash for IcebergSchema {
222    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
223        for col in self.values() {
224            col.hash(state);
225        }
226    }
227}
228
229impl std::ops::Deref for IcebergSchema {
230    type Target = PlIndexMap<u32, IcebergColumn>;
231
232    fn deref(&self) -> &Self::Target {
233        &self.0
234    }
235}
236
237impl std::ops::DerefMut for IcebergSchema {
238    fn deref_mut(&mut self) -> &mut Self::Target {
239        &mut self.0
240    }
241}