polars_core/frame/row/
mod.rs

1mod av_buffer;
2mod dataframe;
3mod transpose;
4
5use std::borrow::Borrow;
6use std::fmt::Debug;
7#[cfg(feature = "object")]
8use std::hash::{Hash, Hasher};
9
10use arrow::bitmap::Bitmap;
11pub use av_buffer::*;
12use polars_utils::format_pl_smallstr;
13#[cfg(feature = "object")]
14use polars_utils::total_ord::TotalHash;
15use rayon::prelude::*;
16
17use crate::POOL;
18use crate::prelude::*;
19use crate::utils::{dtypes_to_schema, dtypes_to_supertype, try_get_supertype};
20
/// Flat buffer holding the cells of a frame as [`AnyValue`]s, row after row.
///
/// Filled by `DataFrame::to_av_rows`, which stores the cell at row `r`, column `c`
/// at index `r * width + c`.
#[cfg(feature = "object")]
pub(crate) struct AnyValueRows<'a> {
    /// Every cell of the frame, laid out one row after another.
    vals: Vec<AnyValue<'a>>,
    /// Number of cells that make up one row.
    width: usize,
}

/// Borrowed view of the cells of a single row of an [`AnyValueRows`] buffer.
#[cfg(feature = "object")]
pub(crate) struct AnyValueRow<'a>(&'a [AnyValue<'a>]);

#[cfg(feature = "object")]
impl<'a> AnyValueRows<'a> {
    /// Returns the `i`-th row as a view over `width` consecutive cells.
    ///
    /// # Panics
    ///
    /// Panics if the requested row does not lie inside the buffer.
    pub(crate) fn get(&'a self, i: usize) -> AnyValueRow<'a> {
        let offset = self.width * i;
        AnyValueRow(&self.vals[offset..offset + self.width])
    }
}
38
#[cfg(feature = "object")]
impl TotalEq for AnyValueRow<'_> {
    /// Two rows are equal when every pair of cells at the same position compares equal.
    fn tot_eq(&self, other: &Self) -> bool {
        let (left, right) = (self.0, other.0);

        // Rows are only expected to be compared against rows of the same width.
        debug_assert_eq!(left.len(), right.len());
        left.iter().zip(right).all(|(a, b)| a == b)
    }
}
50
#[cfg(feature = "object")]
impl TotalHash for AnyValueRow<'_> {
    /// Feeds every cell of the row into `state`, in order.
    fn tot_hash<H>(&self, state: &mut H)
    where
        H: Hasher,
    {
        for value in self.0 {
            value.hash(state);
        }
    }
}
60
61impl DataFrame {
62    #[cfg(feature = "object")]
63    #[allow(clippy::wrong_self_convention)]
64    // Create indexable rows in a single allocation.
65    pub(crate) fn to_av_rows(&mut self) -> AnyValueRows<'_> {
66        self.as_single_chunk_par();
67        let width = self.width();
68        let size = width * self.height();
69        let mut buf = vec![AnyValue::Null; size];
70        for (col_i, s) in self.materialized_column_iter().enumerate() {
71            match s.dtype() {
72                #[cfg(feature = "object")]
73                DataType::Object(_) => {
74                    for row_i in 0..s.len() {
75                        let av = s.get(row_i).unwrap();
76                        buf[row_i * width + col_i] = av
77                    }
78                },
79                _ => {
80                    for (row_i, av) in s.iter().enumerate() {
81                        buf[row_i * width + col_i] = av
82                    }
83                },
84            }
85        }
86        AnyValueRows { vals: buf, width }
87    }
88}
89
90#[derive(Debug, Clone, PartialEq, Eq, Default)]
91pub struct Row<'a>(pub Vec<AnyValue<'a>>);
92
93impl<'a> Row<'a> {
94    pub fn new(values: Vec<AnyValue<'a>>) -> Self {
95        Row(values)
96    }
97}
98
99type Tracker = PlIndexMap<PlSmallStr, PlHashSet<DataType>>;
100
101pub fn infer_schema(
102    iter: impl Iterator<Item = Vec<(impl Into<PlSmallStr>, impl Into<DataType>)>>,
103    infer_schema_length: usize,
104) -> Schema {
105    let mut values: Tracker = Tracker::default();
106    let len = iter.size_hint().1.unwrap_or(infer_schema_length);
107
108    let max_infer = std::cmp::min(len, infer_schema_length);
109    for inner in iter.take(max_infer) {
110        for (key, value) in inner {
111            add_or_insert(&mut values, key.into(), value.into());
112        }
113    }
114    Schema::from_iter(resolve_fields(values))
115}
116
117fn add_or_insert(values: &mut Tracker, key: PlSmallStr, dtype: DataType) {
118    if values.contains_key(&key) {
119        let x = values.get_mut(&key).unwrap();
120        x.insert(dtype);
121    } else {
122        // create hashset and add value type
123        let mut hs = PlHashSet::new();
124        hs.insert(dtype);
125        values.insert(key, hs);
126    }
127}
128
129fn resolve_fields(spec: Tracker) -> Vec<Field> {
130    spec.iter()
131        .map(|(k, hs)| {
132            let v: Vec<&DataType> = hs.iter().collect();
133            Field::new(k.clone(), coerce_dtype(&v))
134        })
135        .collect()
136}
137
138/// Coerces a slice of datatypes into a single supertype.
139pub fn coerce_dtype<A: Borrow<DataType>>(datatypes: &[A]) -> DataType {
140    use DataType::*;
141
142    let are_all_equal = datatypes.windows(2).all(|w| w[0].borrow() == w[1].borrow());
143
144    if are_all_equal {
145        return datatypes[0].borrow().clone();
146    }
147    if datatypes.len() > 2 {
148        return String;
149    }
150
151    let (lhs, rhs) = (datatypes[0].borrow(), datatypes[1].borrow());
152    try_get_supertype(lhs, rhs).unwrap_or(String)
153}
154
155/// Infer the schema of rows by determining the supertype of the values.
156///
157/// Field names are set as `column_0`, `column_1`, and so on.
158pub fn rows_to_schema_supertypes(
159    rows: &[Row],
160    infer_schema_length: Option<usize>,
161) -> PolarsResult<Schema> {
162    let dtypes = rows_to_supertypes(rows, infer_schema_length)?;
163    let schema = dtypes_to_schema(dtypes);
164    Ok(schema)
165}
166
167/// Infer the schema data types of rows by determining the supertype of the values.
168pub fn rows_to_supertypes(
169    rows: &[Row],
170    infer_schema_length: Option<usize>,
171) -> PolarsResult<Vec<DataType>> {
172    polars_ensure!(!rows.is_empty(), NoData: "no rows, cannot infer schema");
173
174    let max_infer = infer_schema_length.unwrap_or(rows.len());
175
176    let mut dtypes: Vec<PlIndexSet<DataType>> = vec![PlIndexSet::new(); rows[0].0.len()];
177    for row in rows.iter().take(max_infer) {
178        for (val, dtypes_set) in row.0.iter().zip(dtypes.iter_mut()) {
179            dtypes_set.insert(val.into());
180        }
181    }
182
183    dtypes
184        .into_iter()
185        .map(|dtypes_set| dtypes_to_supertype(&dtypes_set))
186        .collect()
187}
188
189/// Infer schema from rows and set the first no null type as column data type.
190pub fn rows_to_schema_first_non_null(
191    rows: &[Row],
192    infer_schema_length: Option<usize>,
193) -> PolarsResult<Schema> {
194    polars_ensure!(!rows.is_empty(), NoData: "no rows, cannot infer schema");
195
196    let max_infer = infer_schema_length.unwrap_or(rows.len());
197    let mut schema: Schema = (&rows[0]).into();
198
199    // the first row that has no nulls will be used to infer the schema.
200    // if there is a null, we check the next row and see if we can update the schema
201
202    for row in rows.iter().take(max_infer).skip(1) {
203        // for i in 1..max_infer {
204        let nulls: Vec<_> = schema
205            .iter_values()
206            .enumerate()
207            .filter_map(|(i, dtype)| {
208                // double check struct and list types
209                // nested null values can be wrongly inferred by front ends
210                match dtype {
211                    DataType::Null | DataType::List(_) => Some(i),
212                    #[cfg(feature = "dtype-struct")]
213                    DataType::Struct(_) => Some(i),
214                    _ => None,
215                }
216            })
217            .collect();
218        if nulls.is_empty() {
219            break;
220        } else {
221            for i in nulls {
222                let val = &row.0[i];
223
224                if !val.is_nested_null() {
225                    let dtype = val.into();
226                    schema.set_dtype_at_index(i, dtype).unwrap();
227                }
228            }
229        }
230    }
231    Ok(schema)
232}
233
234impl<'a> From<&AnyValue<'a>> for Field {
235    fn from(val: &AnyValue<'a>) -> Self {
236        Field::new(PlSmallStr::EMPTY, val.into())
237    }
238}
239
240impl From<&Row<'_>> for Schema {
241    fn from(row: &Row) -> Self {
242        row.0
243            .iter()
244            .enumerate()
245            .map(|(i, av)| {
246                let dtype = av.into();
247                Field::new(format_pl_smallstr!("column_{i}"), dtype)
248            })
249            .collect()
250    }
251}