polars_io/parquet/read/utils.rs

1use std::borrow::Cow;
2
3use polars_core::prelude::{ArrowSchema, DataFrame, DataType, IDX_DTYPE, Series};
4use polars_core::schema::SchemaNamesAndDtypes;
5use polars_error::{PolarsResult, polars_bail};
6
7use crate::RowIndex;
8use crate::hive::materialize_hive_partitions;
9use crate::utils::apply_projection;
10
11pub fn materialize_empty_df(
12    projection: Option<&[usize]>,
13    reader_schema: &ArrowSchema,
14    hive_partition_columns: Option<&[Series]>,
15    row_index: Option<&RowIndex>,
16) -> DataFrame {
17    let schema = if let Some(projection) = projection {
18        Cow::Owned(apply_projection(reader_schema, projection))
19    } else {
20        Cow::Borrowed(reader_schema)
21    };
22    let mut df = DataFrame::empty_with_arrow_schema(&schema);
23
24    if let Some(row_index) = row_index {
25        df.insert_column(0, Series::new_empty(row_index.name.clone(), &IDX_DTYPE))
26            .unwrap();
27    }
28
29    materialize_hive_partitions(&mut df, reader_schema, hive_partition_columns);
30
31    df
32}
33
34pub(super) fn projected_arrow_schema_to_projection_indices(
35    schema: &ArrowSchema,
36    projected_arrow_schema: &ArrowSchema,
37) -> PolarsResult<Option<Vec<usize>>> {
38    let mut projection_indices = Vec::with_capacity(projected_arrow_schema.len());
39    let mut is_full_ordered_projection = projected_arrow_schema.len() == schema.len();
40
41    for (i, field) in projected_arrow_schema.iter_values().enumerate() {
42        let dtype = {
43            let Some((idx, _, field)) = schema.get_full(&field.name) else {
44                polars_bail!(ColumnNotFound: "did not find column in file: {}", field.name)
45            };
46
47            projection_indices.push(idx);
48            is_full_ordered_projection &= idx == i;
49
50            DataType::from_arrow_field(field)
51        };
52        let expected_dtype = DataType::from_arrow_field(field);
53
54        if dtype.clone() != expected_dtype {
55            polars_bail!(
56                mismatch,
57                col = &field.name,
58                expected = expected_dtype,
59                found = dtype
60            );
61        }
62    }
63
64    Ok((!is_full_ordered_projection).then_some(projection_indices))
65}
66
67/// Utility to ensure the dtype of the column in `current_schema` matches the dtype in `schema` if
68/// that column exists in `schema`.
69pub fn ensure_matching_dtypes_if_found(
70    schema: &ArrowSchema,
71    current_schema: &ArrowSchema,
72) -> PolarsResult<()> {
73    current_schema
74        .iter_names_and_dtypes()
75        .try_for_each(|(name, dtype)| {
76            if let Some(field) = schema.get(name) {
77                if dtype != &field.dtype {
78                    // Check again with timezone normalization
79                    // TODO: Add an ArrowDtype eq wrapper?
80                    let lhs = DataType::from_arrow_dtype(dtype);
81                    let rhs = DataType::from_arrow_field(field);
82
83                    if lhs != rhs {
84                        polars_bail!(
85                            SchemaMismatch:
86                            "dtypes differ for column {}: {:?} != {:?}"
87                            , name, dtype, &field.dtype
88                        );
89                    }
90                }
91            }
92            Ok(())
93        })
94}