polars_io/parquet/read/
utils.rs

1use std::borrow::Cow;
2
3use polars_core::prelude::{ArrowSchema, Column, DataFrame, DataType, IDX_DTYPE, Series};
4use polars_core::schema::{SchemaExt, SchemaNamesAndDtypes};
5use polars_error::{PolarsResult, polars_bail};
6use polars_schema::Schema;
7
8use crate::RowIndex;
9use crate::hive::materialize_hive_partitions;
10use crate::utils::apply_projection;
11
12pub fn materialize_empty_df(
13    projection: Option<&[usize]>,
14    reader_schema: &ArrowSchema,
15    hive_partition_columns: Option<&[Series]>,
16    row_index: Option<&RowIndex>,
17) -> DataFrame {
18    let schema = if let Some(projection) = projection {
19        Cow::Owned(apply_projection(reader_schema, projection))
20    } else {
21        Cow::Borrowed(reader_schema)
22    };
23    let mut df = DataFrame::empty_with_schema(&Schema::from_arrow_schema(&schema));
24
25    if let Some(row_index) = row_index {
26        df.insert_column(0, Column::new_empty(row_index.name.clone(), &IDX_DTYPE))
27            .unwrap();
28    }
29
30    materialize_hive_partitions(&mut df, reader_schema, hive_partition_columns);
31
32    df
33}
34
35pub(super) fn projected_arrow_schema_to_projection_indices(
36    schema: &ArrowSchema,
37    projected_arrow_schema: &ArrowSchema,
38) -> PolarsResult<Option<Vec<usize>>> {
39    let mut projection_indices = Vec::with_capacity(projected_arrow_schema.len());
40    let mut is_full_ordered_projection = projected_arrow_schema.len() == schema.len();
41
42    for (i, field) in projected_arrow_schema.iter_values().enumerate() {
43        let dtype = {
44            let Some((idx, _, field)) = schema.get_full(&field.name) else {
45                polars_bail!(ColumnNotFound: "did not find column in file: {}", field.name)
46            };
47
48            projection_indices.push(idx);
49            is_full_ordered_projection &= idx == i;
50
51            DataType::from_arrow_field(field)
52        };
53        let expected_dtype = DataType::from_arrow_field(field);
54
55        if dtype.clone() != expected_dtype {
56            polars_bail!(
57                mismatch,
58                col = &field.name,
59                expected = expected_dtype,
60                found = dtype
61            );
62        }
63    }
64
65    Ok((!is_full_ordered_projection).then_some(projection_indices))
66}
67
68/// Utility to ensure the dtype of the column in `current_schema` matches the dtype in `schema` if
69/// that column exists in `schema`.
70pub fn ensure_matching_dtypes_if_found(
71    schema: &ArrowSchema,
72    current_schema: &ArrowSchema,
73) -> PolarsResult<()> {
74    current_schema
75        .iter_names_and_dtypes()
76        .try_for_each(|(name, dtype)| {
77            if let Some(field) = schema.get(name) {
78                if dtype != &field.dtype {
79                    // Check again with timezone normalization
80                    // TODO: Add an ArrowDtype eq wrapper?
81                    let lhs = DataType::from_arrow_dtype(dtype);
82                    let rhs = DataType::from_arrow_field(field);
83
84                    if lhs != rhs {
85                        polars_bail!(
86                            SchemaMismatch:
87                            "dtypes differ for column {}: {:?} != {:?}"
88                            , name, dtype, &field.dtype
89                        );
90                    }
91                }
92            }
93            Ok(())
94        })
95}