polars_core/frame/row/
mod.rs1mod av_buffer;
2mod dataframe;
3mod transpose;
4
5use std::borrow::Borrow;
6use std::fmt::Debug;
7#[cfg(feature = "object")]
8use std::hash::{Hash, Hasher};
9
10use arrow::bitmap::Bitmap;
11pub use av_buffer::*;
12use polars_utils::format_pl_smallstr;
13#[cfg(feature = "object")]
14use polars_utils::total_ord::TotalHash;
15use rayon::prelude::*;
16
17use crate::POOL;
18use crate::prelude::*;
19use crate::utils::{dtypes_to_schema, dtypes_to_supertype, try_get_supertype};
20
/// A table of [`AnyValue`]s stored in one flat, row-major buffer.
///
/// Row `i` occupies `vals[i * width..(i + 1) * width]`.
#[cfg(feature = "object")]
pub(crate) struct AnyValueRows<'a> {
    /// All cells, one row after the other.
    vals: Vec<AnyValue<'a>>,
    /// Number of cells per row.
    width: usize,
}
26
/// A borrowed view of a single row: a slice of [`AnyValue`]s.
#[cfg(feature = "object")]
pub(crate) struct AnyValueRow<'a>(&'a [AnyValue<'a>]);
29
#[cfg(feature = "object")]
impl<'a> AnyValueRows<'a> {
    /// Returns a view of the `i`-th row.
    ///
    /// # Panics
    ///
    /// Panics if the requested row lies outside the underlying buffer.
    pub(crate) fn get(&'a self, i: usize) -> AnyValueRow<'a> {
        // Rows are stored back to back, each `width` cells long.
        let row_range = i * self.width..(i + 1) * self.width;
        AnyValueRow(&self.vals[row_range])
    }
}
38
#[cfg(feature = "object")]
impl TotalEq for AnyValueRow<'_> {
    /// Two rows are equal when every pair of cells at the same position is equal.
    fn tot_eq(&self, other: &Self) -> bool {
        let (left, right) = (self.0, other.0);

        // Rows are expected to have the same length; this is only checked in debug builds.
        debug_assert_eq!(left.len(), right.len());
        left.iter().zip(right.iter()).all(|pair| pair.0 == pair.1)
    }
}
50
#[cfg(feature = "object")]
impl TotalHash for AnyValueRow<'_> {
    /// Feeds every cell of the row, in order, into `state`.
    fn tot_hash<H>(&self, state: &mut H)
    where
        H: Hasher,
    {
        for cell in self.0.iter() {
            cell.hash(state);
        }
    }
}
60
61impl DataFrame {
62 #[cfg(feature = "object")]
63 #[allow(clippy::wrong_self_convention)]
64 pub(crate) fn to_av_rows(&mut self) -> AnyValueRows<'_> {
66 let width = self.width();
67 let size = width * self.height();
68 let mut buf = vec![AnyValue::Null; size];
69 for (col_i, s) in self.materialized_column_iter().enumerate() {
70 for (row_i, av) in s.iter().enumerate() {
71 buf[row_i * width + col_i] = av
72 }
73 }
74 AnyValueRows { vals: buf, width }
75 }
76}
77
78#[derive(Debug, Clone, PartialEq, Eq, Default)]
79pub struct Row<'a>(pub Vec<AnyValue<'a>>);
80
81impl<'a> Row<'a> {
82 pub fn new(values: Vec<AnyValue<'a>>) -> Self {
83 Row(values)
84 }
85}
86
87type Tracker = PlIndexMap<PlSmallStr, PlHashSet<DataType>>;
88
89pub fn infer_schema(
90 iter: impl Iterator<Item = Vec<(impl Into<PlSmallStr>, impl Into<DataType>)>>,
91 infer_schema_length: usize,
92) -> Schema {
93 let mut values: Tracker = Tracker::default();
94 let len = iter.size_hint().1.unwrap_or(infer_schema_length);
95
96 let max_infer = std::cmp::min(len, infer_schema_length);
97 for inner in iter.take(max_infer) {
98 for (key, value) in inner {
99 add_or_insert(&mut values, key.into(), value.into());
100 }
101 }
102 Schema::from_iter(resolve_fields(values))
103}
104
105fn add_or_insert(values: &mut Tracker, key: PlSmallStr, dtype: DataType) {
106 if values.contains_key(&key) {
107 let x = values.get_mut(&key).unwrap();
108 x.insert(dtype);
109 } else {
110 let mut hs = PlHashSet::new();
112 hs.insert(dtype);
113 values.insert(key, hs);
114 }
115}
116
117fn resolve_fields(spec: Tracker) -> Vec<Field> {
118 spec.iter()
119 .map(|(k, hs)| {
120 let v: Vec<&DataType> = hs.iter().collect();
121 Field::new(k.clone(), coerce_dtype(&v))
122 })
123 .collect()
124}
125
126pub fn coerce_dtype<A: Borrow<DataType>>(datatypes: &[A]) -> DataType {
128 use DataType::*;
129
130 let are_all_equal = datatypes.windows(2).all(|w| w[0].borrow() == w[1].borrow());
131
132 if are_all_equal {
133 return datatypes[0].borrow().clone();
134 }
135 if datatypes.len() > 2 {
136 return String;
137 }
138
139 let (lhs, rhs) = (datatypes[0].borrow(), datatypes[1].borrow());
140 try_get_supertype(lhs, rhs).unwrap_or(String)
141}
142
143pub fn rows_to_schema_supertypes(
147 rows: &[Row],
148 infer_schema_length: Option<usize>,
149) -> PolarsResult<Schema> {
150 let dtypes = rows_to_supertypes(rows, infer_schema_length)?;
151 let schema = dtypes_to_schema(dtypes);
152 Ok(schema)
153}
154
155pub fn rows_to_supertypes(
157 rows: &[Row],
158 infer_schema_length: Option<usize>,
159) -> PolarsResult<Vec<DataType>> {
160 polars_ensure!(!rows.is_empty(), NoData: "no rows, cannot infer schema");
161
162 let max_infer = infer_schema_length.unwrap_or(rows.len());
163
164 let mut dtypes: Vec<PlIndexSet<DataType>> = vec![PlIndexSet::new(); rows[0].0.len()];
165 for row in rows.iter().take(max_infer) {
166 for (val, dtypes_set) in row.0.iter().zip(dtypes.iter_mut()) {
167 dtypes_set.insert(val.into());
168 }
169 }
170
171 dtypes
172 .into_iter()
173 .map(|dtypes_set| dtypes_to_supertype(&dtypes_set))
174 .collect()
175}
176
177pub fn rows_to_schema_first_non_null(
179 rows: &[Row],
180 infer_schema_length: Option<usize>,
181) -> PolarsResult<Schema> {
182 polars_ensure!(!rows.is_empty(), NoData: "no rows, cannot infer schema");
183
184 let max_infer = infer_schema_length.unwrap_or(rows.len());
185 let mut schema: Schema = (&rows[0]).into();
186
187 for row in rows.iter().take(max_infer).skip(1) {
191 let nulls: Vec<_> = schema
193 .iter_values()
194 .enumerate()
195 .filter_map(|(i, dtype)| {
196 match dtype {
199 DataType::Null | DataType::List(_) => Some(i),
200 #[cfg(feature = "dtype-struct")]
201 DataType::Struct(_) => Some(i),
202 _ => None,
203 }
204 })
205 .collect();
206 if nulls.is_empty() {
207 break;
208 } else {
209 for i in nulls {
210 let val = &row.0[i];
211
212 if !val.is_nested_null() {
213 let dtype = val.into();
214 schema.set_dtype_at_index(i, dtype).unwrap();
215 }
216 }
217 }
218 }
219 Ok(schema)
220}
221
222impl<'a> From<&AnyValue<'a>> for Field {
223 fn from(val: &AnyValue<'a>) -> Self {
224 Field::new(PlSmallStr::EMPTY, val.into())
225 }
226}
227
228impl From<&Row<'_>> for Schema {
229 fn from(row: &Row) -> Self {
230 row.0
231 .iter()
232 .enumerate()
233 .map(|(i, av)| {
234 let dtype = av.into();
235 Field::new(format_pl_smallstr!("column_{i}"), dtype)
236 })
237 .collect()
238 }
239}