polars_core/schema/
iceberg.rs1use std::borrow::Cow;
6use std::sync::Arc;
7
8use arrow::datatypes::{ArrowDataType, ArrowSchema, Field as ArrowField};
9use polars_error::{PolarsResult, feature_gated, polars_bail, polars_err};
10use polars_utils::aliases::InitHashMaps;
11use polars_utils::pl_str::PlSmallStr;
12
13use crate::prelude::{DataType, Field, PlIndexMap};
14
15#[derive(Debug, Clone, Eq, PartialEq)]
19#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
20#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
21pub struct IcebergSchema(PlIndexMap<u32, IcebergColumn>);
22pub type IcebergSchemaRef = Arc<IcebergSchema>;
23
24impl IcebergSchema {
25 pub fn from_arrow_schema(schema: &ArrowSchema) -> PolarsResult<Self> {
27 Self::try_from_arrow_fields_iter(schema.iter_values())
28 }
29
30 pub fn try_from_arrow_fields_iter<'a, I>(iter: I) -> PolarsResult<Self>
31 where
32 I: IntoIterator<Item = &'a ArrowField>,
33 {
34 let iter = iter.into_iter();
35 let size_hint = iter.size_hint();
36
37 let mut out = PlIndexMap::with_capacity(size_hint.1.unwrap_or(size_hint.0));
38
39 for arrow_field in iter {
40 let col: IcebergColumn = arrow_field_to_iceberg_column_rec(arrow_field, None)?;
41 let existing = out.insert(col.physical_id, col);
42
43 if let Some(existing) = existing {
44 polars_bail!(
45 Duplicate:
46 "IcebergSchema: duplicate physical ID {:?}",
47 existing,
48 )
49 }
50 }
51
52 Ok(Self(out))
53 }
54}
55
56#[derive(Debug, Clone, Eq, Hash, PartialEq)]
57#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
58#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
59pub struct IcebergColumn {
60 pub name: PlSmallStr,
62 pub physical_id: u32,
64 pub type_: IcebergColumnType,
65}
66
67#[derive(Debug, Clone, Eq, Hash, PartialEq)]
68#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
69#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
70pub enum IcebergColumnType {
71 Primitive {
72 dtype: DataType,
74 },
75 List(Box<IcebergColumn>),
76 FixedSizeList(Box<IcebergColumn>, usize),
78 Struct(IcebergSchema),
79}
80
81impl IcebergColumnType {
82 pub fn to_polars_dtype(&self) -> DataType {
83 use IcebergColumnType::*;
84
85 match self {
86 Primitive { dtype } => dtype.clone(),
87 List(inner) => DataType::List(Box::new(inner.type_.to_polars_dtype())),
88 FixedSizeList(inner, width) => {
89 feature_gated!("dtype-array", {
90 DataType::Array(Box::new(inner.type_.to_polars_dtype()), *width)
91 })
92 },
93 Struct(fields) => feature_gated!("dtype-struct", {
94 DataType::Struct(
95 fields
96 .values()
97 .map(|col| Field::new(col.name.clone(), col.type_.to_polars_dtype()))
98 .collect(),
99 )
100 }),
101 }
102 }
103
104 pub fn is_nested(&self) -> bool {
105 use IcebergColumnType::*;
106
107 match self {
108 List(_) | FixedSizeList(..) | Struct(_) => true,
109 Primitive { .. } => false,
110 }
111 }
112}
113
114fn arrow_field_to_iceberg_column_rec(
115 field: &ArrowField,
116 field_id_override: Option<u32>,
117) -> PolarsResult<IcebergColumn> {
118 const PARQUET_FIELD_ID_KEY: &str = "PARQUET:field_id";
119 const MAP_DEFAULT_ID: u32 = u32::MAX; let physical_id: u32 = field_id_override.ok_or(Cow::Borrowed("")).or_else(|_| {
122 field
123 .metadata
124 .as_deref()
125 .ok_or(Cow::Borrowed("metadata was None"))
126 .and_then(|md| {
127 md.get(PARQUET_FIELD_ID_KEY)
128 .ok_or(Cow::Borrowed("key not found in metadata"))
129 })
130 .and_then(|x| {
131 x.parse()
132 .map_err(|_| Cow::Owned(format!("could not parse value as u32: '{x}'")))
133 })
134 .map_err(|failed_reason: Cow<'_, str>| {
135 polars_err!(
136 SchemaFieldNotFound:
137 "IcebergSchema: failed to load '{PARQUET_FIELD_ID_KEY}' for field {}: {}",
138 &field.name,
139 failed_reason,
140 )
141 })
142 })?;
143
144 #[expect(unused)]
146 let field_id_override: ();
147
148 use ArrowDataType as ADT;
149
150 let name = field.name.clone();
151
152 let type_ = match &field.dtype {
153 ADT::List(field) | ADT::LargeList(field) | ADT::Map(field, _) => {
154 let field_id_override = field
158 .metadata
159 .as_ref()
160 .is_none_or(|x| !x.contains_key(PARQUET_FIELD_ID_KEY))
161 .then_some(MAP_DEFAULT_ID);
162
163 IcebergColumnType::List(Box::new(arrow_field_to_iceberg_column_rec(
164 field,
165 field_id_override,
166 )?))
167 },
168
169 #[cfg(feature = "dtype-array")]
170 ADT::FixedSizeList(field, width) => IcebergColumnType::FixedSizeList(
171 Box::new(arrow_field_to_iceberg_column_rec(field, None)?),
172 *width,
173 ),
174
175 #[cfg(feature = "dtype-struct")]
176 ADT::Struct(fields) => {
177 IcebergColumnType::Struct(IcebergSchema::try_from_arrow_fields_iter(fields)?)
178 },
179
180 dtype => {
181 if dtype.is_nested() {
182 polars_bail!(
183 ComputeError:
184 "IcebergSchema: unsupported arrow type: {:?}",
185 dtype,
186 )
187 }
188
189 let dtype =
190 DataType::from_arrow_field(&ArrowField::new(name.clone(), dtype.clone(), true));
191
192 IcebergColumnType::Primitive { dtype }
193 },
194 };
195
196 let out = IcebergColumn {
197 name,
198 physical_id,
199 type_,
200 };
201
202 Ok(out)
203}
204
205impl<T> FromIterator<T> for IcebergSchema
206where
207 PlIndexMap<u32, IcebergColumn>: FromIterator<T>,
208{
209 fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
210 Self(PlIndexMap::<u32, IcebergColumn>::from_iter(iter))
211 }
212}
213
214impl std::hash::Hash for IcebergSchema {
215 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
216 for col in self.values() {
217 col.hash(state);
218 }
219 }
220}
221
222impl std::ops::Deref for IcebergSchema {
223 type Target = PlIndexMap<u32, IcebergColumn>;
224
225 fn deref(&self) -> &Self::Target {
226 &self.0
227 }
228}
229
230impl std::ops::DerefMut for IcebergSchema {
231 fn deref_mut(&mut self) -> &mut Self::Target {
232 &mut self.0
233 }
234}