1#![allow(unsafe_op_in_unsafe_fn)]
2use arrow::datatypes::ArrowSchemaRef;
4use polars_row::ArrayRef;
5use polars_utils::UnitVec;
6use polars_utils::itertools::Itertools;
7use rayon::prelude::*;
8
9use crate::chunked_array::flags::StatisticsFlags;
10#[cfg(feature = "algorithm_group_by")]
11use crate::chunked_array::ops::unique::is_unique_helper;
12use crate::prelude::gather::check_bounds_ca;
13use crate::prelude::*;
14#[cfg(feature = "row_hash")]
15use crate::utils::split_df;
16use crate::utils::{Container, NoNull, slice_offsets, try_get_supertype};
17use crate::{HEAD_DEFAULT_LENGTH, TAIL_DEFAULT_LENGTH};
18
19#[cfg(feature = "dataframe_arithmetic")]
20mod arithmetic;
21pub mod builder;
22mod chunks;
23pub use chunks::chunk_df_for_writing;
24mod broadcast;
25pub mod column;
26mod dataframe;
27mod filter;
28mod projection;
29pub use dataframe::DataFrame;
30use filter::filter_zero_width;
31use projection::{AmortizedColumnSelector, LINEAR_SEARCH_LIMIT};
32
33pub mod explode;
34mod from;
35#[cfg(feature = "algorithm_group_by")]
36pub mod group_by;
37pub(crate) mod horizontal;
38#[cfg(any(feature = "rows", feature = "object"))]
39pub mod row;
40mod top_k;
41mod upstream_traits;
42mod validation;
43
44use arrow::record_batch::{RecordBatch, RecordBatchT};
45use polars_utils::pl_str::PlSmallStr;
46#[cfg(feature = "serde")]
47use serde::{Deserialize, Serialize};
48use strum_macros::IntoStaticStr;
49
50use crate::POOL;
51#[cfg(feature = "row_hash")]
52use crate::hashing::_df_rows_to_hashes_threaded_vertical;
53use crate::prelude::sort::arg_sort;
54use crate::series::IsSorted;
55
/// Strategy deciding which row to keep when duplicates are encountered.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Hash, IntoStaticStr)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
#[strum(serialize_all = "snake_case")]
pub enum UniqueKeepStrategy {
    /// Keep the first occurrence of a duplicated row.
    First,
    /// Keep the last occurrence of a duplicated row.
    Last,
    /// Keep no occurrence of duplicated rows at all.
    None,
    /// Keep some occurrence; which one is unspecified, allowing the
    /// implementation to pick whichever is cheapest.
    #[default]
    Any,
}
72
/// Naming scheme for columns produced by a pivot operation.
// NOTE(review): variant semantics inferred from names — confirm against the
// pivot implementation that consumes this enum.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Hash, IntoStaticStr)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
#[strum(serialize_all = "snake_case")]
pub enum PivotColumnNaming {
    /// Combine names (e.g. value column and pivoted value) into one.
    Combine,
    /// Let the pivot choose the naming automatically.
    #[default]
    Auto,
}
86
87impl DataFrame {
88 pub fn materialized_column_iter(&self) -> impl ExactSizeIterator<Item = &Series> {
89 self.columns().iter().map(Column::as_materialized_series)
90 }
91
92 pub fn estimated_size(&self) -> usize {
105 self.columns().iter().map(Column::estimated_size).sum()
106 }
107
108 pub fn try_apply_columns(
109 &self,
110 func: impl Fn(&Column) -> PolarsResult<Column> + Send + Sync,
111 ) -> PolarsResult<Vec<Column>> {
112 return inner(self, &func);
113
114 fn inner(
115 slf: &DataFrame,
116 func: &(dyn Fn(&Column) -> PolarsResult<Column> + Send + Sync),
117 ) -> PolarsResult<Vec<Column>> {
118 slf.columns().iter().map(func).collect()
119 }
120 }
121
122 pub fn apply_columns(&self, func: impl Fn(&Column) -> Column + Send + Sync) -> Vec<Column> {
123 return inner(self, &func);
124
125 fn inner(slf: &DataFrame, func: &(dyn Fn(&Column) -> Column + Send + Sync)) -> Vec<Column> {
126 slf.columns().iter().map(func).collect()
127 }
128 }
129
130 pub fn try_apply_columns_par(
131 &self,
132 func: impl Fn(&Column) -> PolarsResult<Column> + Send + Sync,
133 ) -> PolarsResult<Vec<Column>> {
134 return inner(self, &func);
135
136 fn inner(
137 slf: &DataFrame,
138 func: &(dyn Fn(&Column) -> PolarsResult<Column> + Send + Sync),
139 ) -> PolarsResult<Vec<Column>> {
140 POOL.install(|| slf.columns().par_iter().map(func).collect())
141 }
142 }
143
144 pub fn apply_columns_par(&self, func: impl Fn(&Column) -> Column + Send + Sync) -> Vec<Column> {
145 return inner(self, &func);
146
147 fn inner(slf: &DataFrame, func: &(dyn Fn(&Column) -> Column + Send + Sync)) -> Vec<Column> {
148 POOL.install(|| slf.columns().par_iter().map(func).collect())
149 }
150 }
151
    /// Reserve capacity for `additional` more chunks in every materialized column.
    pub(crate) fn reserve_chunks(&mut self, additional: usize) {
        // SAFETY: only chunk capacity is touched; names, dtypes and lengths are
        // unchanged, so the cached schema remains valid.
        for s in unsafe { self.columns_mut_retain_schema() } {
            if let Column::Series(s) = s {
                // SAFETY: reserving capacity does not alter the existing chunks.
                unsafe { s.chunks_mut().reserve(additional) }
            }
        }
    }
    /// Create a new `DataFrame` by repeating the row at `index` `height` times.
    pub fn new_from_index(&self, index: usize, height: usize) -> Self {
        let new_cols = self.apply_columns(|c| c.new_from_index(index, height));

        // SAFETY: every column was built with length `height`, and names/dtypes
        // are unchanged, so this frame's schema can be reused.
        unsafe { Self::_new_unchecked_impl(height, new_cols).with_schema_from(self) }
    }
167
168 pub fn full_null(schema: &Schema, height: usize) -> Self {
170 let columns = schema
171 .iter_fields()
172 .map(|f| Column::full_null(f.name().clone(), height, f.dtype()))
173 .collect();
174
175 unsafe { DataFrame::_new_unchecked_impl(height, columns) }
176 }
177
    /// Verify this frame's column names against `schema`, casting columns whose
    /// dtype does not line up with the schema's dtype.
    ///
    /// # Errors
    /// Returns `SchemaMismatch` when a column name differs from the schema, or
    /// when a required cast fails.
    // NOTE(review): columns and schema entries are zipped, so a width mismatch
    // is silently truncated to the shorter side — confirm callers guarantee
    // `self.width() == schema.len()`.
    pub fn ensure_matches_schema(&mut self, schema: &Schema) -> PolarsResult<()> {
        let mut did_cast = false;
        // Keep the cached schema so it can be restored when nothing was cast.
        let cached_schema = self.cached_schema().cloned();

        for (col, (name, dt)) in unsafe { self.columns_mut() }.iter_mut().zip(schema.iter()) {
            polars_ensure!(
                col.name() == name,
                SchemaMismatch: "column name mismatch: expected {:?}, found {:?}",
                name,
                col.name()
            );

            // NOTE(review): the result of `matches_schema_type` is used directly
            // as `needs_cast`; if that method returns `true` on a *match* this
            // condition is inverted — confirm its semantics.
            let needs_cast = col.dtype().matches_schema_type(dt)?;

            if needs_cast {
                *col = col.cast(dt)?;
                did_cast = true;
            }
        }

        if !did_cast {
            // No dtype changed, so the previously cached schema is still valid.
            unsafe { self.set_opt_schema(cached_schema) };
        }

        Ok(())
    }
206
207 pub fn with_row_index(&self, name: PlSmallStr, offset: Option<IdxSize>) -> PolarsResult<Self> {
242 let mut new_columns = Vec::with_capacity(self.width() + 1);
243 let offset = offset.unwrap_or(0);
244
245 if self.get_column_index(&name).is_some() {
246 polars_bail!(duplicate = name)
247 }
248
249 let col = Column::new_row_index(name, offset, self.height())?;
250 new_columns.push(col);
251 new_columns.extend_from_slice(self.columns());
252
253 Ok(unsafe { DataFrame::new_unchecked(self.height(), new_columns) })
254 }
255
    /// Prepend a row-index column in place, without checking for name clashes.
    ///
    /// # Safety
    /// The caller must guarantee that no column named `name` already exists;
    /// this is only verified with a `debug_assert!` here.
    pub unsafe fn with_row_index_mut(
        &mut self,
        name: PlSmallStr,
        offset: Option<IdxSize>,
    ) -> &mut Self {
        debug_assert!(
            self.get_column_index(&name).is_none(),
            "with_row_index_mut(): column with name {} already exists",
            &name
        );

        let offset = offset.unwrap_or(0);
        // NOTE(review): failure of `new_row_index` is treated as a bug here
        // (unwrap), unlike the fallible `with_row_index` — confirm callers
        // cannot trigger it (e.g. via index overflow).
        let col = Column::new_row_index(name, offset, self.height()).unwrap();

        unsafe { self.columns_mut() }.insert(0, col);
        self
    }
280
281 pub fn shrink_to_fit(&mut self) {
283 for s in unsafe { self.columns_mut_retain_schema() } {
285 s.shrink_to_fit();
286 }
287 }
288
289 pub fn rechunk_mut_par(&mut self) -> &mut Self {
292 if self.columns().iter().any(|c| c.n_chunks() > 1) {
293 POOL.install(|| {
294 unsafe { self.columns_mut_retain_schema() }
295 .par_iter_mut()
296 .for_each(|c| *c = c.rechunk());
297 })
298 }
299
300 self
301 }
302
303 pub fn rechunk_mut(&mut self) -> &mut Self {
305 let columns = unsafe { self.columns_mut() };
307
308 for col in columns.iter_mut().filter(|c| c.n_chunks() > 1) {
309 *col = col.rechunk();
310 }
311
312 self
313 }
314
    /// Returns `true` when the columns' chunk layouts disagree, i.e. when the
    /// frame should be rechunked before chunk-aligned operations.
    pub fn should_rechunk(&self) -> bool {
        // Cheap first pass: all materialized columns must agree on chunk count.
        if !self
            .columns()
            .iter()
            .filter_map(|c| c.as_series().map(|s| s.n_chunks()))
            .all_equal()
        {
            return true;
        }

        // Chunk counts agree; now the individual chunk lengths must line up too.
        let mut chunk_lengths = self.materialized_column_iter().map(|s| s.chunk_lengths());
        match chunk_lengths.next() {
            // No materialized columns: nothing can be misaligned.
            None => false,
            Some(first_column_chunk_lengths) => {
                // Fast path: first column has a single chunk, so a mismatch can
                // only come from another column having more than one.
                if first_column_chunk_lengths.size_hint().0 == 1 {
                    return chunk_lengths.any(|cl| cl.size_hint().0 != 1);
                }
                // More chunks than rows implies zero-length chunks; force a
                // rechunk (except for the single empty chunk at height 0).
                let height = self.height();
                let n_chunks = first_column_chunk_lengths.size_hint().0;
                if n_chunks > height && !(height == 0 && n_chunks == 1) {
                    return true;
                }
                // Full comparison: every column's chunk lengths must equal the
                // first column's, element by element.
                let v: Vec<_> = first_column_chunk_lengths.collect();
                for cl in chunk_lengths {
                    if cl.enumerate().any(|(idx, el)| Some(&el) != v.get(idx)) {
                        return true;
                    }
                }
                false
            },
        }
    }
355
356 pub fn align_chunks_par(&mut self) -> &mut Self {
358 if self.should_rechunk() {
359 self.rechunk_mut_par()
360 } else {
361 self
362 }
363 }
364
365 pub fn align_chunks(&mut self) -> &mut Self {
367 if self.should_rechunk() {
368 self.rechunk_mut()
369 } else {
370 self
371 }
372 }
373
374 pub fn get_column_names(&self) -> Vec<&PlSmallStr> {
385 self.columns().iter().map(|s| s.name()).collect()
386 }
387
388 pub fn get_column_names_owned(&self) -> Vec<PlSmallStr> {
390 self.columns().iter().map(|s| s.name().clone()).collect()
391 }
392
    /// Rename all columns at once.
    ///
    /// # Errors
    /// Fails when the number of names does not match the width, or when the
    /// new names are not unique.
    pub fn set_column_names<T>(&mut self, new_names: &[T]) -> PolarsResult<()>
    where
        T: AsRef<str>,
    {
        polars_ensure!(
            new_names.len() == self.width(),
            ShapeMismatch: "{} column names provided for a DataFrame of width {}",
            new_names.len(), self.width()
        );

        validation::ensure_names_unique(new_names)?;

        // Rebuild the column vector with renamed columns; `mem::take` moves the
        // existing columns out so they can be consumed without cloning.
        *unsafe { self.columns_mut() } = std::mem::take(unsafe { self.columns_mut() })
            .into_iter()
            .zip(new_names)
            .map(|(c, name)| c.with_name(PlSmallStr::from_str(name.as_ref())))
            .collect();

        Ok(())
    }
424
425 pub fn dtypes(&self) -> Vec<DataType> {
438 self.columns().iter().map(|s| s.dtype().clone()).collect()
439 }
440
441 pub fn first_col_n_chunks(&self) -> usize {
443 match self.columns().iter().find_map(|col| col.as_series()) {
444 None if self.width() == 0 => 0,
445 None => 1,
446 Some(s) => s.n_chunks(),
447 }
448 }
449
450 pub fn max_n_chunks(&self) -> usize {
452 self.columns()
453 .iter()
454 .map(|s| s.as_series().map(|s| s.n_chunks()).unwrap_or(1))
455 .max()
456 .unwrap_or(0)
457 }
458
459 pub fn fields(&self) -> Vec<Field> {
475 self.columns()
476 .iter()
477 .map(|s| s.field().into_owned())
478 .collect()
479 }
480
481 pub fn hstack(&self, columns: &[Column]) -> PolarsResult<Self> {
515 let mut new_cols = Vec::with_capacity(self.width() + columns.len());
516
517 new_cols.extend(self.columns().iter().cloned());
518 new_cols.extend_from_slice(columns);
519
520 DataFrame::new(self.height(), new_cols)
521 }
522 pub fn vstack(&self, other: &DataFrame) -> PolarsResult<Self> {
563 let mut df = self.clone();
564 df.vstack_mut(other)?;
565 Ok(df)
566 }
567
    /// Append the rows of `other` below `self`, in place.
    ///
    /// A fresh `(0, 0)` frame simply becomes a copy of `other`; otherwise the
    /// widths must match.
    ///
    /// # Errors
    /// Errors on width mismatch (for non-empty frames) or when a column append
    /// fails.
    pub fn vstack_mut(&mut self, other: &DataFrame) -> PolarsResult<&mut Self> {
        if self.width() != other.width() {
            polars_ensure!(
                self.shape() == (0, 0),
                ShapeMismatch:
                "unable to append to a DataFrame of shape {:?} with a DataFrame of width {}",
                self.shape(), other.width(),
            );

            self.clone_from(other);

            return Ok(self);
        }

        // The unwrap documents the assumption that the combined height cannot
        // overflow `usize`.
        let new_height = usize::checked_add(self.height(), other.height()).unwrap();

        // SAFETY: appending preserves names and dtypes, so the cached schema
        // stays valid.
        // NOTE(review): if an append fails mid-iteration, earlier columns keep
        // their appended rows while the height is not updated — the frame is
        // left in an inconsistent state.
        unsafe { self.columns_mut_retain_schema() }
            .iter_mut()
            .zip(other.columns())
            .try_for_each::<_, PolarsResult<_>>(|(left, right)| {
                ensure_can_extend(&*left, right)?;
                left.append(right).map_err(|e| {
                    e.context(format!("failed to vstack column '{}'", right.name()).into())
                })?;
                Ok(())
            })?;

        // SAFETY: every column was appended, so all now have `new_height` rows.
        unsafe { self.set_height(new_height) };

        Ok(self)
    }
639
    /// Append the rows of an owned `other` below `self`, in place, consuming
    /// `other`'s columns without cloning.
    ///
    /// A fresh `(0, 0)` frame simply takes ownership of `other`; otherwise the
    /// widths must match.
    pub fn vstack_mut_owned(&mut self, other: DataFrame) -> PolarsResult<&mut Self> {
        if self.width() != other.width() {
            polars_ensure!(
                self.shape() == (0, 0),
                ShapeMismatch:
                "unable to append to a DataFrame of width {} with a DataFrame of width {}",
                self.width(), other.width(),
            );

            *self = other;

            return Ok(self);
        }

        // The unwrap documents the assumption that the combined height cannot
        // overflow `usize`.
        let new_height = usize::checked_add(self.height(), other.height()).unwrap();

        // SAFETY: appending preserves names and dtypes, so the cached schema
        // stays valid.
        // NOTE(review): as with `vstack_mut`, a mid-iteration failure leaves
        // earlier columns extended while the height is not updated.
        unsafe { self.columns_mut_retain_schema() }
            .iter_mut()
            .zip(other.into_columns())
            .try_for_each::<_, PolarsResult<_>>(|(left, right)| {
                ensure_can_extend(&*left, &right)?;
                // Clone the name first; `append_owned` consumes `right`.
                let right_name = right.name().clone();
                left.append_owned(right).map_err(|e| {
                    e.context(format!("failed to vstack column '{right_name}'").into())
                })?;
                Ok(())
            })?;

        // SAFETY: every column was appended, so all now have `new_height` rows.
        unsafe { self.set_height(new_height) };

        Ok(self)
    }
672
    /// Like [`DataFrame::vstack_mut`], but without the width/compatibility
    /// validation; an append failure panics instead of returning an error.
    pub fn vstack_mut_unchecked(&mut self, other: &DataFrame) -> &mut Self {
        // The unwrap documents the assumption that the combined height cannot
        // overflow `usize`.
        let new_height = usize::checked_add(self.height(), other.height()).unwrap();

        // SAFETY: appending preserves names and dtypes, so the cached schema
        // stays valid.
        unsafe { self.columns_mut_retain_schema() }
            .iter_mut()
            .zip(other.columns())
            .for_each(|(left, right)| {
                left.append(right)
                    .map_err(|e| {
                        e.context(format!("failed to vstack column '{}'", right.name()).into())
                    })
                    .expect("should not fail");
            });

        // SAFETY: every column was appended, so all now have `new_height` rows.
        unsafe { self.set_height(new_height) };

        self
    }
697
    /// Like [`DataFrame::vstack_mut_owned`], but without validation; an append
    /// failure panics instead of returning an error.
    pub fn vstack_mut_owned_unchecked(&mut self, other: DataFrame) -> &mut Self {
        // The unwrap documents the assumption that the combined height cannot
        // overflow `usize`.
        let new_height = usize::checked_add(self.height(), other.height()).unwrap();

        // SAFETY: appending preserves names and dtypes, so the cached schema
        // stays valid.
        unsafe { self.columns_mut_retain_schema() }
            .iter_mut()
            .zip(other.into_columns())
            .for_each(|(left, right)| {
                left.append_owned(right).expect("should not fail");
            });

        // SAFETY: every column was appended, so all now have `new_height` rows.
        unsafe { self.set_height(new_height) };

        self
    }
718
    /// Extend every column of `self` in place with the values of `other`.
    ///
    /// Unlike `vstack_mut`, an empty `(0, 0)` frame is not special-cased: the
    /// widths must always match.
    pub fn extend(&mut self, other: &DataFrame) -> PolarsResult<()> {
        polars_ensure!(
            self.width() == other.width(),
            ShapeMismatch:
            "unable to extend a DataFrame of width {} with a DataFrame of width {}",
            self.width(), other.width(),
        );

        // The unwrap documents the assumption that the combined height cannot
        // overflow `usize`.
        let new_height = usize::checked_add(self.height(), other.height()).unwrap();

        // SAFETY: extending preserves names and dtypes, so the cached schema
        // stays valid.
        // NOTE(review): a mid-iteration failure leaves earlier columns extended
        // while the height is not updated — the frame is left inconsistent.
        unsafe { self.columns_mut_retain_schema() }
            .iter_mut()
            .zip(other.columns())
            .try_for_each::<_, PolarsResult<_>>(|(left, right)| {
                ensure_can_extend(&*left, right)?;
                left.extend(right).map_err(|e| {
                    e.context(format!("failed to extend column '{}'", right.name()).into())
                })?;
                Ok(())
            })?;

        // SAFETY: every column was extended, so all now have `new_height` rows.
        unsafe { self.set_height(new_height) };

        Ok(())
    }
758
759 pub fn drop_in_place(&mut self, name: &str) -> PolarsResult<Column> {
776 let idx = self.try_get_column_index(name)?;
777 Ok(unsafe { self.columns_mut() }.remove(idx))
778 }
779
780 pub fn drop_nulls<S>(&self, subset: Option<&[S]>) -> PolarsResult<Self>
809 where
810 for<'a> &'a S: AsRef<str>,
811 {
812 if let Some(v) = subset {
813 let v = self.select_to_vec(v)?;
814 self._drop_nulls_impl(v.as_slice())
815 } else {
816 self._drop_nulls_impl(self.columns())
817 }
818 }
819
820 fn _drop_nulls_impl(&self, subset: &[Column]) -> PolarsResult<Self> {
821 if subset.iter().all(|s| !s.has_nulls()) {
823 return Ok(self.clone());
824 }
825
826 let mut iter = subset.iter();
827
828 let mask = iter
829 .next()
830 .ok_or_else(|| polars_err!(NoData: "no data to drop nulls from"))?;
831 let mut mask = mask.is_not_null();
832
833 for c in iter {
834 mask = mask & c.is_not_null();
835 }
836 self.filter(&mask)
837 }
838
839 pub fn drop(&self, name: &str) -> PolarsResult<Self> {
854 let idx = self.try_get_column_index(name)?;
855 let mut new_cols = Vec::with_capacity(self.width() - 1);
856
857 self.columns().iter().enumerate().for_each(|(i, s)| {
858 if i != idx {
859 new_cols.push(s.clone())
860 }
861 });
862
863 Ok(unsafe { DataFrame::_new_unchecked_impl(self.height(), new_cols) })
864 }
865
866 pub fn drop_many<I, S>(&self, names: I) -> Self
868 where
869 I: IntoIterator<Item = S>,
870 S: Into<PlSmallStr>,
871 {
872 let names: PlHashSet<PlSmallStr> = names.into_iter().map(|s| s.into()).collect();
873 self.drop_many_amortized(&names)
874 }
875
876 pub fn drop_many_amortized(&self, names: &PlHashSet<PlSmallStr>) -> DataFrame {
878 if names.is_empty() {
879 return self.clone();
880 }
881 let mut new_cols = Vec::with_capacity(self.width().saturating_sub(names.len()));
882 self.columns().iter().for_each(|s| {
883 if !names.contains(s.name()) {
884 new_cols.push(s.clone())
885 }
886 });
887
888 unsafe { DataFrame::new_unchecked(self.height(), new_cols) }
889 }
890
    /// Insert `column` at `index` without checking for a duplicate name.
    ///
    /// An empty `(0, 0)` frame first adopts the column's length as its height.
    // NOTE(review): `Vec::insert` panics when `index > width` — callers must
    // pass a valid insertion index.
    fn insert_column_no_namecheck(
        &mut self,
        index: usize,
        column: Column,
    ) -> PolarsResult<&mut Self> {
        if self.shape() == (0, 0) {
            // SAFETY: a width-0 frame may take on any height.
            unsafe { self.set_height(column.len()) };
        }

        polars_ensure!(
            column.len() == self.height(),
            ShapeMismatch:
            "unable to add a column of length {} to a DataFrame of height {}",
            column.len(), self.height(),
        );

        unsafe { self.columns_mut() }.insert(index, column);
        Ok(self)
    }
912
913 pub fn insert_column(&mut self, index: usize, column: Column) -> PolarsResult<&mut Self> {
915 let name = column.name();
916
917 polars_ensure!(
918 self.get_column_index(name).is_none(),
919 Duplicate:
920 "column with name {:?} is already present in the DataFrame", name
921 );
922
923 self.insert_column_no_namecheck(index, column)
924 }
925
    /// Add `column` to the frame, replacing an existing column with the same
    /// name. A length-1 column is broadcast to the frame's height.
    pub fn with_column(&mut self, mut column: Column) -> PolarsResult<&mut Self> {
        if self.shape() == (0, 0) {
            // SAFETY: a width-0 frame may adopt the new column's length as height.
            unsafe { self.set_height(column.len()) };
        }

        // Broadcast a scalar-like (length-1) column to the frame's height.
        if column.len() != self.height() && column.len() == 1 {
            column = column.new_from_index(0, self.height());
        }

        polars_ensure!(
            column.len() == self.height(),
            ShapeMismatch: "unable to add a column of length {} to a DataFrame of height {}",
            column.len(), self.height(),
        );

        if let Some(i) = self.get_column_index(column.name()) {
            // A column with the same name exists: replace it in place.
            *unsafe { self.columns_mut() }.get_mut(i).unwrap() = column
        } else {
            // Otherwise append as a new rightmost column.
            unsafe { self.columns_mut() }.push(column)
        };

        Ok(self)
    }
951
    /// Push `column` as the rightmost column without any validation.
    ///
    /// # Safety
    /// The caller must ensure `column` has the frame's height and a name not
    /// already present; neither is checked here.
    pub unsafe fn push_column_unchecked(&mut self, column: Column) -> &mut Self {
        unsafe { self.columns_mut() }.push(column);
        self
    }
961
    /// Add or replace multiple columns, positioning each according to
    /// `output_schema` (see `with_column_and_schema_mut`).
    pub fn with_columns_mut(
        &mut self,
        columns: impl IntoIterator<Item = Column>,
        output_schema: &Schema,
    ) -> PolarsResult<()> {
        let columns = columns.into_iter();

        // Reserve using the iterator's lower size bound to limit reallocation.
        // SAFETY: reserving capacity does not change the column set, so the
        // cached schema stays valid.
        unsafe {
            self.columns_mut_retain_schema()
                .reserve(columns.size_hint().0)
        }

        for c in columns {
            self.with_column_and_schema_mut(c, output_schema)?;
        }

        Ok(())
    }
982
    /// Add or replace one column, placing it at the position `output_schema`
    /// dictates (falling back to the existing position, then to appending).
    /// A length-1 column is broadcast to the frame's height.
    fn with_column_and_schema_mut(
        &mut self,
        mut column: Column,
        output_schema: &Schema,
    ) -> PolarsResult<&mut Self> {
        if self.shape() == (0, 0) {
            // SAFETY: a width-0 frame may adopt the new column's length as height.
            unsafe { self.set_height(column.len()) };
        }

        // Broadcast a scalar-like (length-1) column to the frame's height.
        if column.len() != self.height() && column.len() == 1 {
            column = column.new_from_index(0, self.height());
        }

        polars_ensure!(
            column.len() == self.height(),
            ShapeMismatch:
            "unable to add a column of length {} to a DataFrame of height {}",
            column.len(), self.height(),
        );

        // Target position: schema-dictated index, else the current index of a
        // same-named column, else append at the end.
        let i = output_schema
            .index_of(column.name())
            .or_else(|| self.get_column_index(column.name()))
            .unwrap_or(self.width());

        if i < self.width() {
            *unsafe { self.columns_mut() }.get_mut(i).unwrap() = column
        } else if i == self.width() {
            unsafe { self.columns_mut() }.push(column)
        } else {
            // `output_schema` pointed beyond the current width; columns must be
            // added in schema order for this not to trigger.
            // NOTE(review): consider `unreachable!()` with a message here.
            panic!()
        }

        Ok(self)
    }
1019
1020 pub fn get(&self, idx: usize) -> Option<Vec<AnyValue<'_>>> {
1031 (idx < self.height()).then(|| self.columns().iter().map(|c| c.get(idx).unwrap()).collect())
1032 }
1033
    /// Borrow the column at position `idx`, or `None` when out of bounds.
    pub fn select_at_idx(&self, idx: usize) -> Option<&Column> {
        self.columns().get(idx)
    }
