1use std::borrow::Cow;
6use std::sync::Arc;
7
8use arrow::datatypes::{ArrowDataType, ArrowSchema, Field as ArrowField};
9use polars_error::{PolarsResult, feature_gated, polars_bail, polars_err};
10use polars_utils::aliases::InitHashMaps;
11use polars_utils::pl_str::PlSmallStr;
12
13use crate::prelude::{DataType, Field, PlIndexMap};
14
/// Physical ID given to the element field of a list-like Arrow column (`List`, `LargeList` or
/// `Map`) when that element field has no `PARQUET:field_id` metadata entry of its own.
pub const LIST_ELEMENT_DEFAULT_ID: u32 = u32::MAX;
16
#[derive(Debug, Clone, Eq, PartialEq)]
/// An Iceberg schema: a `PlIndexMap` from a `u32` key to an [`IcebergColumn`].
///
/// When built through `IcebergSchema::try_from_arrow_fields_iter`, each key is the column's
/// `physical_id`. `PartialEq`/`Eq` are derived from the inner map; `Hash` is implemented by
/// hand and hashes only the column values.
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct IcebergSchema(PlIndexMap<u32, IcebergColumn>);
/// Shared, reference-counted handle to an [`IcebergSchema`].
pub type IcebergSchemaRef = Arc<IcebergSchema>;
25
26impl IcebergSchema {
27 pub fn from_arrow_schema(schema: &ArrowSchema) -> PolarsResult<Self> {
29 Self::try_from_arrow_fields_iter(schema.iter_values())
30 }
31
32 pub fn try_from_arrow_fields_iter<'a, I>(iter: I) -> PolarsResult<Self>
33 where
34 I: IntoIterator<Item = &'a ArrowField>,
35 {
36 let iter = iter.into_iter();
37
38 let mut out = PlIndexMap::with_capacity(iter.size_hint().0);
39
40 for arrow_field in iter {
41 let col: IcebergColumn = arrow_field_to_iceberg_column_rec(arrow_field, None)?;
42 let existing = out.insert(col.physical_id, col);
43
44 if let Some(existing) = existing {
45 polars_bail!(
46 Duplicate:
47 "IcebergSchema: duplicate physical ID {:?}",
48 existing,
49 )
50 }
51 }
52
53 Ok(Self(out))
54 }
55}
56
#[derive(Debug, Clone, Eq, Hash, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
/// A single (possibly nested) column of an [`IcebergSchema`].
pub struct IcebergColumn {
    /// Column name; when built from Arrow, this is the Arrow field's name.
    pub name: PlSmallStr,
    /// Field ID of the column. When built from Arrow, it is either an explicit override
    /// (e.g. [`LIST_ELEMENT_DEFAULT_ID`] for list/map elements without an ID) or the `u32`
    /// parsed from the field's `PARQUET:field_id` metadata entry.
    pub physical_id: u32,
    /// Type of the column, including any nested child columns.
    pub type_: IcebergColumnType,
}
67
#[derive(Debug, Clone, Eq, Hash, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
/// Type of an [`IcebergColumn`].
pub enum IcebergColumnType {
    /// Non-nested column holding its Polars type. When built from Arrow, this also covers
    /// dictionary-encoded columns whose values are not nested.
    Primitive {
        dtype: DataType,
    },
    /// Variable-length list; the boxed column describes the element field. When built from
    /// Arrow, `List`, `LargeList` and `Map` all map to this variant.
    List(Box<IcebergColumn>),
    /// Fixed-size list: the element column and the fixed width.
    FixedSizeList(Box<IcebergColumn>, usize),
    /// Struct whose child columns form a nested schema.
    Struct(IcebergSchema),
}
81
82impl IcebergColumnType {
83 pub fn to_polars_dtype(&self) -> DataType {
84 use IcebergColumnType::*;
85
86 match self {
87 Primitive { dtype } => dtype.clone(),
88 List(inner) => DataType::List(Box::new(inner.type_.to_polars_dtype())),
89 FixedSizeList(inner, width) => {
90 feature_gated!("dtype-array", {
91 DataType::Array(Box::new(inner.type_.to_polars_dtype()), *width)
92 })
93 },
94 Struct(fields) => feature_gated!("dtype-struct", {
95 DataType::Struct(
96 fields
97 .values()
98 .map(|col| Field::new(col.name.clone(), col.type_.to_polars_dtype()))
99 .collect(),
100 )
101 }),
102 }
103 }
104
105 pub fn is_nested(&self) -> bool {
106 use IcebergColumnType::*;
107
108 match self {
109 List(_) | FixedSizeList(..) | Struct(_) => true,
110 Primitive { .. } => false,
111 }
112 }
113}
114
115fn arrow_field_to_iceberg_column_rec(
116 field: &ArrowField,
117 field_id_override: Option<u32>,
118) -> PolarsResult<IcebergColumn> {
119 const PARQUET_FIELD_ID_KEY: &str = "PARQUET:field_id";
120
121 let physical_id: u32 = field_id_override.ok_or(Cow::Borrowed("")).or_else(|_| {
122 field
123 .metadata
124 .as_deref()
125 .ok_or(Cow::Borrowed("metadata was None"))
126 .and_then(|md| {
127 md.get(PARQUET_FIELD_ID_KEY)
128 .ok_or(Cow::Borrowed("key not found in metadata"))
129 })
130 .and_then(|x| {
131 x.parse()
132 .map_err(|_| Cow::Owned(format!("could not parse value as u32: '{x}'")))
133 })
134 .map_err(|failed_reason: Cow<'_, str>| {
135 polars_err!(
136 SchemaFieldNotFound:
137 "IcebergSchema: failed to load '{PARQUET_FIELD_ID_KEY}' for field {}: {}",
138 &field.name,
139 failed_reason,
140 )
141 })
142 })?;
143
144 #[expect(unused)]
146 let field_id_override: ();
147
148 use ArrowDataType as ADT;
149
150 let name = field.name.clone();
151
152 let type_ = match &field.dtype {
153 ADT::List(field) | ADT::LargeList(field) | ADT::Map(field, _) => {
154 let field_id_override = field
158 .metadata
159 .as_ref()
160 .is_none_or(|x| !x.contains_key(PARQUET_FIELD_ID_KEY))
161 .then_some(LIST_ELEMENT_DEFAULT_ID);
162
163 IcebergColumnType::List(Box::new(arrow_field_to_iceberg_column_rec(
164 field,
165 field_id_override,
166 )?))
167 },
168
169 #[cfg(feature = "dtype-array")]
170 ADT::FixedSizeList(field, width) => IcebergColumnType::FixedSizeList(
171 Box::new(arrow_field_to_iceberg_column_rec(field, None)?),
172 *width,
173 ),
174
175 #[cfg(feature = "dtype-struct")]
176 ADT::Struct(fields) => {
177 IcebergColumnType::Struct(IcebergSchema::try_from_arrow_fields_iter(fields)?)
178 },
179
180 dtype => {
181 if let ADT::Dictionary(_key_type, value_type, _is_sorted) = dtype
182 && !value_type.is_nested()
183 {
184 let dtype =
185 DataType::from_arrow_field(&ArrowField::new(name.clone(), dtype.clone(), true));
186
187 IcebergColumnType::Primitive { dtype }
188 } else if dtype.is_nested() {
189 polars_bail!(
190 ComputeError:
191 "IcebergSchema: unsupported arrow type: {:?}",
192 dtype,
193 )
194 } else {
195 let dtype =
196 DataType::from_arrow_field(&ArrowField::new(name.clone(), dtype.clone(), true));
197
198 IcebergColumnType::Primitive { dtype }
199 }
200 },
201 };
202
203 let out = IcebergColumn {
204 name,
205 physical_id,
206 type_,
207 };
208
209 Ok(out)
210}
211
212impl<T> FromIterator<T> for IcebergSchema
213where
214 PlIndexMap<u32, IcebergColumn>: FromIterator<T>,
215{
216 fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
217 Self(PlIndexMap::<u32, IcebergColumn>::from_iter(iter))
218 }
219}
220
221impl std::hash::Hash for IcebergSchema {
222 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
223 for col in self.values() {
224 col.hash(state);
225 }
226 }
227}
228
229impl std::ops::Deref for IcebergSchema {
230 type Target = PlIndexMap<u32, IcebergColumn>;
231
232 fn deref(&self) -> &Self::Target {
233 &self.0
234 }
235}
236
237impl std::ops::DerefMut for IcebergSchema {
238 fn deref_mut(&mut self) -> &mut Self::Target {
239 &mut self.0
240 }
241}