polars_io/catalog/unity/
schema.rs

1use polars_core::prelude::{DataType, Field};
2use polars_core::schema::{Schema, SchemaRef};
3use polars_error::{PolarsResult, polars_bail, polars_err, to_compute_err};
4use polars_utils::error::TruncateErrorDetail;
5use polars_utils::format_pl_smallstr;
6use polars_utils::pl_str::PlSmallStr;
7
8use super::models::{ColumnInfo, ColumnTypeJson, ColumnTypeJsonType, TableInfo};
9use crate::utils::decode_json_response;
10
11/// Returns `(schema, hive_schema)`
12pub fn table_info_to_schemas(
13    table_info: &TableInfo,
14) -> PolarsResult<(Option<SchemaRef>, Option<SchemaRef>)> {
15    let Some(columns) = table_info.columns.as_deref() else {
16        return Ok((None, None));
17    };
18
19    let mut schema = Schema::default();
20    let mut hive_schema = Schema::default();
21
22    for (i, col) in columns.iter().enumerate() {
23        if let Some(position) = col.position {
24            if usize::try_from(position).unwrap() != i {
25                polars_bail!(
26                    ComputeError:
27                    "not yet supported: position was not ordered"
28                )
29            }
30        }
31
32        let field = column_info_to_field(col)?;
33
34        if let Some(i) = col.partition_index {
35            if usize::try_from(i).unwrap() != hive_schema.len() {
36                polars_bail!(
37                    ComputeError:
38                    "not yet supported: partition_index was not ordered"
39                )
40            }
41
42            hive_schema.extend([field]);
43        } else {
44            schema.extend([field])
45        }
46    }
47
48    Ok((
49        Some(schema.into()),
50        Some(hive_schema)
51            .filter(|x| !x.is_empty())
52            .map(|x| x.into()),
53    ))
54}
55
56pub fn column_info_to_field(column_info: &ColumnInfo) -> PolarsResult<Field> {
57    Ok(Field::new(
58        column_info.name.clone(),
59        parse_type_json_str(&column_info.type_json)?,
60    ))
61}
62
63/// e.g.
64/// ```json
65/// {"name":"Int64","type":"long","nullable":true}
66/// {"name":"List","type":{"type":"array","elementType":"long","containsNull":true},"nullable":true}
67/// ```
68pub fn parse_type_json_str(type_json: &str) -> PolarsResult<DataType> {
69    let decoded: ColumnTypeJson = decode_json_response(type_json.as_bytes())?;
70
71    parse_type_json(&decoded).map_err(|e| {
72        e.wrap_msg(|e| {
73            format!(
74                "error parsing type response: {}, type_json: {}",
75                e,
76                TruncateErrorDetail(type_json)
77            )
78        })
79    })
80}
81
82/// We prefer this as `type_text` cannot be trusted for consistency (e.g. we may expect `decimal(int,int)`
83/// but instead get `decimal`, or `struct<...>` and instead get `struct`).
84pub fn parse_type_json(type_json: &ColumnTypeJson) -> PolarsResult<DataType> {
85    use ColumnTypeJsonType::*;
86
87    let out = match &type_json.type_ {
88        TypeName(name) => match name.as_str() {
89            "array" => {
90                let inner_json: &ColumnTypeJsonType =
91                    type_json.element_type.as_ref().ok_or_else(|| {
92                        polars_err!(
93                            ComputeError:
94                            "missing elementType in response for array type"
95                        )
96                    })?;
97
98                let inner_dtype = parse_type_json_type(inner_json)?;
99
100                DataType::List(Box::new(inner_dtype))
101            },
102
103            "struct" => {
104                let fields_json: &[ColumnTypeJson] =
105                    type_json.fields.as_deref().ok_or_else(|| {
106                        polars_err!(
107                            ComputeError:
108                            "missing elementType in response for array type"
109                        )
110                    })?;
111
112                let fields = fields_json
113                    .iter()
114                    .map(|x| {
115                        let name = x.name.clone().ok_or_else(|| {
116                            polars_err!(
117                                ComputeError:
118                                "missing name in fields response for struct type"
119                            )
120                        })?;
121                        let dtype = parse_type_json(x)?;
122
123                        Ok(Field::new(name, dtype))
124                    })
125                    .collect::<PolarsResult<Vec<_>>>()?;
126
127                DataType::Struct(fields)
128            },
129
130            "map" => {
131                let key_type = type_json.key_type.as_ref().ok_or_else(|| {
132                    polars_err!(
133                        ComputeError:
134                        "missing keyType in response for map type"
135                    )
136                })?;
137
138                let value_type = type_json.value_type.as_ref().ok_or_else(|| {
139                    polars_err!(
140                        ComputeError:
141                        "missing valueType in response for map type"
142                    )
143                })?;
144
145                DataType::List(Box::new(DataType::Struct(vec![
146                    Field::new(
147                        PlSmallStr::from_static("key"),
148                        parse_type_json_type(key_type)?,
149                    ),
150                    Field::new(
151                        PlSmallStr::from_static("value"),
152                        parse_type_json_type(value_type)?,
153                    ),
154                ])))
155            },
156
157            name => parse_type_text(name)?,
158        },
159
160        TypeJson(type_json) => parse_type_json(type_json.as_ref())?,
161    };
162
163    Ok(out)
164}
165
166fn parse_type_json_type(type_json_type: &ColumnTypeJsonType) -> PolarsResult<DataType> {
167    use ColumnTypeJsonType::*;
168
169    match type_json_type {
170        TypeName(name) => parse_type_text(name),
171        TypeJson(type_json) => parse_type_json(type_json.as_ref()),
172    }
173}
174
175/// Parses the string variant of the `type` field within a `type_json`. This can be understood as
176/// the leaf / non-nested datatypes of the field.
177///
178/// References:
179/// * https://spark.apache.org/docs/latest/sql-ref-datatypes.html
180/// * https://docs.databricks.com/api/workspace/tables/get
181/// * https://docs.databricks.com/en/sql/language-manual/sql-ref-datatypes.html
182///
183/// Notes:
184/// * `type_precision` and `type_scale` in the API response are defined as supplementary data to
185///   the `type_text`, but from testing they aren't actually used - e.g. a decimal type would have a
186///   `type_text` of `decimal(18, 2)`
187fn parse_type_text(type_text: &str) -> PolarsResult<DataType> {
188    use DataType::*;
189    use polars_core::prelude::TimeUnit;
190
191    let dtype = match type_text {
192        "boolean" => Boolean,
193
194        "tinyint" | "byte" => Int8,
195        "smallint" | "short" => Int16,
196        "int" | "integer" => Int32,
197        "bigint" | "long" => Int64,
198
199        "float" | "real" => Float32,
200        "double" => Float64,
201
202        "date" => Date,
203        "timestamp" | "timestamp_ntz" | "timestamp_ltz" => Datetime(TimeUnit::Microseconds, None),
204
205        "string" => String,
206        "binary" => Binary,
207
208        "null" | "void" => Null,
209
210        v => {
211            if v.starts_with("decimal") {
212                // e.g. decimal(38,18)
213                (|| {
214                    let (precision, scale) = v
215                        .get(7..)?
216                        .strip_prefix('(')?
217                        .strip_suffix(')')?
218                        .split_once(',')?;
219                    let precision: usize = precision.parse().ok()?;
220                    let scale: usize = scale.parse().ok()?;
221
222                    Some(DataType::Decimal(Some(precision), Some(scale)))
223                })()
224                .ok_or_else(|| {
225                    polars_err!(
226                        ComputeError:
227                        "type format did not match decimal(int,int): {}",
228                        v
229                    )
230                })?
231            } else {
232                polars_bail!(
233                    ComputeError:
234                    "parse_type_text unknown type name: {}",
235                    v
236                )
237            }
238        },
239    };
240
241    Ok(dtype)
242}
243
244// Conversion functions to API format. Mainly used for constructing the request to create tables.
245
246pub fn schema_to_column_info_list(schema: &Schema) -> PolarsResult<Vec<ColumnInfo>> {
247    schema
248        .iter()
249        .enumerate()
250        .map(|(i, (name, dtype))| {
251            let name = name.clone();
252            let type_text = dtype_to_type_text(dtype)?;
253            let type_name = dtype_to_type_name(dtype)?;
254            let type_json = serde_json::to_string(&field_to_type_json(name.clone(), dtype)?)
255                .map_err(to_compute_err)?;
256
257            Ok(ColumnInfo {
258                name,
259                type_name,
260                type_text,
261                type_json,
262                position: Some(i.try_into().unwrap()),
263                comment: None,
264                partition_index: None,
265            })
266        })
267        .collect::<PolarsResult<_>>()
268}
269
270/// Creates the `type_text` field of the API. Opposite of [`parse_type_text`]
271fn dtype_to_type_text(dtype: &DataType) -> PolarsResult<PlSmallStr> {
272    use DataType::*;
273    use polars_core::prelude::TimeUnit;
274
275    macro_rules! S {
276        ($e:expr) => {
277            PlSmallStr::from_static($e)
278        };
279    }
280
281    let out = match dtype {
282        Boolean => S!("boolean"),
283
284        Int8 => S!("tinyint"),
285        Int16 => S!("smallint"),
286        Int32 => S!("int"),
287        Int64 => S!("bigint"),
288
289        Float32 => S!("float"),
290        Float64 => S!("double"),
291
292        Date => S!("date"),
293        Datetime(TimeUnit::Microseconds, None) => S!("timestamp_ntz"),
294
295        String => S!("string"),
296        Binary => S!("binary"),
297
298        Null => S!("null"),
299
300        Decimal(precision, scale) => {
301            let precision = precision.unwrap_or(38);
302            let scale = scale.unwrap_or(0);
303
304            format_pl_smallstr!("decimal({},{})", precision, scale)
305        },
306
307        List(inner) => {
308            if let Some((key_type, value_type)) = get_list_map_type(inner) {
309                format_pl_smallstr!(
310                    "map<{},{}>",
311                    dtype_to_type_text(key_type)?,
312                    dtype_to_type_text(value_type)?
313                )
314            } else {
315                format_pl_smallstr!("array<{}>", dtype_to_type_text(inner)?)
316            }
317        },
318
319        Struct(fields) => {
320            // Yes, it's possible to construct column names containing the brackets. This won't
321            // affect us as we parse using `type_json` rather than this field.
322            let mut out = std::string::String::from("struct<");
323
324            for Field { name, dtype } in fields {
325                out.push_str(name);
326                out.push(':');
327                out.push_str(&dtype_to_type_text(dtype)?);
328                out.push(',');
329            }
330
331            if out.ends_with(',') {
332                out.truncate(out.len() - 1);
333            }
334
335            out.push('>');
336
337            out.into()
338        },
339
340        v => polars_bail!(
341            ComputeError:
342            "dtype_to_type_text unsupported type: {}",
343            v
344        ),
345    };
346
347    Ok(out)
348}
349
350/// Creates the `type_name` field, from testing this wasn't exactly the same as the `type_text` field.
351fn dtype_to_type_name(dtype: &DataType) -> PolarsResult<PlSmallStr> {
352    use DataType::*;
353    use polars_core::prelude::TimeUnit;
354
355    macro_rules! S {
356        ($e:expr) => {
357            PlSmallStr::from_static($e)
358        };
359    }
360
361    let out = match dtype {
362        Boolean => S!("BOOLEAN"),
363
364        Int8 => S!("BYTE"),
365        Int16 => S!("SHORT"),
366        Int32 => S!("INT"),
367        Int64 => S!("LONG"),
368
369        Float32 => S!("FLOAT"),
370        Float64 => S!("DOUBLE"),
371
372        Date => S!("DATE"),
373        Datetime(TimeUnit::Microseconds, None) => S!("TIMESTAMP_NTZ"),
374        String => S!("STRING"),
375        Binary => S!("BINARY"),
376
377        Null => S!("NULL"),
378
379        Decimal(..) => S!("DECIMAL"),
380
381        List(inner) => {
382            if get_list_map_type(inner).is_some() {
383                S!("MAP")
384            } else {
385                S!("ARRAY")
386            }
387        },
388
389        Struct(..) => S!("STRUCT"),
390
391        v => polars_bail!(
392            ComputeError:
393            "dtype_to_type_text unsupported type: {}",
394            v
395        ),
396    };
397
398    Ok(out)
399}
400
401/// Creates the `type_json` field.
402fn field_to_type_json(name: PlSmallStr, dtype: &DataType) -> PolarsResult<ColumnTypeJson> {
403    Ok(ColumnTypeJson {
404        name: Some(name),
405        type_: dtype_to_type_json(dtype)?,
406        nullable: Some(true),
407        // We set this to Some(_) so that the output matches the one generated by Databricks.
408        metadata: Some(Default::default()),
409
410        ..Default::default()
411    })
412}
413
414fn dtype_to_type_json(dtype: &DataType) -> PolarsResult<ColumnTypeJsonType> {
415    use DataType::*;
416    use polars_core::prelude::TimeUnit;
417
418    macro_rules! S {
419        ($e:expr) => {
420            ColumnTypeJsonType::from_static_type_name($e)
421        };
422    }
423
424    let out = match dtype {
425        Boolean => S!("boolean"),
426
427        Int8 => S!("byte"),
428        Int16 => S!("short"),
429        Int32 => S!("integer"),
430        Int64 => S!("long"),
431
432        Float32 => S!("float"),
433        Float64 => S!("double"),
434
435        Date => S!("date"),
436        Datetime(TimeUnit::Microseconds, None) => S!("timestamp_ntz"),
437
438        String => S!("string"),
439        Binary => S!("binary"),
440
441        Null => S!("null"),
442
443        Decimal(..) => ColumnTypeJsonType::TypeName(dtype_to_type_text(dtype)?),
444
445        List(inner) => {
446            let out = if let Some((key_type, value_type)) = get_list_map_type(inner) {
447                ColumnTypeJson {
448                    type_: ColumnTypeJsonType::from_static_type_name("map"),
449                    key_type: Some(dtype_to_type_json(key_type)?),
450                    value_type: Some(dtype_to_type_json(value_type)?),
451                    value_contains_null: Some(true),
452
453                    ..Default::default()
454                }
455            } else {
456                ColumnTypeJson {
457                    type_: ColumnTypeJsonType::from_static_type_name("array"),
458                    element_type: Some(dtype_to_type_json(inner)?),
459                    contains_null: Some(true),
460
461                    ..Default::default()
462                }
463            };
464
465            ColumnTypeJsonType::TypeJson(Box::new(out))
466        },
467
468        Struct(fields) => {
469            let out = ColumnTypeJson {
470                type_: ColumnTypeJsonType::from_static_type_name("struct"),
471                fields: Some(
472                    fields
473                        .iter()
474                        .map(|Field { name, dtype }| field_to_type_json(name.clone(), dtype))
475                        .collect::<PolarsResult<_>>()?,
476                ),
477
478                ..Default::default()
479            };
480
481            ColumnTypeJsonType::TypeJson(Box::new(out))
482        },
483
484        v => polars_bail!(
485            ComputeError:
486            "dtype_to_type_text unsupported type: {}",
487            v
488        ),
489    };
490
491    Ok(out)
492}
493
494/// Tries to interpret the List type as a `map` field, which is essentially
495/// List(Struct(("key", <dtype>), ("value", <dtyoe>))).
496///
497/// Returns `Option<(key_type, value_type)>`
498fn get_list_map_type(list_inner_dtype: &DataType) -> Option<(&DataType, &DataType)> {
499    let DataType::Struct(fields) = list_inner_dtype else {
500        return None;
501    };
502
503    let [fld1, fld2] = fields.as_slice() else {
504        return None;
505    };
506
507    if !(fld1.name == "key" && fld2.name == "value") {
508        return None;
509    }
510
511    Some((fld1.dtype(), fld2.dtype()))
512}