1052
1053 pub fn get_column_index(&self, name: &str) -> Option<usize> {
1071 if let Some(schema) = self.cached_schema() {
1072 schema.index_of(name)
1073 } else if self.width() <= LINEAR_SEARCH_LIMIT {
1074 self.columns().iter().position(|s| s.name() == name)
1075 } else {
1076 self.schema().index_of(name)
1077 }
1078 }
1079
1080 pub fn try_get_column_index(&self, name: &str) -> PolarsResult<usize> {
1082 self.get_column_index(name)
1083 .ok_or_else(|| polars_err!(col_not_found = name))
1084 }
1085
1086 pub fn column(&self, name: &str) -> PolarsResult<&Column> {
1100 let idx = self.try_get_column_index(name)?;
1101 Ok(self.select_at_idx(idx).unwrap())
1102 }
1103
1104 pub fn select<I, S>(&self, names: I) -> PolarsResult<Self>
1115 where
1116 I: IntoIterator<Item = S>,
1117 S: AsRef<str>,
1118 {
1119 DataFrame::new(self.height(), self.select_to_vec(names)?)
1120 }
1121
    /// Like [`DataFrame::select`], but skips the validation that
    /// [`DataFrame::new`] would perform on the selected columns.
    ///
    /// # Safety
    /// The caller must guarantee the selection produces a valid frame (in
    /// particular, no duplicate column names).
    pub unsafe fn select_unchecked<I, S>(&self, names: I) -> PolarsResult<Self>
    where
        I: IntoIterator<Item = S>,
        S: AsRef<str>,
    {
        Ok(unsafe { DataFrame::new_unchecked(self.height(), self.select_to_vec(names)?) })
    }
1133
    /// Resolve `selection` to owned column clones, in selection order.
    ///
    /// # Errors
    /// Fails when a selected name is not present in the frame.
    pub fn select_to_vec(
        &self,
        selection: impl IntoIterator<Item = impl AsRef<str>>,
    ) -> PolarsResult<Vec<Column>> {
        AmortizedColumnSelector::new(self).select_multiple(selection)
    }
1157
    /// Keep only the rows where `mask` is `true`, filtering columns in parallel.
    pub fn filter(&self, mask: &BooleanChunked) -> PolarsResult<Self> {
        if self.width() == 0 {
            // No columns: only the resulting height needs to be computed.
            filter_zero_width(self.height(), mask)
        } else if mask.len() == 1 && self.len() >= 1 {
            // Broadcast fast path: a single boolean keeps all rows or none.
            if mask.all() && mask.null_count() == 0 {
                Ok(self.clone())
            } else {
                // A single `false` or null mask value drops every row.
                Ok(self.clear())
            }
        } else {
            let new_columns: Vec<Column> = self.try_apply_columns_par(|s| s.filter(mask))?;
            // SAFETY: filtering every column with the same mask yields equal
            // lengths, and names/dtypes are unchanged, so the schema carries over.
            let out = unsafe {
                DataFrame::new_unchecked(new_columns[0].len(), new_columns).with_schema_from(self)
            };

            Ok(out)
        }
    }
1187
1188 pub fn filter_seq(&self, mask: &BooleanChunked) -> PolarsResult<Self> {
1190 if self.width() == 0 {
1191 filter_zero_width(self.height(), mask)
1192 } else if mask.len() == 1 && mask.null_count() == 0 && self.len() >= 1 {
1193 if mask.all() && mask.null_count() == 0 {
1194 Ok(self.clone())
1195 } else {
1196 Ok(self.clear())
1197 }
1198 } else {
1199 let new_columns: Vec<Column> = self.try_apply_columns(|s| s.filter(mask))?;
1200 let out = unsafe {
1201 DataFrame::new_unchecked(new_columns[0].len(), new_columns).with_schema_from(self)
1202 };
1203
1204 Ok(out)
1205 }
1206 }
1207
    /// Gather rows by `indices`, with bounds checking.
    ///
    /// # Errors
    /// Fails when any index is out of bounds for the frame's height.
    pub fn take(&self, indices: &IdxCa) -> PolarsResult<Self> {
        // Validate every index up front so the per-column gather can be unchecked.
        check_bounds_ca(indices, self.height().try_into().unwrap_or(IdxSize::MAX))?;

        let new_cols = self.apply_columns_par(|c| {
            assert_eq!(c.len(), self.height());
            // SAFETY: indices were bounds-checked against the frame height above.
            unsafe { c.take_unchecked(indices) }
        });

        // SAFETY: gathering yields `indices.len()` rows per column; names and
        // dtypes are unchanged, so the schema carries over.
        Ok(unsafe { DataFrame::new_unchecked(indices.len(), new_cols).with_schema_from(self) })
    }
1229
    /// Gather rows by `idx` without bounds checking, allowing multithreading.
    ///
    /// # Safety
    /// All indices in `idx` must be in bounds for this frame's height.
    pub unsafe fn take_unchecked(&self, idx: &IdxCa) -> Self {
        self.take_unchecked_impl(idx, true)
    }
1235
    /// Gather the rows belonging to a single group.
    ///
    /// # Safety
    /// The group's indices/slice must be in bounds for this frame.
    #[cfg(feature = "algorithm_group_by")]
    pub unsafe fn gather_group_unchecked(&self, group: &GroupsIndicator) -> Self {
        match group {
            // SAFETY: the caller guarantees the indices are in bounds. The
            // gather runs single-threaded here (`allow_threads = false`).
            GroupsIndicator::Idx((_, indices)) => unsafe {
                self.take_slice_unchecked_impl(indices.as_slice(), false)
            },
            // A contiguous group can be taken as a plain slice.
            GroupsIndicator::Slice([offset, len]) => self.slice(*offset as i64, *len as usize),
        }
    }
1247
    /// Gather rows by `idx` without bounds checking, optionally parallelized.
    ///
    /// # Safety
    /// All indices in `idx` must be in bounds for this frame's height.
    pub unsafe fn take_unchecked_impl(&self, idx: &IdxCa, allow_threads: bool) -> Self {
        let cols = if allow_threads && POOL.current_num_threads() > 1 {
            POOL.install(|| {
                if POOL.current_num_threads() > self.width() {
                    // More threads than columns: additionally split the index
                    // set into strides so each column gathers in parallel parts.
                    // At least 256 indices per stride to bound scheduling overhead.
                    let stride = usize::max(idx.len().div_ceil(POOL.current_num_threads()), 256);
                    if self.height() / stride >= 2 {
                        self.apply_columns_par(|c| {
                            // Nested dtypes are rechunked first before gathering.
                            // NOTE(review): presumably to avoid chunked gathers
                            // on nested data — confirm.
                            let c = if c.dtype().is_nested() {
                                &c.rechunk()
                            } else {
                                c
                            };

                            // Gather each stride independently and append the
                            // partial results back together in order.
                            (0..idx.len().div_ceil(stride))
                                .into_par_iter()
                                .map(|i| c.take_unchecked(&idx.slice((i * stride) as i64, stride)))
                                .reduce(
                                    || Column::new_empty(c.name().clone(), c.dtype()),
                                    |mut a, b| {
                                        a.append_owned(b).unwrap();
                                        a
                                    },
                                )
                        })
                    } else {
                        self.apply_columns_par(|c| c.take_unchecked(idx))
                    }
                } else {
                    // Enough columns to saturate the pool: parallelize per column.
                    self.apply_columns_par(|c| c.take_unchecked(idx))
                }
            })
        } else {
            // Single-threaded gather.
            self.apply_columns(|s| s.take_unchecked(idx))
        };

        // SAFETY: gathering yields `idx.len()` rows per column; names and
        // dtypes are unchanged, so the schema carries over.
        unsafe { DataFrame::new_unchecked(idx.len(), cols).with_schema_from(self) }
    }
1289
    /// Gather rows by an index slice without bounds checking, allowing
    /// multithreading.
    ///
    /// # Safety
    /// All indices in `idx` must be in bounds for this frame's height.
    pub unsafe fn take_slice_unchecked(&self, idx: &[IdxSize]) -> Self {
        self.take_slice_unchecked_impl(idx, true)
    }
1295
    /// Gather rows by an index slice without bounds checking, optionally
    /// parallelized. Mirrors [`DataFrame::take_unchecked_impl`] for `&[IdxSize]`.
    ///
    /// # Safety
    /// All indices in `idx` must be in bounds for this frame's height.
    pub unsafe fn take_slice_unchecked_impl(&self, idx: &[IdxSize], allow_threads: bool) -> Self {
        let cols = if allow_threads && POOL.current_num_threads() > 1 {
            POOL.install(|| {
                if POOL.current_num_threads() > self.width() {
                    // More threads than columns: additionally split the index
                    // slice into strides so each column gathers in parallel parts.
                    // At least 256 indices per stride to bound scheduling overhead.
                    let stride = usize::max(idx.len().div_ceil(POOL.current_num_threads()), 256);
                    if self.height() / stride >= 2 {
                        self.apply_columns_par(|c| {
                            // Nested dtypes are rechunked first before gathering.
                            // NOTE(review): presumably to avoid chunked gathers
                            // on nested data — confirm.
                            let c = if c.dtype().is_nested() {
                                &c.rechunk()
                            } else {
                                c
                            };

                            // Gather each stride independently and append the
                            // partial results back together in order. The final
                            // stride may be shorter than `stride`.
                            (0..idx.len().div_ceil(stride))
                                .into_par_iter()
                                .map(|i| {
                                    let idx = &idx[i * stride..];
                                    let idx = &idx[..idx.len().min(stride)];
                                    c.take_slice_unchecked(idx)
                                })
                                .reduce(
                                    || Column::new_empty(c.name().clone(), c.dtype()),
                                    |mut a, b| {
                                        a.append_owned(b).unwrap();
                                        a
                                    },
                                )
                        })
                    } else {
                        self.apply_columns_par(|s| s.take_slice_unchecked(idx))
                    }
                } else {
                    // Enough columns to saturate the pool: parallelize per column.
                    self.apply_columns_par(|s| s.take_slice_unchecked(idx))
                }
            })
        } else {
            // Single-threaded gather.
            self.apply_columns(|s| s.take_slice_unchecked(idx))
        };
        // SAFETY: gathering yields `idx.len()` rows per column; names and
        // dtypes are unchanged, so the schema carries over.
        unsafe { DataFrame::new_unchecked(idx.len(), cols).with_schema_from(self) }
    }
1340
    /// Rename column `column` to `name` in place.
    ///
    /// # Errors
    /// Fails when `name` already exists or when `column` is not found.
    pub fn rename(&mut self, column: &str, name: PlSmallStr) -> PolarsResult<&mut Self> {
        if column == name.as_str() {
            // NOTE(review): this early-out also skips the col-not-found check,
            // so renaming a missing column to itself silently succeeds.
            return Ok(self);
        }
        polars_ensure!(
            !self.schema().contains(&name),
            Duplicate: "column rename attempted with already existing name \"{name}\""
        );

        self.get_column_index(column)
            .and_then(|idx| unsafe { self.columns_mut() }.get_mut(idx))
            .ok_or_else(|| polars_err!(col_not_found = column))
            .map(|c| c.rename(name))?;

        Ok(self)
    }
1371
    /// Apply multiple renames in one pass, keeping the cached schema in sync.
    ///
    /// # Errors
    /// Fails on a rename to an existing name or on a missing source column.
    // NOTE(review): on error midway, earlier renames have already been applied
    // to the columns while `set_schema` is skipped.
    pub fn rename_many<'a>(
        &mut self,
        renames: impl Iterator<Item = (&'a str, PlSmallStr)>,
    ) -> PolarsResult<&mut Self> {
        // Mutate a private copy of the schema; `make_mut` clones only if shared.
        let mut schema_arc = self.schema().clone();
        let schema = Arc::make_mut(&mut schema_arc);

        for (from, to) in renames {
            // Renaming a column to its current name is a no-op.
            if from == to.as_str() {
                continue;
            }

            polars_ensure!(
                !schema.contains(&to),
                Duplicate: "column rename attempted with already existing name \"{to}\""
            );

            match schema.get_full(from) {
                None => polars_bail!(col_not_found = from),
                Some((idx, _, _)) => {
                    // Keep the schema entry and the column itself in sync.
                    let (n, _) = schema.get_at_index_mut(idx).unwrap();
                    *n = to.clone();
                    unsafe { self.columns_mut() }
                        .get_mut(idx)
                        .unwrap()
                        .rename(to);
                },
            }
        }

        // SAFETY: the updated schema mirrors the renamed columns exactly.
        unsafe { self.set_schema(schema_arc) };

        Ok(self)
    }
1406
    /// Sort the frame in place by the given columns.
    pub fn sort_in_place(
        &mut self,
        by: impl IntoIterator<Item = impl AsRef<str>>,
        sort_options: SortMultipleOptions,
    ) -> PolarsResult<&mut Self> {
        let by_column = self.select_to_vec(by)?;

        let mut out = self.sort_impl(by_column, sort_options, None)?;
        // SAFETY: sorting only permutes rows; names and dtypes are untouched,
        // so the original cached schema can be carried over.
        unsafe { out.set_schema_from(self) };

        *self = out;

        Ok(self)
    }
1424
    /// Core sort implementation: sort by `by_column`, optionally applying a
    /// `(offset, len)` slice to the result.
    #[doc(hidden)]
    pub fn sort_impl(
        &self,
        by_column: Vec<Column>,
        sort_options: SortMultipleOptions,
        slice: Option<(i64, usize)>,
    ) -> PolarsResult<Self> {
        // No sort keys: sorting is a no-op (possibly sliced).
        if by_column.is_empty() {
            return if let Some((offset, len)) = slice {
                Ok(self.slice(offset, len))
            } else {
                Ok(self.clone())
            };
        }

        // Object columns have no defined ordering.
        for column in &by_column {
            if column.dtype().is_object() {
                polars_bail!(
                    InvalidOperation: "column '{}' has a dtype of '{}', which does not support sorting", column.name(), column.dtype()
                )
            }
        }

        let first_descending = sort_options.descending[0];
        let first_by_column = by_column[0].name().to_string();

        // After sorting, the first key column is known sorted; record that flag
        // so later operations can exploit it.
        let set_sorted = |df: &mut DataFrame| {
            let _ = df.apply(&first_by_column, |s| {
                let mut s = s.clone();
                if first_descending {
                    s.set_sorted_flag(IsSorted::Descending)
                } else {
                    s.set_sorted_flag(IsSorted::Ascending)
                }
                s
            });
        };

        // An empty frame is trivially sorted; still set the flag for consistency.
        if self.shape_has_zero() {
            let mut out = self.clone();
            set_sorted(&mut out);
            return Ok(out);
        }

        // A prefix slice of a sort is a bottom-k problem, which is cheaper.
        if let Some((0, k)) = slice {
            if k < self.height() {
                return self.bottom_k_impl(k, by_column, sort_options);
            }
        }
        // Categorical/enum keys sort by category order, so the plain
        // sorted-flag shortcut below does not apply to them.
        #[cfg(feature = "dtype-categorical")]
        let is_not_categorical_enum =
            !(matches!(by_column[0].dtype(), DataType::Categorical(_, _))
                || matches!(by_column[0].dtype(), DataType::Enum(_, _)));

        #[cfg(not(feature = "dtype-categorical"))]
        #[allow(non_upper_case_globals)]
        const is_not_categorical_enum: bool = true;

        if by_column.len() == 1 && is_not_categorical_enum {
            let required_sorting = if sort_options.descending[0] {
                IsSorted::Descending
            } else {
                IsSorted::Ascending
            };
            // Already sorted in the required direction, and nulls either absent
            // or already at the requested end (checked via the last element).
            let no_sorting_required = (by_column[0].is_sorted_flag() == required_sorting)
                && ((by_column[0].null_count() == 0)
                    || by_column[0].get(by_column[0].len() - 1).unwrap().is_null()
                        == sort_options.nulls_last[0]);

            if no_sorting_required {
                return if let Some((offset, len)) = slice {
                    Ok(self.slice(offset, len))
                } else {
                    Ok(self.clone())
                };
            }
        }

        let has_nested = by_column.iter().any(|s| s.dtype().is_nested());
        let allow_threads = sort_options.multithreaded;

        // Work on a rechunked clone so the gather below operates on contiguous data.
        let mut df = self.clone();
        let df = df.rechunk_mut_par();
        let mut take = match (by_column.len(), has_nested) {
            (1, false) => {
                let s = &by_column[0];
                let options = SortOptions {
                    descending: sort_options.descending[0],
                    nulls_last: sort_options.nulls_last[0],
                    multithreaded: sort_options.multithreaded,
                    maintain_order: sort_options.maintain_order,
                    limit: sort_options.limit,
                };
                // Single-column frame sorted by its own column: sort the column
                // directly instead of computing an argsort and gathering.
                if df.width() == 1 && df.try_get_column_index(s.name().as_str()).is_ok() {
                    let mut out = s.sort_with(options)?;
                    if let Some((offset, len)) = slice {
                        out = out.slice(offset, len);
                    }
                    return Ok(out.into_frame());
                }
                s.arg_sort(options)
            },
            // Multiple keys or nested dtypes go through the generic arg_sort.
            _ => arg_sort(&by_column, sort_options)?,
        };

        // Slice the permutation rather than the gathered result.
        if let Some((offset, len)) = slice {
            take = take.slice(offset, len);
        }

        // SAFETY: `take` comes from an argsort over this frame, so all indices
        // are in bounds.
        let mut df = unsafe { df.take_unchecked_impl(&take, allow_threads) };
        set_sorted(&mut df);
        Ok(df)
    }
1557
    /// Build a small frame describing per-column metadata: repr kind,
    /// sortedness flags, fast-explode flag, and where a series was materialized.
    pub fn _to_metadata(&self) -> DataFrame {
        let num_columns = self.width();

        let mut column_names =
            StringChunkedBuilder::new(PlSmallStr::from_static("column_name"), num_columns);
        let mut repr_ca = StringChunkedBuilder::new(PlSmallStr::from_static("repr"), num_columns);
        let mut sorted_asc_ca =
            BooleanChunkedBuilder::new(PlSmallStr::from_static("sorted_asc"), num_columns);
        let mut sorted_dsc_ca =
            BooleanChunkedBuilder::new(PlSmallStr::from_static("sorted_dsc"), num_columns);
        let mut fast_explode_list_ca =
            BooleanChunkedBuilder::new(PlSmallStr::from_static("fast_explode_list"), num_columns);
        let mut materialized_at_ca =
            StringChunkedBuilder::new(PlSmallStr::from_static("materialized_at"), num_columns);

        // One output row per source column.
        for col in self.columns() {
            let flags = col.get_flags();

            let (repr, materialized_at) = match col {
                Column::Series(s) => ("series", s.materialized_at()),
                Column::Scalar(_) => ("scalar", None),
            };
            let sorted_asc = flags.contains(StatisticsFlags::IS_SORTED_ASC);
            let sorted_dsc = flags.contains(StatisticsFlags::IS_SORTED_DSC);
            let fast_explode_list = flags.contains(StatisticsFlags::CAN_FAST_EXPLODE_LIST);

            column_names.append_value(col.name().clone());
            repr_ca.append_value(repr);
            sorted_asc_ca.append_value(sorted_asc);
            sorted_dsc_ca.append_value(sorted_dsc);
            fast_explode_list_ca.append_value(fast_explode_list);
            materialized_at_ca.append_option(materialized_at.map(|v| format!("{v:#?}")));
        }

        // SAFETY: each metadata column has one row per source column, so the
        // height is `self.width()`, and the six column names are unique.
        unsafe {
            DataFrame::new_unchecked(
                self.width(),
                vec![
                    column_names.finish().into_column(),
                    repr_ca.finish().into_column(),
                    sorted_asc_ca.finish().into_column(),
                    sorted_dsc_ca.finish().into_column(),
                    fast_explode_list_ca.finish().into_column(),
                    materialized_at_ca.finish().into_column(),
                ],
            )
        }
    }
1610 pub fn sort(
1648 &self,
1649 by: impl IntoIterator<Item = impl AsRef<str>>,
1650 sort_options: SortMultipleOptions,
1651 ) -> PolarsResult<Self> {
1652 let mut df = self.clone();
1653 df.sort_in_place(by, sort_options)?;
1654 Ok(df)
1655 }
1656
1657 pub fn replace(&mut self, column: &str, new_col: Column) -> PolarsResult<&mut Self> {
1672 self.apply(column, |_| new_col)
1673 }
1674
1675 pub fn replace_column(&mut self, index: usize, new_column: Column) -> PolarsResult<&mut Self> {
1690 polars_ensure!(
1691 index < self.width(),
1692 ShapeMismatch:
1693 "unable to replace at index {}, the DataFrame has only {} columns",
1694 index, self.width(),
1695 );
1696
1697 polars_ensure!(
1698 new_column.len() == self.height(),
1699 ShapeMismatch:
1700 "unable to replace a column, series length {} doesn't match the DataFrame height {}",
1701 new_column.len(), self.height(),
1702 );
1703
1704 unsafe { *self.columns_mut().get_mut(index).unwrap() = new_column };
1705
1706 Ok(self)
1707 }
1708
1709 pub fn apply<F, C>(&mut self, name: &str, f: F) -> PolarsResult<&mut Self>
1750 where
1751 F: FnOnce(&Column) -> C,
1752 C: IntoColumn,
1753 {
1754 let idx = self.try_get_column_index(name)?;
1755 self.apply_at_idx(idx, f)?;
1756 Ok(self)
1757 }
1758
    /// Apply a closure to the column at `idx`, replacing it in place.
    ///
    /// A length-1 result is broadcast to the frame's height. The replacement
    /// keeps the original column's name. Errors when `idx` is out of bounds
    /// or the resulting column's length doesn't match the frame height.
    pub fn apply_at_idx<F, C>(&mut self, idx: usize, f: F) -> PolarsResult<&mut Self>
    where
        F: FnOnce(&Column) -> C,
        C: IntoColumn,
    {
        let df_height = self.height();
        let width = self.width();

        // Keep a copy of the cached schema; it is restored below when the
        // replacement has the same dtype (name is preserved, so the schema
        // stays valid in that case).
        let cached_schema = self.cached_schema().cloned();

        let col = unsafe { self.columns_mut() }.get_mut(idx).ok_or_else(|| {
            polars_err!(
                ComputeError: "invalid column index: {} for a DataFrame with {} columns",
                idx, width
            )
        })?;

        let mut new_col = f(col).into_column();

        // Broadcast a unit-length result up to the full frame height.
        if new_col.len() != df_height && new_col.len() == 1 {
            new_col = new_col.new_from_index(0, df_height);
        }

        polars_ensure!(
            new_col.len() == df_height,
            ShapeMismatch:
            "apply_at_idx: resulting Series has length {} while the DataFrame has height {}",
            new_col.len(), df_height
        );

        // The replacement keeps the original column's name.
        new_col = new_col.with_name(col.name().clone());
        let col_before = std::mem::replace(col, new_col);

        // Same dtype (and same name) => the cached schema is still correct.
        if col.dtype() == col_before.dtype() {
            unsafe { self.set_opt_schema(cached_schema) };
        }

        Ok(self)
    }
1828
    /// Fallible variant of [`apply_at_idx`](Self::apply_at_idx): the closure
    /// may error, and that error is propagated.
    ///
    /// NOTE(review): unlike `apply_at_idx`, a length-1 result is NOT broadcast
    /// to the frame height here — confirm this asymmetry is intentional.
    pub fn try_apply_at_idx<F, C>(&mut self, idx: usize, f: F) -> PolarsResult<&mut Self>
    where
        F: FnOnce(&Column) -> PolarsResult<C>,
        C: IntoColumn,
    {
        let df_height = self.height();
        let width = self.width();

        // Cache the schema so it can be restored when the dtype is unchanged.
        let cached_schema = self.cached_schema().cloned();

        let col = unsafe { self.columns_mut() }.get_mut(idx).ok_or_else(|| {
            polars_err!(
                ComputeError: "invalid column index: {} for a DataFrame with {} columns",
                idx, width
            )
        })?;

        let mut new_col = f(col).map(|c| c.into_column())?;

        polars_ensure!(
            new_col.len() == df_height,
            ShapeMismatch:
            "try_apply_at_idx: resulting Series has length {} while the DataFrame has height {}",
            new_col.len(), df_height
        );

        // The replacement keeps the original column's name.
        new_col = new_col.with_name(col.name().clone());
        let col_before = std::mem::replace(col, new_col);

        // Same dtype (and same name) => the cached schema is still correct.
        if col.dtype() == col_before.dtype() {
            unsafe { self.set_opt_schema(cached_schema) };
        }

        Ok(self)
    }
1905
1906 pub fn try_apply<F, C>(&mut self, column: &str, f: F) -> PolarsResult<&mut Self>
1949 where
1950 F: FnOnce(&Series) -> PolarsResult<C>,
1951 C: IntoColumn,
1952 {
1953 let idx = self.try_get_column_index(column)?;
1954 self.try_apply_at_idx(idx, |c| f(c.as_materialized_series()))
1955 }
1956
1957 #[must_use]
1987 pub fn slice(&self, offset: i64, length: usize) -> Self {
1988 if offset == 0 && length == self.height() {
1989 return self.clone();
1990 }
1991
1992 if length == 0 {
1993 return self.clear();
1994 }
1995
1996 let cols = self.apply_columns(|s| s.slice(offset, length));
1997
1998 let height = if let Some(fst) = cols.first() {
1999 fst.len()
2000 } else {
2001 let (_, length) = slice_offsets(offset, length, self.height());
2002 length
2003 };
2004
2005 unsafe { DataFrame::_new_unchecked_impl(height, cols).with_schema_from(self) }
2006 }
2007
2008 pub fn split_at(&self, offset: i64) -> (Self, Self) {
2010 let (a, b) = self.columns().iter().map(|s| s.split_at(offset)).unzip();
2011
2012 let (idx, _) = slice_offsets(offset, 0, self.height());
2013
2014 let a = unsafe { DataFrame::new_unchecked(idx, a).with_schema_from(self) };
2015 let b = unsafe { DataFrame::new_unchecked(self.height() - idx, b).with_schema_from(self) };
2016 (a, b)
2017 }
2018
2019 #[must_use]
2020 pub fn clear(&self) -> Self {
2021 let cols = self.columns().iter().map(|s| s.clear()).collect::<Vec<_>>();
2022 unsafe { DataFrame::_new_unchecked_impl(0, cols).with_schema_from(self) }
2023 }
2024
    /// Parallel version of [`slice`](Self::slice): columns are sliced on the
    /// thread pool.
    ///
    /// NOTE(review): the new frame is built with `length` as its height,
    /// while `slice` derives the height from the sliced columns. This assumes
    /// `offset`/`length` are fully in bounds — confirm callers guarantee that,
    /// otherwise the recorded height could disagree with the column lengths.
    #[must_use]
    pub fn slice_par(&self, offset: i64, length: usize) -> Self {
        if offset == 0 && length == self.height() {
            return self.clone();
        }
        let columns = self.apply_columns_par(|s| s.slice(offset, length));
        unsafe { DataFrame::new_unchecked(length, columns).with_schema_from(self) }
    }
2033
2034 #[must_use]
2035 pub fn _slice_and_realloc(&self, offset: i64, length: usize) -> Self {
2036 if offset == 0 && length == self.height() {
2037 return self.clone();
2038 }
2039 let columns = self.apply_columns(|s| {
2041 let mut out = s.slice(offset, length);
2042 out.shrink_to_fit();
2043 out
2044 });
2045 unsafe { DataFrame::new_unchecked(length, columns).with_schema_from(self) }
2046 }
2047
2048 #[must_use]
2082 pub fn head(&self, length: Option<usize>) -> Self {
2083 let new_height = usize::min(self.height(), length.unwrap_or(HEAD_DEFAULT_LENGTH));
2084 let new_cols = self.apply_columns(|c| c.head(Some(new_height)));
2085
2086 unsafe { DataFrame::new_unchecked(new_height, new_cols).with_schema_from(self) }
2087 }
2088
2089 #[must_use]
2120 pub fn tail(&self, length: Option<usize>) -> Self {
2121 let new_height = usize::min(self.height(), length.unwrap_or(TAIL_DEFAULT_LENGTH));
2122 let new_cols = self.apply_columns(|c| c.tail(Some(new_height)));
2123
2124 unsafe { DataFrame::new_unchecked(new_height, new_cols).with_schema_from(self) }
2125 }
2126
    /// Iterate over this DataFrame chunk-by-chunk as arrow [`RecordBatch`]es.
    ///
    /// Columns must have aligned chunks (`should_rechunk()` false). A
    /// zero-width frame yields column-less batches that only carry height.
    ///
    /// `parallel` is only honored when the conversion actually costs
    /// something: compat level 0 (a real re-encode), more than one column,
    /// and at least one String/Binary column.
    pub fn iter_chunks(
        &self,
        compat_level: CompatLevel,
        parallel: bool,
    ) -> impl Iterator<Item = RecordBatch> + '_ {
        debug_assert!(!self.should_rechunk(), "expected equal chunks");

        if self.width() == 0 {
            return RecordBatchIterWrap::new_zero_width(self.height());
        }

        // Only a compat-level-0 conversion does real per-value work; parallel
        // dispatch would otherwise be pure overhead.
        let must_convert = compat_level.0 == 0;
        let parallel = parallel
            && must_convert
            && self.width() > 1
            && self
                .columns()
                .iter()
                .any(|s| matches!(s.dtype(), DataType::String | DataType::Binary));

        RecordBatchIterWrap::Batches(RecordBatchIter {
            df: self,
            schema: Arc::new(
                self.columns()
                    .iter()
                    .map(|c| c.field().to_arrow(compat_level))
                    .collect(),
            ),
            idx: 0,
            // Even a chunk-less frame must yield one (empty) batch.
            n_chunks: usize::max(1, self.first_col_n_chunks()),
            compat_level,
            parallel,
        })
    }
2172
    /// Iterate over the physical (in-memory) chunks as arrow
    /// [`RecordBatch`]es; arrays are cloned as-is, with no compat-level
    /// conversion.
    pub fn iter_chunks_physical(&self) -> impl Iterator<Item = RecordBatch> + '_ {
        debug_assert!(!self.should_rechunk());

        if self.width() == 0 {
            return RecordBatchIterWrap::new_zero_width(self.height());
        }

        RecordBatchIterWrap::PhysicalBatches(PhysRecordBatchIter {
            schema: Arc::new(
                self.columns()
                    .iter()
                    .map(|c| c.field().to_arrow(CompatLevel::newest()))
                    .collect(),
            ),
            // One chunk iterator per materialized column, advanced in lock-step.
            arr_iters: self
                .materialized_column_iter()
                .map(|s| s.chunks().iter())
                .collect(),
        })
    }
2202
2203 #[must_use]
2205 pub fn reverse(&self) -> Self {
2206 let new_cols = self.apply_columns(Column::reverse);
2207 unsafe { DataFrame::new_unchecked(self.height(), new_cols).with_schema_from(self) }
2208 }
2209
2210 #[must_use]
2215 pub fn shift(&self, periods: i64) -> Self {
2216 let col = self.apply_columns_par(|s| s.shift(periods));
2217 unsafe { DataFrame::new_unchecked(self.height(), col).with_schema_from(self) }
2218 }
2219
2220 pub fn fill_null(&self, strategy: FillNullStrategy) -> PolarsResult<Self> {
2229 let col = self.try_apply_columns_par(|s| s.fill_null(strategy))?;
2230
2231 Ok(unsafe { DataFrame::new_unchecked(self.height(), col) })
2232 }
2233
    /// Pipe the owned DataFrame through `f`; lets free functions participate
    /// in method chains.
    pub fn pipe<F, B>(self, f: F) -> PolarsResult<B>
    where
        F: Fn(DataFrame) -> PolarsResult<B>,
    {
        f(self)
    }
2241
    /// Pipe a mutable reference to the DataFrame through `f`.
    pub fn pipe_mut<F, B>(&mut self, f: F) -> PolarsResult<B>
    where
        F: Fn(&mut DataFrame) -> PolarsResult<B>,
    {
        f(self)
    }
2249
    /// Pipe the owned DataFrame through `f` together with extra arguments.
    pub fn pipe_with_args<F, B, Args>(self, f: F, args: Args) -> PolarsResult<B>
    where
        F: Fn(DataFrame, Args) -> PolarsResult<B>,
    {
        f(self, args)
    }
2257 #[cfg(feature = "algorithm_group_by")]
2291 pub fn unique_stable(
2292 &self,
2293 subset: Option<&[String]>,
2294 keep: UniqueKeepStrategy,
2295 slice: Option<(i64, usize)>,
2296 ) -> PolarsResult<DataFrame> {
2297 self.unique_impl(
2298 true,
2299 subset.map(|v| v.iter().map(|x| PlSmallStr::from_str(x.as_str())).collect()),
2300 keep,
2301 slice,
2302 )
2303 }
2304
    /// Drop duplicate rows without any ordering guarantee; unordered
    /// counterpart of [`unique_stable`](Self::unique_stable).
    ///
    /// NOTE(review): the generic parameters `I` and `S` are never used in the
    /// signature or body. They cannot be removed without breaking callers
    /// that spell them out explicitly, so they are left as-is.
    #[cfg(feature = "algorithm_group_by")]
    pub fn unique<I, S>(
        &self,
        subset: Option<&[String]>,
        keep: UniqueKeepStrategy,
        slice: Option<(i64, usize)>,
    ) -> PolarsResult<DataFrame> {
        self.unique_impl(
            false,
            subset.map(|v| v.iter().map(|x| PlSmallStr::from_str(x.as_str())).collect()),
            keep,
            slice,
        )
    }
2320
    /// Shared implementation behind `unique` / `unique_stable`.
    ///
    /// `maintain_order` selects stable vs unordered group-by; `subset` limits
    /// the uniqueness key (defaults to all columns); `slice` is applied to the
    /// result — to the groups where possible, so the full result is never
    /// materialized just to be sliced.
    #[cfg(feature = "algorithm_group_by")]
    pub fn unique_impl(
        &self,
        maintain_order: bool,
        subset: Option<Vec<PlSmallStr>>,
        keep: UniqueKeepStrategy,
        slice: Option<(i64, usize)>,
    ) -> PolarsResult<Self> {
        // A zero-width frame has indistinguishable rows: at most one remains.
        if self.width() == 0 {
            let height = usize::min(self.height(), 1);
            return Ok(DataFrame::empty_with_height(height));
        }

        let names = subset.unwrap_or_else(|| self.get_column_names_owned());
        let mut df = self.clone();
        df.rechunk_mut_par();

        let columns = match (keep, maintain_order) {
            (UniqueKeepStrategy::First | UniqueKeepStrategy::Any, true) => {
                let gb = df.group_by_stable(names)?;
                let groups = gb.get_groups();
                // Slice the groups rather than the output frame.
                let (offset, len) = slice.unwrap_or((0, groups.len()));
                let groups = groups.slice(offset, len);
                df.apply_columns_par(|s| unsafe { s.agg_first(&groups) })
            },
            (UniqueKeepStrategy::Last, true) => {
                let gb = df.group_by_stable(names)?;
                let groups = gb.get_groups();

                // Collect each group's last row index...
                let last_idx: NoNull<IdxCa> = groups
                    .iter()
                    .map(|g| match g {
                        GroupsIndicator::Idx((_first, idx)) => idx[idx.len() - 1],
                        GroupsIndicator::Slice([first, len]) => first + len - 1,
                    })
                    .collect();

                // ...and sort them so the output keeps the original row order.
                let mut last_idx = last_idx.into_inner().sort(false);

                if let Some((offset, len)) = slice {
                    last_idx = last_idx.slice(offset, len);
                }

                let last_idx = NoNull::new(last_idx);
                // SAFETY: indices came from this frame's groups, so in bounds.
                let out = unsafe { df.take_unchecked(&last_idx) };
                return Ok(out);
            },
            (UniqueKeepStrategy::First | UniqueKeepStrategy::Any, false) => {
                let gb = df.group_by(names)?;
                let groups = gb.get_groups();
                let (offset, len) = slice.unwrap_or((0, groups.len()));
                let groups = groups.slice(offset, len);
                df.apply_columns_par(|s| unsafe { s.agg_first(&groups) })
            },
            (UniqueKeepStrategy::Last, false) => {
                let gb = df.group_by(names)?;
                let groups = gb.get_groups();
                let (offset, len) = slice.unwrap_or((0, groups.len()));
                let groups = groups.slice(offset, len);
                df.apply_columns_par(|s| unsafe { s.agg_last(&groups) })
            },
            (UniqueKeepStrategy::None, _) => {
                // Keep only rows whose key occurs exactly once.
                let df_part = df.select(names)?;
                let mask = df_part.is_unique()?;
                let mut filtered = df.filter(&mask)?;

                if let Some((offset, len)) = slice {
                    filtered = filtered.slice(offset, len);
                }
                return Ok(filtered);
            },
        };
        Ok(unsafe { DataFrame::new_unchecked_infer_height(columns).with_schema_from(self) })
    }
2398
2399 #[cfg(feature = "algorithm_group_by")]
2413 pub fn is_unique(&self) -> PolarsResult<BooleanChunked> {
2414 let gb = self.group_by(self.get_column_names_owned())?;
2415 let groups = gb.get_groups();
2416 Ok(is_unique_helper(
2417 groups,
2418 self.height() as IdxSize,
2419 true,
2420 false,
2421 ))
2422 }
2423
2424 #[cfg(feature = "algorithm_group_by")]
2438 pub fn is_duplicated(&self) -> PolarsResult<BooleanChunked> {
2439 let gb = self.group_by(self.get_column_names_owned())?;
2440 let groups = gb.get_groups();
2441 Ok(is_unique_helper(
2442 groups,
2443 self.height() as IdxSize,
2444 false,
2445 true,
2446 ))
2447 }
2448
2449 #[must_use]
2451 pub fn null_count(&self) -> Self {
2452 let cols =
2453 self.apply_columns(|c| Column::new(c.name().clone(), [c.null_count() as IdxSize]));
2454 unsafe { Self::new_unchecked(1, cols) }
2455 }
2456
2457 #[cfg(feature = "row_hash")]
2459 pub fn hash_rows(
2460 &mut self,
2461 hasher_builder: Option<PlSeedableRandomStateQuality>,
2462 ) -> PolarsResult<UInt64Chunked> {
2463 let dfs = split_df(self, POOL.current_num_threads(), false);
2464 let (cas, _) = _df_rows_to_hashes_threaded_vertical(&dfs, hasher_builder)?;
2465
2466 let mut iter = cas.into_iter();
2467 let mut acc_ca = iter.next().unwrap();
2468 for ca in iter {
2469 acc_ca.append(&ca)?;
2470 }
2471 Ok(acc_ca.rechunk().into_owned())
2472 }
2473
2474 pub fn get_supertype(&self) -> Option<PolarsResult<DataType>> {
2476 self.columns()
2477 .iter()
2478 .map(|s| Ok(s.dtype().clone()))
2479 .reduce(|acc, b| try_get_supertype(&acc?, &b.unwrap()))
2480 }
2481
    /// Take rows by index without bounds checking.
    ///
    /// # Safety
    /// Every index in `idx` must be in bounds for this DataFrame's height.
    #[doc(hidden)]
    pub unsafe fn _take_unchecked_slice(&self, idx: &[IdxSize], allow_threads: bool) -> Self {
        self._take_unchecked_slice_sorted(idx, allow_threads, IsSorted::Not)
    }
2490
    /// Take rows by index without bounds checking, with a sortedness hint for
    /// the gather kernels.
    ///
    /// # Safety
    /// Every index in `idx` must be in bounds for this DataFrame's height,
    /// and `sorted` must truthfully describe the ordering of `idx`.
    #[doc(hidden)]
    pub unsafe fn _take_unchecked_slice_sorted(
        &self,
        idx: &[IdxSize],
        allow_threads: bool,
        sorted: IsSorted,
    ) -> Self {
        // Debug-only sanity check: endpoints must agree with the claimed order.
        #[cfg(debug_assertions)]
        {
            if idx.len() > 2 {
                use crate::series::IsSorted;

                match sorted {
                    IsSorted::Ascending => {
                        assert!(idx[0] <= idx[idx.len() - 1]);
                    },
                    IsSorted::Descending => {
                        assert!(idx[0] >= idx[idx.len() - 1]);
                    },
                    _ => {},
                }
            }
        }
        // Borrow `idx` as an IdxCa without copying; `ca` must not outlive `idx`.
        let mut ca = IdxCa::mmap_slice(PlSmallStr::EMPTY, idx);
        ca.set_sorted_flag(sorted);
        self.take_unchecked_impl(&ca, allow_threads)
    }
    /// Shared implementation for `partition_by` / `partition_by_stable`.
    ///
    /// Groups by `cols` and splits the frame into one DataFrame per group.
    /// `stable` preserves first-occurrence group order; `include_key` keeps
    /// the key columns in the output frames; `parallel` runs the group-by and
    /// the per-group gathers on the rayon pool.
    #[cfg(all(feature = "partition_by", feature = "algorithm_group_by"))]
    #[doc(hidden)]
    pub fn _partition_by_impl(
        &self,
        cols: &[PlSmallStr],
        stable: bool,
        include_key: bool,
        parallel: bool,
    ) -> PolarsResult<Vec<DataFrame>> {
        let selected_keys = self.select_to_vec(cols.iter().cloned())?;
        let groups = self.group_by_with_series(selected_keys, parallel, stable)?;
        let groups = groups.into_groups();

        // Drop the key columns now when the caller doesn't want them back.
        let df = if include_key {
            self.clone()
        } else {
            self.drop_many(cols.iter().cloned())
        };

        if parallel {
            POOL.install(|| {
                match groups.as_ref() {
                    GroupsType::Idx(idx) => {
                        let mut df = df.clone();
                        // Contiguous chunks make the unchecked gathers cheaper.
                        df.rechunk_mut_par();
                        Ok(idx
                            .into_par_iter()
                            .map(|(_, group)| {
                                // SAFETY: group indices come from this frame's
                                // group-by, so they are in bounds and ascending.
                                unsafe {
                                    df._take_unchecked_slice_sorted(
                                        group,
                                        false,
                                        IsSorted::Ascending,
                                    )
                                }
                            })
                            .collect())
                    },
                    // Slice groups are contiguous runs: plain zero-copy slices.
                    GroupsType::Slice { groups, .. } => Ok(groups
                        .into_par_iter()
                        .map(|[first, len]| df.slice(*first as i64, *len as usize))
                        .collect()),
                }
            })
        } else {
            match groups.as_ref() {
                GroupsType::Idx(idx) => {
                    let mut df = df;
                    df.rechunk_mut();
                    Ok(idx
                        .into_iter()
                        .map(|(_, group)| {
                            // SAFETY: group indices come from this frame's
                            // group-by, so they are in bounds and ascending.
                            unsafe {
                                df._take_unchecked_slice_sorted(group, false, IsSorted::Ascending)
                            }
                        })
                        .collect())
                },
                GroupsType::Slice { groups, .. } => Ok(groups
                    .iter()
                    .map(|[first, len]| df.slice(*first as i64, *len as usize))
                    .collect()),
            }
        }
    }
2596
2597 #[cfg(feature = "partition_by")]
2599 pub fn partition_by<I, S>(&self, cols: I, include_key: bool) -> PolarsResult<Vec<DataFrame>>
2600 where
2601 I: IntoIterator<Item = S>,
2602 S: Into<PlSmallStr>,
2603 {
2604 let cols: UnitVec<PlSmallStr> = cols.into_iter().map(Into::into).collect();
2605 self._partition_by_impl(cols.as_slice(), false, include_key, true)
2606 }
2607
2608 #[cfg(feature = "partition_by")]
2611 pub fn partition_by_stable<I, S>(
2612 &self,
2613 cols: I,
2614 include_key: bool,
2615 ) -> PolarsResult<Vec<DataFrame>>
2616 where
2617 I: IntoIterator<Item = S>,
2618 S: Into<PlSmallStr>,
2619 {
2620 let cols: UnitVec<PlSmallStr> = cols.into_iter().map(Into::into).collect();
2621 self._partition_by_impl(cols.as_slice(), true, include_key, true)
2622 }
2623
2624 #[cfg(feature = "dtype-struct")]
2627 pub fn unnest(
2628 &self,
2629 cols: impl IntoIterator<Item = impl Into<PlSmallStr>>,
2630 separator: Option<&str>,
2631 ) -> PolarsResult<DataFrame> {
2632 self.unnest_impl(cols.into_iter().map(Into::into).collect(), separator)
2633 }
2634
    /// Implementation of [`unnest`](Self::unnest): replaces each requested
    /// struct column by its fields (optionally renamed to
    /// `<parent><separator><field>`), keeping other columns in place.
    #[cfg(feature = "dtype-struct")]
    fn unnest_impl(
        &self,
        cols: PlHashSet<PlSmallStr>,
        separator: Option<&str>,
    ) -> PolarsResult<DataFrame> {
        // Heuristic capacity: struct columns expand, but cap the guess.
        let mut new_cols = Vec::with_capacity(std::cmp::min(self.width() * 2, self.width() + 128));
        let mut count = 0;
        for s in self.columns() {
            if cols.contains(s.name()) {
                let ca = s.struct_()?.clone();
                new_cols.extend(ca.fields_as_series().into_iter().map(|mut f| {
                    if let Some(separator) = &separator {
                        // Prefix field names as "<parent><sep><field>".
                        f.rename(polars_utils::format_pl_smallstr!(
                            "{}{}{}",
                            s.name(),
                            separator,
                            f.name()
                        ));
                    }
                    Column::from(f)
                }));
                count += 1;
            } else {
                new_cols.push(s.clone())
            }
        }
        // Fewer matches than requested names: report the first missing column.
        if count != cols.len() {
            let schema = self.schema();
            for col in cols {
                let _ = schema
                    .get(col.as_str())
                    .ok_or_else(|| polars_err!(col_not_found = col))?;
            }
        }

        // Use the checked constructor (unlike the `new_unchecked` calls used
        // elsewhere) so the unnested result is re-validated.
        DataFrame::new(self.height(), new_cols)
    }
2675
2676 pub fn append_record_batch(&mut self, rb: RecordBatchT<ArrayRef>) -> PolarsResult<()> {
2677 let df = DataFrame::from(rb);
2680 polars_ensure!(
2681 self.schema() == df.schema(),
2682 SchemaMismatch: "cannot append record batch with different schema\n\n
2683 Got {:?}\nexpected: {:?}", df.schema(), self.schema(),
2684 );
2685 self.vstack_mut_owned_unchecked(df);
2686 Ok(())
2687 }
2688}
2689
/// Iterator over a DataFrame's aligned chunks, yielding one [`RecordBatch`]
/// per chunk index.
pub struct RecordBatchIter<'a> {
    df: &'a DataFrame,
    /// Arrow schema shared by every emitted batch.
    schema: ArrowSchemaRef,
    /// Next chunk index to emit.
    idx: usize,
    /// Total number of chunks to emit (at least 1).
    n_chunks: usize,
    compat_level: CompatLevel,
    /// Convert columns on the rayon pool when true.
    parallel: bool,
}
2698
impl Iterator for RecordBatchIter<'_> {
    type Item = RecordBatch;

    fn next(&mut self) -> Option<Self::Item> {
        if self.idx >= self.n_chunks {
            return None;
        }

        // Convert chunk `idx` of every column to an arrow array, optionally
        // on the rayon pool.
        let batch_cols: Vec<ArrayRef> = if self.parallel {
            let iter = self
                .df
                .columns()
                .par_iter()
                .map(Column::as_materialized_series)
                .map(|s| s.to_arrow(self.idx, self.compat_level));
            POOL.install(|| iter.collect())
        } else {
            self.df
                .columns()
                .iter()
                .map(Column::as_materialized_series)
                .map(|s| s.to_arrow(self.idx, self.compat_level))
                .collect()
        };

        // Columns share the chunk layout, so the first array's length is the
        // batch height.
        let length = batch_cols.first().map_or(0, |arr| arr.len());

        self.idx += 1;

        Some(RecordBatch::new(length, self.schema.clone(), batch_cols))
    }

    // Exact hint: the remaining chunk count is known up front.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let n = self.n_chunks - self.idx;
        (n, Some(n))
    }
}
2737
/// Iterator over the already-materialized chunks of each column; clones the
/// per-chunk `ArrayRef`s without any conversion.
pub struct PhysRecordBatchIter<'a> {
    schema: ArrowSchemaRef,
    /// One chunk iterator per column, advanced in lock-step.
    arr_iters: Vec<std::slice::Iter<'a, ArrayRef>>,
}
2742
2743impl Iterator for PhysRecordBatchIter<'_> {
2744 type Item = RecordBatch;
2745
2746 fn next(&mut self) -> Option<Self::Item> {
2747 let arrs = self
2748 .arr_iters
2749 .iter_mut()
2750 .map(|phys_iter| phys_iter.next().cloned())
2751 .collect::<Option<Vec<_>>>()?;
2752
2753 let length = arrs.first().map_or(0, |arr| arr.len());
2754 Some(RecordBatch::new(length, self.schema.clone(), arrs))
2755 }
2756
2757 fn size_hint(&self) -> (usize, Option<usize>) {
2758 if let Some(iter) = self.arr_iters.first() {
2759 iter.size_hint()
2760 } else {
2761 (0, None)
2762 }
2763 }
2764}
2765
/// Unifying wrapper over the possible chunk-iterator shapes so the
/// `iter_chunks*` methods can return a single concrete type.
pub enum RecordBatchIterWrap<'a> {
    /// Width-0 frame: only a height to report, emitted in `chunk_size` steps.
    ZeroWidth {
        remaining_height: usize,
        chunk_size: usize,
    },
    Batches(RecordBatchIter<'a>),
    PhysicalBatches(PhysRecordBatchIter<'a>),
}
2774
2775impl<'a> RecordBatchIterWrap<'a> {
2776 fn new_zero_width(height: usize) -> Self {
2777 Self::ZeroWidth {
2778 remaining_height: height,
2779 chunk_size: polars_config::config().ideal_morsel_size() as usize,
2780 }
2781 }
2782}
2783
impl Iterator for RecordBatchIterWrap<'_> {
    type Item = RecordBatch;

    fn next(&mut self) -> Option<Self::Item> {
        match self {
            Self::ZeroWidth {
                remaining_height,
                chunk_size,
            } => {
                // Emit column-less batches of up to `chunk_size` rows until
                // the height is exhausted.
                let n = usize::min(*remaining_height, *chunk_size);
                *remaining_height -= n;

                (n > 0).then(|| RecordBatch::new(n, ArrowSchemaRef::default(), vec![]))
            },
            Self::Batches(v) => v.next(),
            Self::PhysicalBatches(v) => v.next(),
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        match self {
            Self::ZeroWidth {
                remaining_height,
                chunk_size,
            } => {
                // Remaining batch count, rounding the final partial batch up.
                let n = remaining_height.div_ceil(*chunk_size);
                (n, Some(n))
            },
            Self::Batches(v) => v.size_hint(),
            Self::PhysicalBatches(v) => v.size_hint(),
        }
    }
}
2817
/// Check that `right` may be vstacked onto `left`.
///
/// NOTE(review): despite the name, only the column *names* are compared here;
/// dtype compatibility is presumably enforced elsewhere — confirm at call
/// sites.
fn ensure_can_extend(left: &Column, right: &Column) -> PolarsResult<()> {
    polars_ensure!(
        left.name() == right.name(),
        ShapeMismatch: "unable to vstack, column names don't match: {:?} and {:?}",
        left.name(), right.name(),
    );
    Ok(())
}
2827
2828#[cfg(test)]
2829mod test {
2830 use super::*;
2831
    /// Small 3x2 fixture frame with integer "days" and float "temp" columns.
    fn create_frame() -> DataFrame {
        let s0 = Column::new("days".into(), [0, 1, 2].as_ref());
        let s1 = Column::new("temp".into(), [22.1, 19.9, 7.].as_ref());
        DataFrame::new_infer_height(vec![s0, s1]).unwrap()
    }
2837
    /// A single-chunk frame yields exactly one record batch of full height.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_recordbatch_iterator() {
        let df = df!(
            "foo" => [1, 2, 3, 4, 5]
        )
        .unwrap();
        let mut iter = df.iter_chunks(CompatLevel::newest(), false);
        assert_eq!(5, iter.next().unwrap().len());
        assert!(iter.next().is_none());
    }
2849
    /// Column lookup plus an equality mask: exactly one row has days == 1.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_select() {
        let df = create_frame();
        assert_eq!(
            df.column("days")
                .unwrap()
                .as_series()
                .unwrap()
                .equal(1)
                .unwrap()
                .sum(),
            Some(1)
        );
    }
2865
    /// Filtering a string column down to zero rows must leave it with a
    /// single (empty) chunk, not zero chunks.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_filter_broadcast_on_string_col() {
        let col_name = "some_col";
        let v = vec!["test".to_string()];
        let s0 = Column::new(PlSmallStr::from_str(col_name), v);
        let mut df = DataFrame::new_infer_height(vec![s0]).unwrap();

        df = df
            .filter(
                &df.column(col_name)
                    .unwrap()
                    .as_materialized_series()
                    .equal("")
                    .unwrap(),
            )
            .unwrap();
        assert_eq!(
            df.column(col_name)
                .unwrap()
                .as_materialized_series()
                .n_chunks(),
            1
        );
    }
2891
    /// Filtering a list column down to zero rows must also keep exactly one
    /// (empty) chunk.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_filter_broadcast_on_list_col() {
        let s1 = Series::new(PlSmallStr::EMPTY, [true, false, true]);
        let ll: ListChunked = [&s1].iter().copied().collect();

        let mask = BooleanChunked::from_slice(PlSmallStr::EMPTY, &[false]);
        let new = ll.filter(&mask).unwrap();

        assert_eq!(new.chunks.len(), 1);
        assert_eq!(new.len(), 0);
    }
2904
    /// Slicing the fixture keeps the width and clamps the height.
    #[test]
    fn slice() {
        let df = create_frame();
        let sliced_df = df.slice(0, 2);
        assert_eq!(sliced_df.shape(), (2, 2));
    }
2911
    /// A freshly-built single-chunk frame needs no rechunking.
    #[test]
    fn rechunk_false() {
        let df = create_frame();
        assert!(!df.should_rechunk())
    }
2917
    /// Adding a column assembled from two appended series (two chunks) must
    /// flag the frame for rechunking.
    #[test]
    fn rechunk_true() -> PolarsResult<()> {
        let mut base = df!(
            "a" => [1, 2, 3],
            "b" => [1, 2, 3]
        )?;

        // Build a 3-element series out of two separate chunks.
        let mut s = Series::new("foo".into(), 0..2);
        let s2 = Series::new("bar".into(), 0..1);
        s.append(&s2)?;

        let out = base.with_column(s.into_column())?;

        assert!(out.should_rechunk());
        Ok(())
    }
2937
    /// `with_column` accepts both an existing name and a new one without
    /// erroring.
    #[test]
    fn test_duplicate_column() {
        let mut df = df! {
            "foo" => [1, 2, 3]
        }
        .unwrap();
        assert!(
            df.with_column(Column::new("foo".into(), &[1, 2, 3]))
                .is_ok()
        );
        assert!(
            df.with_column(Column::new("bar".into(), &[1, 2, 3]))
                .is_ok()
        );
        assert!(df.column("bar").is_ok())
    }
2955
    /// `unique_stable` with keep-first collapses pairwise-duplicated rows to
    /// their first occurrence.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn distinct() {
        let df = df! {
            "flt" => [1., 1., 2., 2., 3., 3.],
            "int" => [1, 1, 2, 2, 3, 3, ],
            "str" => ["a", "a", "b", "b", "c", "c"]
        }
        .unwrap();
        let df = df
            .unique_stable(None, UniqueKeepStrategy::First, None)
            .unwrap()
            .sort(["flt"], SortMultipleOptions::default())
            .unwrap();
        let valid = df! {
            "flt" => [1., 2., 3.],
            "int" => [1, 2, 3],
            "str" => ["a", "b", "c"]
        }
        .unwrap();
        assert!(df.equals(&valid));
    }
2978
    /// vstacking appends chunks rather than rechunking immediately.
    #[test]
    fn test_vstack() {
        let mut df = df! {
            "flt" => [1., 1., 2., 2., 3., 3.],
            "int" => [1, 1, 2, 2, 3, 3, ],
            "str" => ["a", "a", "b", "b", "c", "c"]
        }
        .unwrap();

        df.vstack_mut(&df.slice(0, 3)).unwrap();
        assert_eq!(df.first_col_n_chunks(), 2)
    }
2992
    /// vstacking onto a default-constructed empty frame adopts the incoming
    /// data's height.
    #[test]
    fn test_vstack_on_empty_dataframe() {
        let mut df = DataFrame::empty();

        let df_data = df! {
            "flt" => [1., 1., 2., 2., 3., 3.],
            "int" => [1, 1, 2, 2, 3, 3, ],
            "str" => ["a", "a", "b", "b", "c", "c"]
        }
        .unwrap();

        df.vstack_mut(&df_data).unwrap();
        assert_eq!(df.height(), 6)
    }
3007
    /// keep=None drops every duplicated key (1 and 2 both appear twice),
    /// leaving only the singleton 3; the slice is applied afterwards.
    #[test]
    fn test_unique_keep_none_with_slice() {
        let df = df! {
            "x" => [1, 2, 3, 2, 1]
        }
        .unwrap();
        let out = df
            .unique_stable(
                Some(&["x".to_string()][..]),
                UniqueKeepStrategy::None,
                Some((0, 2)),
            )
            .unwrap();
        let expected = df! {
            "x" => [3]
        }
        .unwrap();
        assert!(out.equals(&expected));
    }
3027
3028 #[test]
3029 #[cfg(feature = "dtype-i8")]
3030 fn test_apply_result_schema() {
3031 let mut df = df! {
3032 "x" => [1, 2, 3, 2, 1]
3033 }
3034 .unwrap();
3035
3036 let schema_before = df.schema().clone();
3037 df.apply("x", |f| f.cast(&DataType::Int8).unwrap()).unwrap();
3038 assert_ne!(&schema_before, df.schema());
3039 }
3040}