1#![allow(unsafe_op_in_unsafe_fn)]
2use arrow::datatypes::ArrowSchemaRef;
4use polars_row::ArrayRef;
5use polars_utils::UnitVec;
6use polars_utils::itertools::Itertools;
7use rayon::prelude::*;
8
9use crate::chunked_array::flags::StatisticsFlags;
10#[cfg(feature = "algorithm_group_by")]
11use crate::chunked_array::ops::unique::is_unique_helper;
12use crate::prelude::gather::check_bounds_ca;
13use crate::prelude::*;
14#[cfg(feature = "row_hash")]
15use crate::utils::split_df;
16use crate::utils::{Container, NoNull, slice_offsets, try_get_supertype};
17use crate::{HEAD_DEFAULT_LENGTH, TAIL_DEFAULT_LENGTH};
18
19#[cfg(feature = "dataframe_arithmetic")]
20mod arithmetic;
21pub mod builder;
22mod chunks;
23pub use chunks::chunk_df_for_writing;
24mod broadcast;
25pub mod column;
26mod dataframe;
27mod filter;
28mod projection;
29pub use dataframe::DataFrame;
30use filter::filter_zero_width;
31use projection::{AmortizedColumnSelector, LINEAR_SEARCH_LIMIT};
32
33pub mod explode;
34mod from;
35#[cfg(feature = "algorithm_group_by")]
36pub mod group_by;
37pub(crate) mod horizontal;
38#[cfg(any(feature = "rows", feature = "object"))]
39pub mod row;
40mod top_k;
41mod upstream_traits;
42mod validation;
43
44use arrow::record_batch::{RecordBatch, RecordBatchT};
45use polars_utils::pl_str::PlSmallStr;
46#[cfg(feature = "serde")]
47use serde::{Deserialize, Serialize};
48use strum_macros::IntoStaticStr;
49
50use crate::POOL;
51#[cfg(feature = "row_hash")]
52use crate::hashing::_df_rows_to_hashes_threaded_vertical;
53use crate::prelude::sort::arg_sort;
54use crate::series::IsSorted;
55
/// Which duplicate row to keep in `unique`-style operations.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Hash, IntoStaticStr)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
#[strum(serialize_all = "snake_case")]
pub enum UniqueKeepStrategy {
    /// Keep the first occurrence of each duplicated row.
    First,
    /// Keep the last occurrence of each duplicated row.
    Last,
    /// Keep no occurrence of duplicated rows.
    None,
    /// Keep any one occurrence; the implementation may pick whichever is fastest.
    #[default]
    Any,
}
72
73impl DataFrame {
74 pub fn materialized_column_iter(&self) -> impl ExactSizeIterator<Item = &Series> {
75 self.columns().iter().map(Column::as_materialized_series)
76 }
77
78 pub fn estimated_size(&self) -> usize {
91 self.columns().iter().map(Column::estimated_size).sum()
92 }
93
    /// Apply a fallible closure to every column, collecting the results.
    ///
    /// The work is delegated to a non-generic inner function taking `&dyn Fn`
    /// so the body is compiled once instead of per closure type.
    pub fn try_apply_columns(
        &self,
        func: impl Fn(&Column) -> PolarsResult<Column> + Send + Sync,
    ) -> PolarsResult<Vec<Column>> {
        return inner(self, &func);

        fn inner(
            slf: &DataFrame,
            func: &(dyn Fn(&Column) -> PolarsResult<Column> + Send + Sync),
        ) -> PolarsResult<Vec<Column>> {
            slf.columns().iter().map(func).collect()
        }
    }
107
    /// Apply an infallible closure to every column, collecting the results.
    ///
    /// Delegates to a non-generic inner function taking `&dyn Fn` so the body
    /// is compiled once instead of per closure type.
    pub fn apply_columns(&self, func: impl Fn(&Column) -> Column + Send + Sync) -> Vec<Column> {
        return inner(self, &func);

        fn inner(slf: &DataFrame, func: &(dyn Fn(&Column) -> Column + Send + Sync)) -> Vec<Column> {
            slf.columns().iter().map(func).collect()
        }
    }
115
    /// Apply a fallible closure to every column in parallel on the global
    /// thread pool, collecting the results.
    ///
    /// Delegates to a non-generic inner function taking `&dyn Fn` so the body
    /// is compiled once instead of per closure type.
    pub fn try_apply_columns_par(
        &self,
        func: impl Fn(&Column) -> PolarsResult<Column> + Send + Sync,
    ) -> PolarsResult<Vec<Column>> {
        return inner(self, &func);

        fn inner(
            slf: &DataFrame,
            func: &(dyn Fn(&Column) -> PolarsResult<Column> + Send + Sync),
        ) -> PolarsResult<Vec<Column>> {
            POOL.install(|| slf.columns().par_iter().map(func).collect())
        }
    }
129
    /// Apply an infallible closure to every column in parallel on the global
    /// thread pool, collecting the results.
    ///
    /// Delegates to a non-generic inner function taking `&dyn Fn` so the body
    /// is compiled once instead of per closure type.
    pub fn apply_columns_par(&self, func: impl Fn(&Column) -> Column + Send + Sync) -> Vec<Column> {
        return inner(self, &func);

        fn inner(slf: &DataFrame, func: &(dyn Fn(&Column) -> Column + Send + Sync)) -> Vec<Column> {
            POOL.install(|| slf.columns().par_iter().map(func).collect())
        }
    }
137
    /// Reserve capacity for `additional` chunks in every `Series`-backed
    /// column. Scalar columns have no chunk vec and are skipped.
    pub(crate) fn reserve_chunks(&mut self, additional: usize) {
        // Reserving changes neither names nor dtypes, so the schema is retained.
        for s in unsafe { self.columns_mut_retain_schema() } {
            if let Column::Series(s) = s {
                // SAFETY: only capacity is reserved; no chunks are added or altered.
                unsafe { s.chunks_mut().reserve(additional) }
            }
        }
    }
    /// Create a new DataFrame of height `height` by repeating, per column, the
    /// value at row `index`.
    pub fn new_from_index(&self, index: usize, height: usize) -> Self {
        let new_cols = self.apply_columns(|c| c.new_from_index(index, height));

        // SAFETY: every column was created with length `height`; names and
        // dtypes are unchanged, so the cached schema of `self` still applies.
        unsafe { Self::_new_unchecked_impl(height, new_cols).with_schema_from(self) }
    }
153
154 pub fn full_null(schema: &Schema, height: usize) -> Self {
156 let columns = schema
157 .iter_fields()
158 .map(|f| Column::full_null(f.name().clone(), height, f.dtype()))
159 .collect();
160
161 unsafe { DataFrame::_new_unchecked_impl(height, columns) }
162 }
163
    /// Check column names against `schema` positionally and cast any column
    /// whose dtype differs from the schema's.
    ///
    /// If no cast was performed, the previously cached schema is restored.
    pub fn ensure_matches_schema(&mut self, schema: &Schema) -> PolarsResult<()> {
        let mut did_cast = false;
        // `columns_mut()` below invalidates the cached schema; keep a copy so
        // it can be restored when nothing actually changed.
        let cached_schema = self.cached_schema().cloned();

        // NOTE(review): `zip` silently stops at the shorter side — assumes the
        // caller guarantees matching widths. Confirm.
        for (col, (name, dt)) in unsafe { self.columns_mut() }.iter_mut().zip(schema.iter()) {
            polars_ensure!(
                col.name() == name,
                SchemaMismatch: "column name mismatch: expected {:?}, found {:?}",
                name,
                col.name()
            );

            // NOTE(review): assumes `matches_schema_type` returns `true` when a
            // cast to `dt` is required — confirm against its definition.
            let needs_cast = col.dtype().matches_schema_type(dt)?;

            if needs_cast {
                *col = col.cast(dt)?;
                did_cast = true;
            }
        }

        if !did_cast {
            // SAFETY: no column changed, so the old schema still describes `self`.
            unsafe { self.set_opt_schema(cached_schema) };
        }

        Ok(())
    }
192
193 pub fn with_row_index(&self, name: PlSmallStr, offset: Option<IdxSize>) -> PolarsResult<Self> {
228 let mut new_columns = Vec::with_capacity(self.width() + 1);
229 let offset = offset.unwrap_or(0);
230
231 if self.get_column_index(&name).is_some() {
232 polars_bail!(duplicate = name)
233 }
234
235 let col = Column::new_row_index(name, offset, self.height())?;
236 new_columns.push(col);
237 new_columns.extend_from_slice(self.columns());
238
239 Ok(unsafe { DataFrame::new_unchecked(self.height(), new_columns) })
240 }
241
    /// Insert a row-index column named `name` at position 0, in place,
    /// starting at `offset` (defaults to 0).
    ///
    /// # Safety
    /// The caller must ensure no column named `name` already exists; this is
    /// only verified with a `debug_assert!`.
    pub unsafe fn with_row_index_mut(
        &mut self,
        name: PlSmallStr,
        offset: Option<IdxSize>,
    ) -> &mut Self {
        debug_assert!(
            self.get_column_index(&name).is_none(),
            "with_row_index_mut(): column with name {} already exists",
            &name
        );

        let offset = offset.unwrap_or(0);
        let col = Column::new_row_index(name, offset, self.height()).unwrap();

        unsafe { self.columns_mut() }.insert(0, col);
        self
    }
266
    /// Shrink every column's memory usage to fit its data.
    pub fn shrink_to_fit(&mut self) {
        // Shrinking changes neither names nor dtypes, so the schema is retained.
        for s in unsafe { self.columns_mut_retain_schema() } {
            s.shrink_to_fit();
        }
    }
274
    /// Rechunk every multi-chunk column into a single chunk, in parallel on
    /// the global thread pool. A no-op when all columns are single-chunk.
    pub fn rechunk_mut_par(&mut self) -> &mut Self {
        if self.columns().iter().any(|c| c.n_chunks() > 1) {
            POOL.install(|| {
                // Rechunking preserves names and dtypes, so the schema is retained.
                unsafe { self.columns_mut_retain_schema() }
                    .par_iter_mut()
                    .for_each(|c| *c = c.rechunk());
            })
        }

        self
    }
288
289 pub fn rechunk_mut(&mut self) -> &mut Self {
291 let columns = unsafe { self.columns_mut() };
293
294 for col in columns.iter_mut().filter(|c| c.n_chunks() > 1) {
295 *col = col.rechunk();
296 }
297
298 self
299 }
300
    /// Whether the columns' chunk layouts are misaligned, i.e. whether the
    /// DataFrame should be rechunked before chunk-wise zipped operations.
    pub fn should_rechunk(&self) -> bool {
        // Fast check: every `Series` column must have the same chunk count.
        if !self
            .columns()
            .iter()
            .filter_map(|c| c.as_series().map(|s| s.n_chunks()))
            .all_equal()
        {
            return true;
        }

        // Slow check: chunk lengths must also match position-wise.
        let mut chunk_lengths = self.materialized_column_iter().map(|s| s.chunk_lengths());
        match chunk_lengths.next() {
            None => false,
            Some(first_column_chunk_lengths) => {
                // Fast path for the common single-chunk case.
                if first_column_chunk_lengths.size_hint().0 == 1 {
                    return chunk_lengths.any(|cl| cl.size_hint().0 != 1);
                }
                // More chunks than rows is never a valid layout, except a
                // single (empty) chunk at height 0.
                let height = self.height();
                let n_chunks = first_column_chunk_lengths.size_hint().0;
                if n_chunks > height && !(height == 0 && n_chunks == 1) {
                    return true;
                }
                // Compare every remaining column's chunk lengths against the
                // first column's.
                let v: Vec<_> = first_column_chunk_lengths.collect();
                for cl in chunk_lengths {
                    if cl.enumerate().any(|(idx, el)| Some(&el) != v.get(idx)) {
                        return true;
                    }
                }
                false
            },
        }
    }
341
342 pub fn align_chunks_par(&mut self) -> &mut Self {
344 if self.should_rechunk() {
345 self.rechunk_mut_par()
346 } else {
347 self
348 }
349 }
350
351 pub fn align_chunks(&mut self) -> &mut Self {
353 if self.should_rechunk() {
354 self.rechunk_mut()
355 } else {
356 self
357 }
358 }
359
360 pub fn get_column_names(&self) -> Vec<&PlSmallStr> {
371 self.columns().iter().map(|s| s.name()).collect()
372 }
373
374 pub fn get_column_names_owned(&self) -> Vec<PlSmallStr> {
376 self.columns().iter().map(|s| s.name().clone()).collect()
377 }
378
    /// Rename all columns to `new_names`.
    ///
    /// Errors when the number of names does not match the width or when the
    /// names are not unique.
    pub fn set_column_names<T>(&mut self, new_names: &[T]) -> PolarsResult<()>
    where
        T: AsRef<str>,
    {
        polars_ensure!(
            new_names.len() == self.width(),
            ShapeMismatch: "{} column names provided for a DataFrame of width {}",
            new_names.len(), self.width()
        );

        validation::ensure_names_unique(new_names)?;

        // Rebuild the column vec with renamed columns. `columns_mut()`
        // invalidates the cached schema, which is now stale anyway.
        *unsafe { self.columns_mut() } = std::mem::take(unsafe { self.columns_mut() })
            .into_iter()
            .zip(new_names)
            .map(|(c, name)| c.with_name(PlSmallStr::from_str(name.as_ref())))
            .collect();

        Ok(())
    }
410
411 pub fn dtypes(&self) -> Vec<DataType> {
424 self.columns().iter().map(|s| s.dtype().clone()).collect()
425 }
426
427 pub fn first_col_n_chunks(&self) -> usize {
429 match self.columns().iter().find_map(|col| col.as_series()) {
430 None if self.width() == 0 => 0,
431 None => 1,
432 Some(s) => s.n_chunks(),
433 }
434 }
435
436 pub fn max_n_chunks(&self) -> usize {
438 self.columns()
439 .iter()
440 .map(|s| s.as_series().map(|s| s.n_chunks()).unwrap_or(1))
441 .max()
442 .unwrap_or(0)
443 }
444
445 pub fn fields(&self) -> Vec<Field> {
461 self.columns()
462 .iter()
463 .map(|s| s.field().into_owned())
464 .collect()
465 }
466
467 pub fn hstack(&self, columns: &[Column]) -> PolarsResult<Self> {
501 let mut new_cols = Vec::with_capacity(self.width() + columns.len());
502
503 new_cols.extend(self.columns().iter().cloned());
504 new_cols.extend_from_slice(columns);
505
506 DataFrame::new(self.height(), new_cols)
507 }
508 pub fn vstack(&self, other: &DataFrame) -> PolarsResult<Self> {
549 let mut df = self.clone();
550 df.vstack_mut(other)?;
551 Ok(df)
552 }
553
    /// Append the rows of `other` to `self`, in place.
    ///
    /// A width mismatch is only allowed when `self` has shape `(0, 0)`, in
    /// which case `self` becomes a copy of `other`.
    pub fn vstack_mut(&mut self, other: &DataFrame) -> PolarsResult<&mut Self> {
        if self.width() != other.width() {
            polars_ensure!(
                self.shape() == (0, 0),
                ShapeMismatch:
                "unable to append to a DataFrame of shape {:?} with a DataFrame of width {}",
                self.shape(), other.width(),
            );

            self.clone_from(other);

            return Ok(self);
        }

        let new_height = usize::checked_add(self.height(), other.height()).unwrap();

        // Appending preserves names and dtypes (checked by `ensure_can_extend`),
        // so the cached schema is retained.
        unsafe { self.columns_mut_retain_schema() }
            .iter_mut()
            .zip(other.columns())
            .try_for_each::<_, PolarsResult<_>>(|(left, right)| {
                ensure_can_extend(&*left, right)?;
                left.append(right).map_err(|e| {
                    e.context(format!("failed to vstack column '{}'", right.name()).into())
                })?;
                Ok(())
            })?;

        // SAFETY: every column was appended to, so all now have `new_height` rows.
        unsafe { self.set_height(new_height) };

        Ok(self)
    }
625
    /// Append the rows of an owned `other` to `self`, in place.
    ///
    /// Like [`DataFrame::vstack_mut`], but consumes `other` so the appended
    /// columns need not be cloned. A width mismatch is only allowed when
    /// `self` has shape `(0, 0)`, in which case `self` becomes `other`.
    pub fn vstack_mut_owned(&mut self, other: DataFrame) -> PolarsResult<&mut Self> {
        if self.width() != other.width() {
            polars_ensure!(
                self.shape() == (0, 0),
                ShapeMismatch:
                "unable to append to a DataFrame of width {} with a DataFrame of width {}",
                self.width(), other.width(),
            );

            *self = other;

            return Ok(self);
        }

        let new_height = usize::checked_add(self.height(), other.height()).unwrap();

        // Appending preserves names and dtypes (checked by `ensure_can_extend`),
        // so the cached schema is retained.
        unsafe { self.columns_mut_retain_schema() }
            .iter_mut()
            .zip(other.into_columns())
            .try_for_each::<_, PolarsResult<_>>(|(left, right)| {
                ensure_can_extend(&*left, &right)?;
                // The name is cloned before `right` is moved into `append_owned`.
                let right_name = right.name().clone();
                left.append_owned(right).map_err(|e| {
                    e.context(format!("failed to vstack column '{right_name}'").into())
                })?;
                Ok(())
            })?;

        // SAFETY: every column was appended to, so all now have `new_height` rows.
        unsafe { self.set_height(new_height) };

        Ok(self)
    }
658
    /// Append the rows of `other` to `self` without validating widths, names,
    /// or dtypes; a column-level append failure panics via `expect`.
    pub fn vstack_mut_unchecked(&mut self, other: &DataFrame) -> &mut Self {
        let new_height = usize::checked_add(self.height(), other.height()).unwrap();

        // Columns are zipped positionally; the caller guarantees compatibility.
        unsafe { self.columns_mut_retain_schema() }
            .iter_mut()
            .zip(other.columns())
            .for_each(|(left, right)| {
                left.append(right)
                    .map_err(|e| {
                        e.context(format!("failed to vstack column '{}'", right.name()).into())
                    })
                    .expect("should not fail");
            });

        // SAFETY: every column was appended to, so all now have `new_height` rows.
        unsafe { self.set_height(new_height) };

        self
    }
683
    /// Append the rows of an owned `other` to `self` without validating widths,
    /// names, or dtypes; a column-level append failure panics via `expect`.
    pub fn vstack_mut_owned_unchecked(&mut self, other: DataFrame) -> &mut Self {
        let new_height = usize::checked_add(self.height(), other.height()).unwrap();

        // Columns are zipped positionally; the caller guarantees compatibility.
        unsafe { self.columns_mut_retain_schema() }
            .iter_mut()
            .zip(other.into_columns())
            .for_each(|(left, right)| {
                left.append_owned(right).expect("should not fail");
            });

        // SAFETY: every column was appended to, so all now have `new_height` rows.
        unsafe { self.set_height(new_height) };

        self
    }
704
    /// Extend every column of `self` with the values of the matching column of
    /// `other`, in place, via `Column::extend`.
    ///
    /// Errors on a width mismatch or when a pair of columns cannot be extended.
    pub fn extend(&mut self, other: &DataFrame) -> PolarsResult<()> {
        polars_ensure!(
            self.width() == other.width(),
            ShapeMismatch:
            "unable to extend a DataFrame of width {} with a DataFrame of width {}",
            self.width(), other.width(),
        );

        let new_height = usize::checked_add(self.height(), other.height()).unwrap();

        // Extending preserves names and dtypes (checked by `ensure_can_extend`),
        // so the cached schema is retained.
        unsafe { self.columns_mut_retain_schema() }
            .iter_mut()
            .zip(other.columns())
            .try_for_each::<_, PolarsResult<_>>(|(left, right)| {
                ensure_can_extend(&*left, right)?;
                left.extend(right).map_err(|e| {
                    e.context(format!("failed to extend column '{}'", right.name()).into())
                })?;
                Ok(())
            })?;

        // SAFETY: every column was extended, so all now have `new_height` rows.
        unsafe { self.set_height(new_height) };

        Ok(())
    }
744
745 pub fn drop_in_place(&mut self, name: &str) -> PolarsResult<Column> {
762 let idx = self.try_get_column_index(name)?;
763 Ok(unsafe { self.columns_mut() }.remove(idx))
764 }
765
766 pub fn drop_nulls<S>(&self, subset: Option<&[S]>) -> PolarsResult<Self>
795 where
796 for<'a> &'a S: AsRef<str>,
797 {
798 if let Some(v) = subset {
799 let v = self.select_to_vec(v)?;
800 self._drop_nulls_impl(v.as_slice())
801 } else {
802 self._drop_nulls_impl(self.columns())
803 }
804 }
805
806 fn _drop_nulls_impl(&self, subset: &[Column]) -> PolarsResult<Self> {
807 if subset.iter().all(|s| !s.has_nulls()) {
809 return Ok(self.clone());
810 }
811
812 let mut iter = subset.iter();
813
814 let mask = iter
815 .next()
816 .ok_or_else(|| polars_err!(NoData: "no data to drop nulls from"))?;
817 let mut mask = mask.is_not_null();
818
819 for c in iter {
820 mask = mask & c.is_not_null();
821 }
822 self.filter(&mask)
823 }
824
825 pub fn drop(&self, name: &str) -> PolarsResult<Self> {
840 let idx = self.try_get_column_index(name)?;
841 let mut new_cols = Vec::with_capacity(self.width() - 1);
842
843 self.columns().iter().enumerate().for_each(|(i, s)| {
844 if i != idx {
845 new_cols.push(s.clone())
846 }
847 });
848
849 Ok(unsafe { DataFrame::_new_unchecked_impl(self.height(), new_cols) })
850 }
851
852 pub fn drop_many<I, S>(&self, names: I) -> Self
854 where
855 I: IntoIterator<Item = S>,
856 S: Into<PlSmallStr>,
857 {
858 let names: PlHashSet<PlSmallStr> = names.into_iter().map(|s| s.into()).collect();
859 self.drop_many_amortized(&names)
860 }
861
862 pub fn drop_many_amortized(&self, names: &PlHashSet<PlSmallStr>) -> DataFrame {
864 if names.is_empty() {
865 return self.clone();
866 }
867 let mut new_cols = Vec::with_capacity(self.width().saturating_sub(names.len()));
868 self.columns().iter().for_each(|s| {
869 if !names.contains(s.name()) {
870 new_cols.push(s.clone())
871 }
872 });
873
874 unsafe { DataFrame::new_unchecked(self.height(), new_cols) }
875 }
876
    /// Insert `column` at `index` without checking for a duplicate name.
    ///
    /// An empty `(0, 0)` DataFrame adopts the inserted column's height; any
    /// other length mismatch is an error.
    fn insert_column_no_namecheck(
        &mut self,
        index: usize,
        column: Column,
    ) -> PolarsResult<&mut Self> {
        if self.shape() == (0, 0) {
            // SAFETY: a (0, 0) frame has no columns constraining the height.
            unsafe { self.set_height(column.len()) };
        }

        polars_ensure!(
            column.len() == self.height(),
            ShapeMismatch:
            "unable to add a column of length {} to a DataFrame of height {}",
            column.len(), self.height(),
        );

        unsafe { self.columns_mut() }.insert(index, column);
        Ok(self)
    }
898
    /// Insert `column` at position `index`.
    ///
    /// Errors when a column with the same name already exists, or (via the
    /// delegate) when the length does not match the height.
    pub fn insert_column(&mut self, index: usize, column: Column) -> PolarsResult<&mut Self> {
        let name = column.name();

        polars_ensure!(
            self.get_column_index(name).is_none(),
            Duplicate:
            "column with name {:?} is already present in the DataFrame", name
        );

        self.insert_column_no_namecheck(index, column)
    }
911
    /// Add `column`, replacing an existing column of the same name in place.
    ///
    /// An empty `(0, 0)` DataFrame adopts the column's height, and a
    /// unit-length column is broadcast to the DataFrame's height.
    pub fn with_column(&mut self, mut column: Column) -> PolarsResult<&mut Self> {
        if self.shape() == (0, 0) {
            // SAFETY: a (0, 0) frame has no columns constraining the height.
            unsafe { self.set_height(column.len()) };
        }

        // Broadcast a unit-length column to the full height.
        if column.len() != self.height() && column.len() == 1 {
            column = column.new_from_index(0, self.height());
        }

        polars_ensure!(
            column.len() == self.height(),
            ShapeMismatch: "unable to add a column of length {} to a DataFrame of height {}",
            column.len(), self.height(),
        );

        if let Some(i) = self.get_column_index(column.name()) {
            // Replace in place, keeping the column's position.
            *unsafe { self.columns_mut() }.get_mut(i).unwrap() = column
        } else {
            unsafe { self.columns_mut() }.push(column)
        };

        Ok(self)
    }
937
    /// Push `column` without validating its length or name uniqueness.
    ///
    /// # Safety
    /// `column` must have the DataFrame's height and a name not already present.
    pub unsafe fn push_column_unchecked(&mut self, column: Column) -> &mut Self {
        unsafe { self.columns_mut() }.push(column);
        self
    }
947
    /// Add or replace multiple columns, resolving each column's target
    /// position via `output_schema` first and the current frame second.
    pub fn with_columns_mut(
        &mut self,
        columns: impl IntoIterator<Item = Column>,
        output_schema: &Schema,
    ) -> PolarsResult<()> {
        let columns = columns.into_iter();

        // Reserve up front based on the iterator's lower size bound.
        unsafe {
            self.columns_mut_retain_schema()
                .reserve(columns.size_hint().0)
        }

        for c in columns {
            self.with_column_and_schema_mut(c, output_schema)?;
        }

        Ok(())
    }
968
    /// Add or replace one column, looking up its target position in
    /// `output_schema` before falling back to the current frame; a column not
    /// found in either is appended at the end.
    fn with_column_and_schema_mut(
        &mut self,
        mut column: Column,
        output_schema: &Schema,
    ) -> PolarsResult<&mut Self> {
        if self.shape() == (0, 0) {
            // SAFETY: a (0, 0) frame has no columns constraining the height.
            unsafe { self.set_height(column.len()) };
        }

        // Broadcast a unit-length column to the full height.
        if column.len() != self.height() && column.len() == 1 {
            column = column.new_from_index(0, self.height());
        }

        polars_ensure!(
            column.len() == self.height(),
            ShapeMismatch:
            "unable to add a column of length {} to a DataFrame of height {}",
            column.len(), self.height(),
        );

        let i = output_schema
            .index_of(column.name())
            .or_else(|| self.get_column_index(column.name()))
            .unwrap_or(self.width());

        if i < self.width() {
            // Replace an existing column in place.
            *unsafe { self.columns_mut() }.get_mut(i).unwrap() = column
        } else if i == self.width() {
            unsafe { self.columns_mut() }.push(column)
        } else {
            // A target index beyond the current width would leave a gap.
            // NOTE(review): presumably columns are expected in schema order —
            // confirm with callers.
            panic!()
        }

        Ok(self)
    }
1005
1006 pub fn get(&self, idx: usize) -> Option<Vec<AnyValue<'_>>> {
1017 (idx < self.height()).then(|| self.columns().iter().map(|c| c.get(idx).unwrap()).collect())
1018 }
1019
1020 pub fn select_at_idx(&self, idx: usize) -> Option<&Column> {
1036 self.columns().get(idx)
1037 }
1038
    /// Position of the column named `name`, if present.
    ///
    /// Uses the cached schema when available; otherwise a linear scan for
    /// narrow frames, and a freshly built schema beyond `LINEAR_SEARCH_LIMIT`.
    pub fn get_column_index(&self, name: &str) -> Option<usize> {
        if let Some(schema) = self.cached_schema() {
            schema.index_of(name)
        } else if self.width() <= LINEAR_SEARCH_LIMIT {
            self.columns().iter().position(|s| s.name() == name)
        } else {
            // Building the schema costs O(width) once; lookups are then O(1).
            self.schema().index_of(name)
        }
    }
1065
1066 pub fn try_get_column_index(&self, name: &str) -> PolarsResult<usize> {
1068 self.get_column_index(name)
1069 .ok_or_else(|| polars_err!(col_not_found = name))
1070 }
1071
1072 pub fn column(&self, name: &str) -> PolarsResult<&Column> {
1086 let idx = self.try_get_column_index(name)?;
1087 Ok(self.select_at_idx(idx).unwrap())
1088 }
1089
1090 pub fn select<I, S>(&self, names: I) -> PolarsResult<Self>
1101 where
1102 I: IntoIterator<Item = S>,
1103 S: AsRef<str>,
1104 {
1105 DataFrame::new(self.height(), self.select_to_vec(names)?)
1106 }
1107
    /// Select columns by name, skipping DataFrame validation.
    ///
    /// # Safety
    /// The caller must uphold the invariants normally checked by
    /// [`DataFrame::new`], e.g. that the selected names are unique.
    pub unsafe fn select_unchecked<I, S>(&self, names: I) -> PolarsResult<Self>
    where
        I: IntoIterator<Item = S>,
        S: AsRef<str>,
    {
        Ok(unsafe { DataFrame::new_unchecked(self.height(), self.select_to_vec(names)?) })
    }
1119
    /// Materialize `selection` into owned columns, amortizing repeated name
    /// lookups via [`AmortizedColumnSelector`].
    pub fn select_to_vec(
        &self,
        selection: impl IntoIterator<Item = impl AsRef<str>>,
    ) -> PolarsResult<Vec<Column>> {
        AmortizedColumnSelector::new(self).select_multiple(selection)
    }
1143
    /// Filter rows by a boolean mask, running the per-column filters in
    /// parallel.
    ///
    /// A unit-length mask against a non-empty frame is broadcast: all rows are
    /// kept when it is `true`, none when it is `false` or null.
    pub fn filter(&self, mask: &BooleanChunked) -> PolarsResult<Self> {
        if self.width() == 0 {
            // No columns: only the height needs filtering.
            filter_zero_width(self.height(), mask)
        } else if mask.len() == 1 && self.len() >= 1 {
            if mask.all() && mask.null_count() == 0 {
                Ok(self.clone())
            } else {
                Ok(self.clear())
            }
        } else {
            let new_columns: Vec<Column> = self.try_apply_columns_par(|s| s.filter(mask))?;
            // SAFETY: all columns were filtered by the same mask, so they share
            // a length; names and dtypes are unchanged, so the schema carries over.
            let out = unsafe {
                DataFrame::new_unchecked(new_columns[0].len(), new_columns).with_schema_from(self)
            };

            Ok(out)
        }
    }
1173
1174 pub fn filter_seq(&self, mask: &BooleanChunked) -> PolarsResult<Self> {
1176 if self.width() == 0 {
1177 filter_zero_width(self.height(), mask)
1178 } else if mask.len() == 1 && mask.null_count() == 0 && self.len() >= 1 {
1179 if mask.all() && mask.null_count() == 0 {
1180 Ok(self.clone())
1181 } else {
1182 Ok(self.clear())
1183 }
1184 } else {
1185 let new_columns: Vec<Column> = self.try_apply_columns(|s| s.filter(mask))?;
1186 let out = unsafe {
1187 DataFrame::new_unchecked(new_columns[0].len(), new_columns).with_schema_from(self)
1188 };
1189
1190 Ok(out)
1191 }
1192 }
1193
    /// Take rows by index, erroring on out-of-bounds indices.
    pub fn take(&self, indices: &IdxCa) -> PolarsResult<Self> {
        // Validate all indices once up front so the per-column gather below
        // can skip bounds checks.
        check_bounds_ca(indices, self.height().try_into().unwrap_or(IdxSize::MAX))?;

        let new_cols = self.apply_columns_par(|c| {
            assert_eq!(c.len(), self.height());
            // SAFETY: indices were bounds-checked against the height above.
            unsafe { c.take_unchecked(indices) }
        });

        // SAFETY: each gathered column has `indices.len()` rows; names and
        // dtypes are unchanged, so the schema carries over.
        Ok(unsafe { DataFrame::new_unchecked(indices.len(), new_cols).with_schema_from(self) })
    }
1215
    /// Take rows by index without bounds checking, allowing multithreading.
    ///
    /// # Safety
    /// All indices in `idx` must be in bounds for this DataFrame's height.
    pub unsafe fn take_unchecked(&self, idx: &IdxCa) -> Self {
        self.take_unchecked_impl(idx, true)
    }
1221
    /// Gather the rows belonging to one group.
    ///
    /// # Safety
    /// The group's indices/slice must be in bounds for this DataFrame.
    #[cfg(feature = "algorithm_group_by")]
    pub unsafe fn gather_group_unchecked(&self, group: &GroupsIndicator) -> Self {
        match group {
            GroupsIndicator::Idx((_, indices)) => unsafe {
                // `allow_threads = false`: the gather for a single group is
                // done serially.
                self.take_slice_unchecked_impl(indices.as_slice(), false)
            },
            GroupsIndicator::Slice([offset, len]) => self.slice(*offset as i64, *len as usize),
        }
    }
1233
    /// Take rows by index without bounds checking.
    ///
    /// When `allow_threads` is set and the pool has spare threads relative to
    /// the width, the index array is additionally split into strides so a few
    /// wide gathers can still use all threads.
    ///
    /// # Safety
    /// All indices in `idx` must be in bounds for this DataFrame's height.
    pub unsafe fn take_unchecked_impl(&self, idx: &IdxCa, allow_threads: bool) -> Self {
        let cols = if allow_threads && POOL.current_num_threads() > 1 {
            POOL.install(|| {
                if POOL.current_num_threads() > self.width() {
                    // More threads than columns: parallelize within each
                    // column over strides of at least 256 indices.
                    let stride = usize::max(idx.len().div_ceil(POOL.current_num_threads()), 256);
                    if self.height() / stride >= 2 {
                        self.apply_columns_par(|c| {
                            // NOTE(review): nested dtypes are rechunked before
                            // gathering — presumably chunked gather is costly
                            // for nested data. Confirm.
                            let c = if c.dtype().is_nested() {
                                &c.rechunk()
                            } else {
                                c
                            };

                            // Gather each stride in parallel, then append the
                            // partial results in order.
                            (0..idx.len().div_ceil(stride))
                                .into_par_iter()
                                .map(|i| c.take_unchecked(&idx.slice((i * stride) as i64, stride)))
                                .reduce(
                                    || Column::new_empty(c.name().clone(), c.dtype()),
                                    |mut a, b| {
                                        a.append_owned(b).unwrap();
                                        a
                                    },
                                )
                        })
                    } else {
                        self.apply_columns_par(|c| c.take_unchecked(idx))
                    }
                } else {
                    // One column per thread is parallel enough.
                    self.apply_columns_par(|c| c.take_unchecked(idx))
                }
            })
        } else {
            self.apply_columns(|s| s.take_unchecked(idx))
        };

        // SAFETY: every gathered column has `idx.len()` rows; names and dtypes
        // are unchanged, so the schema carries over.
        unsafe { DataFrame::new_unchecked(idx.len(), cols).with_schema_from(self) }
    }
1275
    /// Take rows by a slice of indices without bounds checking, allowing
    /// multithreading.
    ///
    /// # Safety
    /// All indices in `idx` must be in bounds for this DataFrame's height.
    pub unsafe fn take_slice_unchecked(&self, idx: &[IdxSize]) -> Self {
        self.take_slice_unchecked_impl(idx, true)
    }
1281
    /// Take rows by a slice of indices without bounds checking.
    ///
    /// Mirrors [`DataFrame::take_unchecked_impl`], but over a plain index
    /// slice, splitting it into strides when threads outnumber columns.
    ///
    /// # Safety
    /// All indices in `idx` must be in bounds for this DataFrame's height.
    pub unsafe fn take_slice_unchecked_impl(&self, idx: &[IdxSize], allow_threads: bool) -> Self {
        let cols = if allow_threads && POOL.current_num_threads() > 1 {
            POOL.install(|| {
                if POOL.current_num_threads() > self.width() {
                    // More threads than columns: parallelize within each
                    // column over strides of at least 256 indices.
                    let stride = usize::max(idx.len().div_ceil(POOL.current_num_threads()), 256);
                    if self.height() / stride >= 2 {
                        self.apply_columns_par(|c| {
                            // NOTE(review): nested dtypes are rechunked before
                            // gathering — presumably chunked gather is costly
                            // for nested data. Confirm.
                            let c = if c.dtype().is_nested() {
                                &c.rechunk()
                            } else {
                                c
                            };

                            // Gather each stride in parallel, then append the
                            // partial results in order.
                            (0..idx.len().div_ceil(stride))
                                .into_par_iter()
                                .map(|i| {
                                    let idx = &idx[i * stride..];
                                    let idx = &idx[..idx.len().min(stride)];
                                    c.take_slice_unchecked(idx)
                                })
                                .reduce(
                                    || Column::new_empty(c.name().clone(), c.dtype()),
                                    |mut a, b| {
                                        a.append_owned(b).unwrap();
                                        a
                                    },
                                )
                        })
                    } else {
                        self.apply_columns_par(|s| s.take_slice_unchecked(idx))
                    }
                } else {
                    // One column per thread is parallel enough.
                    self.apply_columns_par(|s| s.take_slice_unchecked(idx))
                }
            })
        } else {
            self.apply_columns(|s| s.take_slice_unchecked(idx))
        };
        // SAFETY: every gathered column has `idx.len()` rows; names and dtypes
        // are unchanged, so the schema carries over.
        unsafe { DataFrame::new_unchecked(idx.len(), cols).with_schema_from(self) }
    }
1326
    /// Rename column `column` to `name`, in place.
    ///
    /// A no-op when the names are equal; errors when `name` already exists or
    /// when `column` is not found.
    pub fn rename(&mut self, column: &str, name: PlSmallStr) -> PolarsResult<&mut Self> {
        if column == name.as_str() {
            return Ok(self);
        }
        polars_ensure!(
            !self.schema().contains(&name),
            Duplicate: "column rename attempted with already existing name \"{name}\""
        );

        self.get_column_index(column)
            .and_then(|idx| unsafe { self.columns_mut() }.get_mut(idx))
            .ok_or_else(|| polars_err!(col_not_found = column))
            .map(|c| c.rename(name))?;

        Ok(self)
    }
1357
    /// Rename multiple columns, in place.
    ///
    /// Same-name renames are skipped; errors on a target name that already
    /// exists or on a missing source column.
    pub fn rename_many<'a>(
        &mut self,
        renames: impl Iterator<Item = (&'a str, PlSmallStr)>,
    ) -> PolarsResult<&mut Self> {
        // Mutate a private copy of the schema and the columns in lock step,
        // then install the updated schema at the end.
        let mut schema_arc = self.schema().clone();
        let schema = Arc::make_mut(&mut schema_arc);

        for (from, to) in renames {
            if from == to.as_str() {
                continue;
            }

            polars_ensure!(
                !schema.contains(&to),
                Duplicate: "column rename attempted with already existing name \"{to}\""
            );

            match schema.get_full(from) {
                None => polars_bail!(col_not_found = from),
                Some((idx, _, _)) => {
                    // Rename in the schema copy and in the column itself.
                    let (n, _) = schema.get_at_index_mut(idx).unwrap();
                    *n = to.clone();
                    unsafe { self.columns_mut() }
                        .get_mut(idx)
                        .unwrap()
                        .rename(to);
                },
            }
        }

        // SAFETY: schema and columns were updated together above.
        unsafe { self.set_schema(schema_arc) };

        Ok(self)
    }
1392
    /// Sort the DataFrame by the given columns, in place.
    pub fn sort_in_place(
        &mut self,
        by: impl IntoIterator<Item = impl AsRef<str>>,
        sort_options: SortMultipleOptions,
    ) -> PolarsResult<&mut Self> {
        let by_column = self.select_to_vec(by)?;

        let mut out = self.sort_impl(by_column, sort_options, None)?;
        // SAFETY: sorting only reorders rows; names and dtypes are unchanged.
        unsafe { out.set_schema_from(self) };

        *self = out;

        Ok(self)
    }
1410
    /// Core sort implementation shared by the public sort entry points.
    ///
    /// `slice` optionally restricts the output to `(offset, len)` of the
    /// sorted result, which enables a bottom-k shortcut.
    #[doc(hidden)]
    pub fn sort_impl(
        &self,
        by_column: Vec<Column>,
        sort_options: SortMultipleOptions,
        slice: Option<(i64, usize)>,
    ) -> PolarsResult<Self> {
        // Sorting by nothing is the identity (modulo the optional slice).
        if by_column.is_empty() {
            return if let Some((offset, len)) = slice {
                Ok(self.slice(offset, len))
            } else {
                Ok(self.clone())
            };
        }

        // Object columns cannot be sorted.
        for column in &by_column {
            if column.dtype().is_object() {
                polars_bail!(
                    InvalidOperation: "column '{}' has a dtype of '{}', which does not support sorting", column.name(), column.dtype()
                )
            }
        }

        let first_descending = sort_options.descending[0];
        let first_by_column = by_column[0].name().to_string();

        // Marks the primary sort column's sortedness flag on the output frame.
        let set_sorted = |df: &mut DataFrame| {
            let _ = df.apply(&first_by_column, |s| {
                let mut s = s.clone();
                if first_descending {
                    s.set_sorted_flag(IsSorted::Descending)
                } else {
                    s.set_sorted_flag(IsSorted::Ascending)
                }
                s
            });
        };

        // Nothing to reorder when either dimension is zero.
        if self.shape_has_zero() {
            let mut out = self.clone();
            set_sorted(&mut out);
            return Ok(out);
        }

        // A slice starting at 0 that is shorter than the frame is a bottom-k
        // query; use the cheaper k-select.
        if let Some((0, k)) = slice {
            if k < self.height() {
                return self.bottom_k_impl(k, by_column, sort_options);
            }
        }
        // NOTE(review): the sorted-flag fast path below is skipped for
        // Categorical/Enum keys — presumably the flag refers to physical
        // order, which need not match their lexical sort order. Confirm.
        #[cfg(feature = "dtype-categorical")]
        let is_not_categorical_enum =
            !(matches!(by_column[0].dtype(), DataType::Categorical(_, _))
                || matches!(by_column[0].dtype(), DataType::Enum(_, _)));

        #[cfg(not(feature = "dtype-categorical"))]
        #[allow(non_upper_case_globals)]
        const is_not_categorical_enum: bool = true;

        if by_column.len() == 1 && is_not_categorical_enum {
            let required_sorting = if sort_options.descending[0] {
                IsSorted::Descending
            } else {
                IsSorted::Ascending
            };
            // Already sorted in the required direction, and nulls (if any)
            // already sit at the required end.
            let no_sorting_required = (by_column[0].is_sorted_flag() == required_sorting)
                && ((by_column[0].null_count() == 0)
                    || by_column[0].get(by_column[0].len() - 1).unwrap().is_null()
                        == sort_options.nulls_last[0]);

            if no_sorting_required {
                return if let Some((offset, len)) = slice {
                    Ok(self.slice(offset, len))
                } else {
                    Ok(self.clone())
                };
            }
        }

        let has_nested = by_column.iter().any(|s| s.dtype().is_nested());
        let allow_threads = sort_options.multithreaded;

        // Rechunk so the gather below operates on contiguous buffers.
        let mut df = self.clone();
        let df = df.rechunk_mut_par();
        let mut take = match (by_column.len(), has_nested) {
            (1, false) => {
                let s = &by_column[0];
                let options = SortOptions {
                    descending: sort_options.descending[0],
                    nulls_last: sort_options.nulls_last[0],
                    multithreaded: sort_options.multithreaded,
                    maintain_order: sort_options.maintain_order,
                    limit: sort_options.limit,
                };
                // A single-column frame sorted by its own column can be sorted
                // directly instead of arg-sorting and gathering.
                if df.width() == 1 && df.try_get_column_index(s.name().as_str()).is_ok() {
                    let mut out = s.sort_with(options)?;
                    if let Some((offset, len)) = slice {
                        out = out.slice(offset, len);
                    }
                    return Ok(out.into_frame());
                }
                s.arg_sort(options)
            },
            _ => arg_sort(&by_column, sort_options)?,
        };

        if let Some((offset, len)) = slice {
            take = take.slice(offset, len);
        }

        // SAFETY: `take` holds indices produced by arg-sorting columns of this
        // frame, so they are in bounds.
        let mut df = unsafe { df.take_unchecked_impl(&take, allow_threads) };
        set_sorted(&mut df);
        Ok(df)
    }
1543
    /// Build a DataFrame with one row per column of `self`, describing each
    /// column's representation and statistics flags (for inspection/debugging).
    pub fn _to_metadata(&self) -> DataFrame {
        let num_columns = self.width();

        let mut column_names =
            StringChunkedBuilder::new(PlSmallStr::from_static("column_name"), num_columns);
        let mut repr_ca = StringChunkedBuilder::new(PlSmallStr::from_static("repr"), num_columns);
        let mut sorted_asc_ca =
            BooleanChunkedBuilder::new(PlSmallStr::from_static("sorted_asc"), num_columns);
        let mut sorted_dsc_ca =
            BooleanChunkedBuilder::new(PlSmallStr::from_static("sorted_dsc"), num_columns);
        let mut fast_explode_list_ca =
            BooleanChunkedBuilder::new(PlSmallStr::from_static("fast_explode_list"), num_columns);
        let mut materialized_at_ca =
            StringChunkedBuilder::new(PlSmallStr::from_static("materialized_at"), num_columns);

        for col in self.columns() {
            let flags = col.get_flags();

            // Scalar columns have no materialization trace.
            let (repr, materialized_at) = match col {
                Column::Series(s) => ("series", s.materialized_at()),
                Column::Scalar(_) => ("scalar", None),
            };
            let sorted_asc = flags.contains(StatisticsFlags::IS_SORTED_ASC);
            let sorted_dsc = flags.contains(StatisticsFlags::IS_SORTED_DSC);
            let fast_explode_list = flags.contains(StatisticsFlags::CAN_FAST_EXPLODE_LIST);

            column_names.append_value(col.name().clone());
            repr_ca.append_value(repr);
            sorted_asc_ca.append_value(sorted_asc);
            sorted_dsc_ca.append_value(sorted_dsc);
            fast_explode_list_ca.append_value(fast_explode_list);
            materialized_at_ca.append_option(materialized_at.map(|v| format!("{v:#?}")));
        }

        // SAFETY: the output height equals `self.width()` — one metadata row
        // was appended per column above.
        unsafe {
            DataFrame::new_unchecked(
                self.width(),
                vec![
                    column_names.finish().into_column(),
                    repr_ca.finish().into_column(),
                    sorted_asc_ca.finish().into_column(),
                    sorted_dsc_ca.finish().into_column(),
                    fast_explode_list_ca.finish().into_column(),
                    materialized_at_ca.finish().into_column(),
                ],
            )
        }
    }
1596 pub fn sort(
1634 &self,
1635 by: impl IntoIterator<Item = impl AsRef<str>>,
1636 sort_options: SortMultipleOptions,
1637 ) -> PolarsResult<Self> {
1638 let mut df = self.clone();
1639 df.sort_in_place(by, sort_options)?;
1640 Ok(df)
1641 }
1642
    /// Replace the column named `column` with `new_col`, keeping its position.
    pub fn replace(&mut self, column: &str, new_col: Column) -> PolarsResult<&mut Self> {
        self.apply(column, |_| new_col)
    }
1660
    /// Replace the column at `index` with `new_column`.
    ///
    /// Errors on an out-of-bounds index or when the new column's length does
    /// not match the DataFrame's height.
    pub fn replace_column(&mut self, index: usize, new_column: Column) -> PolarsResult<&mut Self> {
        polars_ensure!(
            index < self.width(),
            ShapeMismatch:
            "unable to replace at index {}, the DataFrame has only {} columns",
            index, self.width(),
        );

        polars_ensure!(
            new_column.len() == self.height(),
            ShapeMismatch:
            "unable to replace a column, series length {} doesn't match the DataFrame height {}",
            new_column.len(), self.height(),
        );

        unsafe { *self.columns_mut().get_mut(index).unwrap() = new_column };

        Ok(self)
    }
1694
1695 pub fn apply<F, C>(&mut self, name: &str, f: F) -> PolarsResult<&mut Self>
1736 where
1737 F: FnOnce(&Column) -> C,
1738 C: IntoColumn,
1739 {
1740 let idx = self.try_get_column_index(name)?;
1741 self.apply_at_idx(idx, f)?;
1742 Ok(self)
1743 }
1744
    /// Apply a closure to the column at position `idx` and replace it with the
    /// result.
    ///
    /// A length-1 result is broadcast to the frame's height, and the new
    /// column always keeps the original column's name. The cached schema is
    /// restored afterwards when the dtype did not change.
    pub fn apply_at_idx<F, C>(&mut self, idx: usize, f: F) -> PolarsResult<&mut Self>
    where
        F: FnOnce(&Column) -> C,
        C: IntoColumn,
    {
        let df_height = self.height();
        let width = self.width();

        // Snapshot the schema cache so it can be restored below when the
        // replacement keeps the same dtype.
        let cached_schema = self.cached_schema().cloned();

        let col = unsafe { self.columns_mut() }.get_mut(idx).ok_or_else(|| {
            polars_err!(
                ComputeError: "invalid column index: {} for a DataFrame with {} columns",
                idx, width
            )
        })?;

        let mut new_col = f(col).into_column();

        // Broadcast a scalar (length-1) result to the full height.
        if new_col.len() != df_height && new_col.len() == 1 {
            new_col = new_col.new_from_index(0, df_height);
        }

        polars_ensure!(
            new_col.len() == df_height,
            ShapeMismatch:
            "apply_at_idx: resulting Series has length {} while the DataFrame has height {}",
            new_col.len(), df_height
        );

        // The replacement keeps the old column's name.
        new_col = new_col.with_name(col.name().clone());
        let col_before = std::mem::replace(col, new_col);

        // Same dtype => the previously cached schema is still valid.
        if col.dtype() == col_before.dtype() {
            unsafe { self.set_opt_schema(cached_schema) };
        }

        Ok(self)
    }
1814
    /// Fallibly apply a closure to the column at position `idx` and replace it
    /// with the result.
    ///
    /// The new column keeps the original column's name, and the cached schema
    /// is restored when the dtype did not change.
    ///
    /// NOTE(review): unlike [`DataFrame::apply_at_idx`], a length-1 result is
    /// NOT broadcast here — confirm this asymmetry is intended.
    pub fn try_apply_at_idx<F, C>(&mut self, idx: usize, f: F) -> PolarsResult<&mut Self>
    where
        F: FnOnce(&Column) -> PolarsResult<C>,
        C: IntoColumn,
    {
        let df_height = self.height();
        let width = self.width();

        // Snapshot the schema cache so it can be restored below when the
        // replacement keeps the same dtype.
        let cached_schema = self.cached_schema().cloned();

        let col = unsafe { self.columns_mut() }.get_mut(idx).ok_or_else(|| {
            polars_err!(
                ComputeError: "invalid column index: {} for a DataFrame with {} columns",
                idx, width
            )
        })?;

        let mut new_col = f(col).map(|c| c.into_column())?;

        polars_ensure!(
            new_col.len() == df_height,
            ShapeMismatch:
            "try_apply_at_idx: resulting Series has length {} while the DataFrame has height {}",
            new_col.len(), df_height
        );

        // The replacement keeps the old column's name.
        new_col = new_col.with_name(col.name().clone());
        let col_before = std::mem::replace(col, new_col);

        // Same dtype => the previously cached schema is still valid.
        if col.dtype() == col_before.dtype() {
            unsafe { self.set_opt_schema(cached_schema) };
        }

        Ok(self)
    }
1891
1892 pub fn try_apply<F, C>(&mut self, column: &str, f: F) -> PolarsResult<&mut Self>
1935 where
1936 F: FnOnce(&Series) -> PolarsResult<C>,
1937 C: IntoColumn,
1938 {
1939 let idx = self.try_get_column_index(column)?;
1940 self.try_apply_at_idx(idx, |c| f(c.as_materialized_series()))
1941 }
1942
    /// Slice `length` rows starting at `offset` (resolved via `slice_offsets`;
    /// the slice is clamped to the available rows).
    #[must_use]
    pub fn slice(&self, offset: i64, length: usize) -> Self {
        // Fast path: the slice is the whole frame.
        if offset == 0 && length == self.height() {
            return self.clone();
        }

        if length == 0 {
            return self.clear();
        }

        let cols = self.apply_columns(|s| s.slice(offset, length));

        // The requested `length` may exceed the available rows; take the real
        // height from the sliced columns, or compute it for width-0 frames.
        let height = if let Some(fst) = cols.first() {
            fst.len()
        } else {
            let (_, length) = slice_offsets(offset, length, self.height());
            length
        };

        unsafe { DataFrame::_new_unchecked_impl(height, cols).with_schema_from(self) }
    }
1993
    /// Split the frame into two at row `offset` (resolved via `slice_offsets`).
    pub fn split_at(&self, offset: i64) -> (Self, Self) {
        let (a, b) = self.columns().iter().map(|s| s.split_at(offset)).unzip();

        // Resolve the (possibly negative) offset to an absolute row index.
        let (idx, _) = slice_offsets(offset, 0, self.height());

        // SAFETY: splitting every column at the same resolved index keeps each
        // half's columns equal in length.
        let a = unsafe { DataFrame::new_unchecked(idx, a).with_schema_from(self) };
        let b = unsafe { DataFrame::new_unchecked(self.height() - idx, b).with_schema_from(self) };
        (a, b)
    }
2004
2005 #[must_use]
2006 pub fn clear(&self) -> Self {
2007 let cols = self.columns().iter().map(|s| s.clear()).collect::<Vec<_>>();
2008 unsafe { DataFrame::_new_unchecked_impl(0, cols).with_schema_from(self) }
2009 }
2010
2011 #[must_use]
2012 pub fn slice_par(&self, offset: i64, length: usize) -> Self {
2013 if offset == 0 && length == self.height() {
2014 return self.clone();
2015 }
2016 let columns = self.apply_columns_par(|s| s.slice(offset, length));
2017 unsafe { DataFrame::new_unchecked(length, columns).with_schema_from(self) }
2018 }
2019
2020 #[must_use]
2021 pub fn _slice_and_realloc(&self, offset: i64, length: usize) -> Self {
2022 if offset == 0 && length == self.height() {
2023 return self.clone();
2024 }
2025 let columns = self.apply_columns(|s| {
2027 let mut out = s.slice(offset, length);
2028 out.shrink_to_fit();
2029 out
2030 });
2031 unsafe { DataFrame::new_unchecked(length, columns).with_schema_from(self) }
2032 }
2033
2034 #[must_use]
2068 pub fn head(&self, length: Option<usize>) -> Self {
2069 let new_height = usize::min(self.height(), length.unwrap_or(HEAD_DEFAULT_LENGTH));
2070 let new_cols = self.apply_columns(|c| c.head(Some(new_height)));
2071
2072 unsafe { DataFrame::new_unchecked(new_height, new_cols).with_schema_from(self) }
2073 }
2074
2075 #[must_use]
2106 pub fn tail(&self, length: Option<usize>) -> Self {
2107 let new_height = usize::min(self.height(), length.unwrap_or(TAIL_DEFAULT_LENGTH));
2108 let new_cols = self.apply_columns(|c| c.tail(Some(new_height)));
2109
2110 unsafe { DataFrame::new_unchecked(new_height, new_cols).with_schema_from(self) }
2111 }
2112
    /// Iterate over this frame's chunks as Arrow [`RecordBatch`]es.
    ///
    /// All columns must share the same chunking (debug-asserted below).
    /// `parallel` is only honored when an actual conversion is needed.
    pub fn iter_chunks(
        &self,
        compat_level: CompatLevel,
        parallel: bool,
    ) -> impl Iterator<Item = RecordBatch> + '_ {
        debug_assert!(!self.should_rechunk(), "expected equal chunks");

        // A width-0 frame still has a height; emit schema-less batches that
        // only carry a row count.
        if self.width() == 0 {
            return RecordBatchIterWrap::new_zero_width(self.height());
        }

        // Compat level 0 forces a conversion of the column data.
        let must_convert = compat_level.0 == 0;
        // Only parallelize when conversion work actually happens and there is
        // more than one column, with at least one String/Binary column
        // (presumably the expensive case to convert — see RecordBatchIter).
        let parallel = parallel
            && must_convert
            && self.width() > 1
            && self
                .columns()
                .iter()
                .any(|s| matches!(s.dtype(), DataType::String | DataType::Binary));

        RecordBatchIterWrap::Batches(RecordBatchIter {
            df: self,
            schema: Arc::new(
                self.columns()
                    .iter()
                    .map(|c| c.field().to_arrow(compat_level))
                    .collect(),
            ),
            idx: 0,
            // A chunk-less frame still yields one (empty) batch.
            n_chunks: usize::max(1, self.first_col_n_chunks()),
            compat_level,
            parallel,
        })
    }
2158
    /// Iterate over the physical chunks as [`RecordBatch`]es, always using
    /// [`CompatLevel::newest`] (no format conversion of the arrays).
    pub fn iter_chunks_physical(&self) -> impl Iterator<Item = RecordBatch> + '_ {
        debug_assert!(!self.should_rechunk());

        // A width-0 frame still has a height; emit schema-less batches that
        // only carry a row count.
        if self.width() == 0 {
            return RecordBatchIterWrap::new_zero_width(self.height());
        }

        RecordBatchIterWrap::PhysicalBatches(PhysRecordBatchIter {
            schema: Arc::new(
                self.columns()
                    .iter()
                    .map(|c| c.field().to_arrow(CompatLevel::newest()))
                    .collect(),
            ),
            // One chunk-iterator per column; advanced in lockstep by `next`.
            arr_iters: self
                .materialized_column_iter()
                .map(|s| s.chunks().iter())
                .collect(),
        })
    }
2188
2189 #[must_use]
2191 pub fn reverse(&self) -> Self {
2192 let new_cols = self.apply_columns(Column::reverse);
2193 unsafe { DataFrame::new_unchecked(self.height(), new_cols).with_schema_from(self) }
2194 }
2195
2196 #[must_use]
2201 pub fn shift(&self, periods: i64) -> Self {
2202 let col = self.apply_columns_par(|s| s.shift(periods));
2203 unsafe { DataFrame::new_unchecked(self.height(), col).with_schema_from(self) }
2204 }
2205
2206 pub fn fill_null(&self, strategy: FillNullStrategy) -> PolarsResult<Self> {
2215 let col = self.try_apply_columns_par(|s| s.fill_null(strategy))?;
2216
2217 Ok(unsafe { DataFrame::new_unchecked(self.height(), col) })
2218 }
2219
    /// Pipe this frame through a fallible function, consuming it; enables
    /// chaining user-defined transformations.
    pub fn pipe<F, B>(self, f: F) -> PolarsResult<B>
    where
        F: Fn(DataFrame) -> PolarsResult<B>,
    {
        f(self)
    }
2227
    /// Pipe a mutable reference to this frame through a fallible function.
    pub fn pipe_mut<F, B>(&mut self, f: F) -> PolarsResult<B>
    where
        F: Fn(&mut DataFrame) -> PolarsResult<B>,
    {
        f(self)
    }
2235
    /// Like [`DataFrame::pipe`], but forwards extra `args` to the function.
    pub fn pipe_with_args<F, B, Args>(self, f: F, args: Args) -> PolarsResult<B>
    where
        F: Fn(DataFrame, Args) -> PolarsResult<B>,
    {
        f(self, args)
    }
2243 #[cfg(feature = "algorithm_group_by")]
2277 pub fn unique_stable(
2278 &self,
2279 subset: Option<&[String]>,
2280 keep: UniqueKeepStrategy,
2281 slice: Option<(i64, usize)>,
2282 ) -> PolarsResult<DataFrame> {
2283 self.unique_impl(
2284 true,
2285 subset.map(|v| v.iter().map(|x| PlSmallStr::from_str(x.as_str())).collect()),
2286 keep,
2287 slice,
2288 )
2289 }
2290
    #[cfg(feature = "algorithm_group_by")]
    /// Drop duplicate rows; unlike [`DataFrame::unique_stable`], the row order
    /// of the result is not guaranteed.
    ///
    /// NOTE(review): the generic parameters `I` and `S` are unused and look
    /// vestigial; removing them would break turbofish call sites, so they are
    /// left in place.
    pub fn unique<I, S>(
        &self,
        subset: Option<&[String]>,
        keep: UniqueKeepStrategy,
        slice: Option<(i64, usize)>,
    ) -> PolarsResult<DataFrame> {
        self.unique_impl(
            false,
            subset.map(|v| v.iter().map(|x| PlSmallStr::from_str(x.as_str())).collect()),
            keep,
            slice,
        )
    }
2306
    #[cfg(feature = "algorithm_group_by")]
    /// Shared implementation for [`DataFrame::unique`] and
    /// [`DataFrame::unique_stable`].
    ///
    /// `maintain_order` keeps first-seen row order; `subset` restricts the
    /// duplicate key to those columns (defaults to all columns); `slice` is
    /// applied to the result, pushed into the group computation where possible.
    pub fn unique_impl(
        &self,
        maintain_order: bool,
        subset: Option<Vec<PlSmallStr>>,
        keep: UniqueKeepStrategy,
        slice: Option<(i64, usize)>,
    ) -> PolarsResult<Self> {
        // A width-0 frame has at most one distinct (empty) row.
        if self.width() == 0 {
            let height = usize::min(self.height(), 1);
            return Ok(DataFrame::empty_with_height(height));
        }

        let names = subset.unwrap_or_else(|| self.get_column_names_owned());
        let mut df = self.clone();
        df.rechunk_mut_par();

        let columns = match (keep, maintain_order) {
            (UniqueKeepStrategy::First | UniqueKeepStrategy::Any, true) => {
                let gb = df.group_by_stable(names)?;
                let groups = gb.get_groups();
                // Push the slice into the groups before aggregating.
                let (offset, len) = slice.unwrap_or((0, groups.len()));
                let groups = groups.slice(offset, len);
                // SAFETY: the groups were computed from this frame.
                df.apply_columns_par(|s| unsafe { s.agg_first(&groups) })
            },
            (UniqueKeepStrategy::Last, true) => {
                let gb = df.group_by_stable(names)?;
                let groups = gb.get_groups();

                // Take the last index of every group, then sort those indices
                // so the output preserves the original row order.
                let last_idx: NoNull<IdxCa> = groups
                    .iter()
                    .map(|g| match g {
                        GroupsIndicator::Idx((_first, idx)) => idx[idx.len() - 1],
                        GroupsIndicator::Slice([first, len]) => first + len - 1,
                    })
                    .collect();

                let mut last_idx = last_idx.into_inner().sort(false);

                if let Some((offset, len)) = slice {
                    last_idx = last_idx.slice(offset, len);
                }

                // SAFETY: the indices come from this frame's own groups.
                let last_idx = NoNull::new(last_idx);
                let out = unsafe { df.take_unchecked(&last_idx) };
                return Ok(out);
            },
            (UniqueKeepStrategy::First | UniqueKeepStrategy::Any, false) => {
                let gb = df.group_by(names)?;
                let groups = gb.get_groups();
                let (offset, len) = slice.unwrap_or((0, groups.len()));
                let groups = groups.slice(offset, len);
                // SAFETY: the groups were computed from this frame.
                df.apply_columns_par(|s| unsafe { s.agg_first(&groups) })
            },
            (UniqueKeepStrategy::Last, false) => {
                let gb = df.group_by(names)?;
                let groups = gb.get_groups();
                let (offset, len) = slice.unwrap_or((0, groups.len()));
                let groups = groups.slice(offset, len);
                // SAFETY: the groups were computed from this frame.
                df.apply_columns_par(|s| unsafe { s.agg_last(&groups) })
            },
            (UniqueKeepStrategy::None, _) => {
                // Keep only rows whose key is not duplicated at all; the slice
                // is applied after filtering here.
                let df_part = df.select(names)?;
                let mask = df_part.is_unique()?;
                let mut filtered = df.filter(&mask)?;

                if let Some((offset, len)) = slice {
                    filtered = filtered.slice(offset, len);
                }
                return Ok(filtered);
            },
        };
        Ok(unsafe { DataFrame::new_unchecked_infer_height(columns).with_schema_from(self) })
    }
2384
2385 #[cfg(feature = "algorithm_group_by")]
2399 pub fn is_unique(&self) -> PolarsResult<BooleanChunked> {
2400 let gb = self.group_by(self.get_column_names_owned())?;
2401 let groups = gb.get_groups();
2402 Ok(is_unique_helper(
2403 groups,
2404 self.height() as IdxSize,
2405 true,
2406 false,
2407 ))
2408 }
2409
2410 #[cfg(feature = "algorithm_group_by")]
2424 pub fn is_duplicated(&self) -> PolarsResult<BooleanChunked> {
2425 let gb = self.group_by(self.get_column_names_owned())?;
2426 let groups = gb.get_groups();
2427 Ok(is_unique_helper(
2428 groups,
2429 self.height() as IdxSize,
2430 false,
2431 true,
2432 ))
2433 }
2434
2435 #[must_use]
2437 pub fn null_count(&self) -> Self {
2438 let cols =
2439 self.apply_columns(|c| Column::new(c.name().clone(), [c.null_count() as IdxSize]));
2440 unsafe { Self::new_unchecked(1, cols) }
2441 }
2442
    #[cfg(feature = "row_hash")]
    /// Hash every row into a `u64`, splitting the work over the thread pool.
    pub fn hash_rows(
        &mut self,
        hasher_builder: Option<PlSeedableRandomStateQuality>,
    ) -> PolarsResult<UInt64Chunked> {
        // Split vertically so each thread hashes a contiguous slab of rows.
        let dfs = split_df(self, POOL.current_num_threads(), false);
        let (cas, _) = _df_rows_to_hashes_threaded_vertical(&dfs, hasher_builder)?;

        // Stitch the per-slab hash arrays back together in row order.
        let mut iter = cas.into_iter();
        let mut acc_ca = iter.next().unwrap();
        for ca in iter {
            acc_ca.append(&ca)?;
        }
        Ok(acc_ca.rechunk().into_owned())
    }
2459
2460 pub fn get_supertype(&self) -> Option<PolarsResult<DataType>> {
2462 self.columns()
2463 .iter()
2464 .map(|s| Ok(s.dtype().clone()))
2465 .reduce(|acc, b| try_get_supertype(&acc?, &b.unwrap()))
2466 }
2467
    #[doc(hidden)]
    /// Take rows by index without bounds checking, assuming no sortedness.
    ///
    /// # Safety
    /// Every index in `idx` must be in bounds for this frame's height.
    pub unsafe fn _take_unchecked_slice(&self, idx: &[IdxSize], allow_threads: bool) -> Self {
        self._take_unchecked_slice_sorted(idx, allow_threads, IsSorted::Not)
    }
2476
    #[doc(hidden)]
    /// Take rows by index without bounds checking, with a caller-supplied
    /// sortedness hint for `idx`.
    ///
    /// # Safety
    /// Every index in `idx` must be in bounds, and `sorted` must correctly
    /// describe `idx` (only loosely checked in debug builds below).
    pub unsafe fn _take_unchecked_slice_sorted(
        &self,
        idx: &[IdxSize],
        allow_threads: bool,
        sorted: IsSorted,
    ) -> Self {
        #[cfg(debug_assertions)]
        {
            if idx.len() > 2 {
                use crate::series::IsSorted;

                // Cheap sanity check: compares only the two endpoints.
                match sorted {
                    IsSorted::Ascending => {
                        assert!(idx[0] <= idx[idx.len() - 1]);
                    },
                    IsSorted::Descending => {
                        assert!(idx[0] >= idx[idx.len() - 1]);
                    },
                    _ => {},
                }
            }
        }
        // Borrow `idx` as an IdxCa without copying, tag it with the hint,
        // then gather.
        let mut ca = IdxCa::mmap_slice(PlSmallStr::EMPTY, idx);
        ca.set_sorted_flag(sorted);
        self.take_unchecked_impl(&ca, allow_threads)
    }
    #[cfg(all(feature = "partition_by", feature = "algorithm_group_by"))]
    #[doc(hidden)]
    /// Shared implementation for `partition_by` / `partition_by_stable`:
    /// split into one DataFrame per distinct key combination in `cols`.
    pub fn _partition_by_impl(
        &self,
        cols: &[PlSmallStr],
        stable: bool,
        include_key: bool,
        parallel: bool,
    ) -> PolarsResult<Vec<DataFrame>> {
        let selected_keys = self.select_to_vec(cols.iter().cloned())?;
        let groups = self.group_by_with_series(selected_keys, parallel, stable)?;
        let groups = groups.into_groups();

        // Optionally drop the key columns from the output partitions.
        let df = if include_key {
            self.clone()
        } else {
            self.drop_many(cols.iter().cloned())
        };

        if parallel {
            POOL.install(|| {
                match groups.as_ref() {
                    GroupsType::Idx(idx) => {
                        let mut df = df.clone();
                        df.rechunk_mut_par();
                        Ok(idx
                            .into_par_iter()
                            .map(|(_, group)| {
                                // SAFETY: group indices stem from this frame.
                                unsafe {
                                    df._take_unchecked_slice_sorted(
                                        group,
                                        false,
                                        IsSorted::Ascending,
                                    )
                                }
                            })
                            .collect())
                    },
                    // Slice groups are contiguous; a plain slice suffices.
                    GroupsType::Slice { groups, .. } => Ok(groups
                        .into_par_iter()
                        .map(|[first, len]| df.slice(*first as i64, *len as usize))
                        .collect()),
                }
            })
        } else {
            match groups.as_ref() {
                GroupsType::Idx(idx) => {
                    let mut df = df;
                    df.rechunk_mut();
                    Ok(idx
                        .into_iter()
                        .map(|(_, group)| {
                            // SAFETY: group indices stem from this frame.
                            unsafe {
                                df._take_unchecked_slice_sorted(group, false, IsSorted::Ascending)
                            }
                        })
                        .collect())
                },
                // Slice groups are contiguous; a plain slice suffices.
                GroupsType::Slice { groups, .. } => Ok(groups
                    .iter()
                    .map(|[first, len]| df.slice(*first as i64, *len as usize))
                    .collect()),
            }
        }
    }
2582
2583 #[cfg(feature = "partition_by")]
2585 pub fn partition_by<I, S>(&self, cols: I, include_key: bool) -> PolarsResult<Vec<DataFrame>>
2586 where
2587 I: IntoIterator<Item = S>,
2588 S: Into<PlSmallStr>,
2589 {
2590 let cols: UnitVec<PlSmallStr> = cols.into_iter().map(Into::into).collect();
2591 self._partition_by_impl(cols.as_slice(), false, include_key, true)
2592 }
2593
2594 #[cfg(feature = "partition_by")]
2597 pub fn partition_by_stable<I, S>(
2598 &self,
2599 cols: I,
2600 include_key: bool,
2601 ) -> PolarsResult<Vec<DataFrame>>
2602 where
2603 I: IntoIterator<Item = S>,
2604 S: Into<PlSmallStr>,
2605 {
2606 let cols: UnitVec<PlSmallStr> = cols.into_iter().map(Into::into).collect();
2607 self._partition_by_impl(cols.as_slice(), true, include_key, true)
2608 }
2609
2610 #[cfg(feature = "dtype-struct")]
2613 pub fn unnest(
2614 &self,
2615 cols: impl IntoIterator<Item = impl Into<PlSmallStr>>,
2616 separator: Option<&str>,
2617 ) -> PolarsResult<DataFrame> {
2618 self.unnest_impl(cols.into_iter().map(Into::into).collect(), separator)
2619 }
2620
    #[cfg(feature = "dtype-struct")]
    /// Expand the struct columns listed in `cols` into their fields; other
    /// columns are kept as-is. With `separator`, a field is renamed to
    /// `"{struct_name}{separator}{field_name}"`.
    fn unnest_impl(
        &self,
        cols: PlHashSet<PlSmallStr>,
        separator: Option<&str>,
    ) -> PolarsResult<DataFrame> {
        // Capacity guess: structs add columns, but cap the overshoot.
        let mut new_cols = Vec::with_capacity(std::cmp::min(self.width() * 2, self.width() + 128));
        let mut count = 0;
        for s in self.columns() {
            if cols.contains(s.name()) {
                // Errors here when the requested column is not a struct.
                let ca = s.struct_()?.clone();
                new_cols.extend(ca.fields_as_series().into_iter().map(|mut f| {
                    if let Some(separator) = &separator {
                        f.rename(polars_utils::format_pl_smallstr!(
                            "{}{}{}",
                            s.name(),
                            separator,
                            f.name()
                        ));
                    }
                    Column::from(f)
                }));
                count += 1;
            } else {
                new_cols.push(s.clone())
            }
        }
        // Fewer matches than requested names: surface a col-not-found error
        // for the first requested name that is missing from the schema.
        if count != cols.len() {
            let schema = self.schema();
            for col in cols {
                let _ = schema
                    .get(col.as_str())
                    .ok_or_else(|| polars_err!(col_not_found = col))?;
            }
        }

        DataFrame::new_infer_height(new_cols)
    }
2661
2662 pub fn append_record_batch(&mut self, rb: RecordBatchT<ArrayRef>) -> PolarsResult<()> {
2663 let df = DataFrame::from(rb);
2666 polars_ensure!(
2667 self.schema() == df.schema(),
2668 SchemaMismatch: "cannot append record batch with different schema\n\n
2669 Got {:?}\nexpected: {:?}", df.schema(), self.schema(),
2670 );
2671 self.vstack_mut_owned_unchecked(df);
2672 Ok(())
2673 }
2674}
2675
/// Iterator yielding one Arrow [`RecordBatch`] per chunk of a [`DataFrame`].
pub struct RecordBatchIter<'a> {
    df: &'a DataFrame,
    schema: ArrowSchemaRef,
    // Next chunk index to emit.
    idx: usize,
    // Total number of chunks to emit (at least 1; see `iter_chunks`).
    n_chunks: usize,
    compat_level: CompatLevel,
    // Convert the columns to arrow in parallel on the shared thread pool.
    parallel: bool,
}
2684
impl Iterator for RecordBatchIter<'_> {
    type Item = RecordBatch;

    fn next(&mut self) -> Option<Self::Item> {
        if self.idx >= self.n_chunks {
            return None;
        }

        // Convert chunk `idx` of every column to an arrow array, optionally in
        // parallel on the shared thread pool.
        let batch_cols: Vec<ArrayRef> = if self.parallel {
            let iter = self
                .df
                .columns()
                .par_iter()
                .map(Column::as_materialized_series)
                .map(|s| s.to_arrow(self.idx, self.compat_level));
            POOL.install(|| iter.collect())
        } else {
            self.df
                .columns()
                .iter()
                .map(Column::as_materialized_series)
                .map(|s| s.to_arrow(self.idx, self.compat_level))
                .collect()
        };

        // All arrays share one length; an empty batch has length 0.
        let length = batch_cols.first().map_or(0, |arr| arr.len());

        self.idx += 1;

        Some(RecordBatch::new(length, self.schema.clone(), batch_cols))
    }

    // Exact: the number of remaining chunks is known up front.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let n = self.n_chunks - self.idx;
        (n, Some(n))
    }
}
2723
/// Iterator over the physical (unconverted) chunks of a [`DataFrame`] as
/// [`RecordBatch`]es; see [`DataFrame::iter_chunks_physical`].
pub struct PhysRecordBatchIter<'a> {
    schema: ArrowSchemaRef,
    // One chunk-iterator per column, advanced in lockstep by `next`.
    arr_iters: Vec<std::slice::Iter<'a, ArrayRef>>,
}
2728
2729impl Iterator for PhysRecordBatchIter<'_> {
2730 type Item = RecordBatch;
2731
2732 fn next(&mut self) -> Option<Self::Item> {
2733 let arrs = self
2734 .arr_iters
2735 .iter_mut()
2736 .map(|phys_iter| phys_iter.next().cloned())
2737 .collect::<Option<Vec<_>>>()?;
2738
2739 let length = arrs.first().map_or(0, |arr| arr.len());
2740 Some(RecordBatch::new(length, self.schema.clone(), arrs))
2741 }
2742
2743 fn size_hint(&self) -> (usize, Option<usize>) {
2744 if let Some(iter) = self.arr_iters.first() {
2745 iter.size_hint()
2746 } else {
2747 (0, None)
2748 }
2749 }
2750}
2751
/// Dispatch wrapper so the `iter_chunks*` methods can return one concrete
/// iterator type for all three cases.
pub enum RecordBatchIterWrap<'a> {
    /// Width-0 frame: batches carry only a row count, no columns.
    ZeroWidth {
        remaining_height: usize,
        chunk_size: usize,
    },
    Batches(RecordBatchIter<'a>),
    PhysicalBatches(PhysRecordBatchIter<'a>),
}
2760
2761impl<'a> RecordBatchIterWrap<'a> {
2762 fn new_zero_width(height: usize) -> Self {
2763 Self::ZeroWidth {
2764 remaining_height: height,
2765 chunk_size: polars_config::config().ideal_morsel_size() as usize,
2766 }
2767 }
2768}
2769
2770impl Iterator for RecordBatchIterWrap<'_> {
2771 type Item = RecordBatch;
2772
2773 fn next(&mut self) -> Option<Self::Item> {
2774 match self {
2775 Self::ZeroWidth {
2776 remaining_height,
2777 chunk_size,
2778 } => {
2779 let n = usize::min(*remaining_height, *chunk_size);
2780 *remaining_height -= n;
2781
2782 (n > 0).then(|| RecordBatch::new(n, ArrowSchemaRef::default(), vec![]))
2783 },
2784 Self::Batches(v) => v.next(),
2785 Self::PhysicalBatches(v) => v.next(),
2786 }
2787 }
2788
2789 fn size_hint(&self) -> (usize, Option<usize>) {
2790 match self {
2791 Self::ZeroWidth {
2792 remaining_height,
2793 chunk_size,
2794 } => {
2795 let n = remaining_height.div_ceil(*chunk_size);
2796 (n, Some(n))
2797 },
2798 Self::Batches(v) => v.size_hint(),
2799 Self::PhysicalBatches(v) => v.size_hint(),
2800 }
2801 }
2802}
2803
2804fn ensure_can_extend(left: &Column, right: &Column) -> PolarsResult<()> {
2806 polars_ensure!(
2807 left.name() == right.name(),
2808 ShapeMismatch: "unable to vstack, column names don't match: {:?} and {:?}",
2809 left.name(), right.name(),
2810 );
2811 Ok(())
2812}
2813
#[cfg(test)]
mod test {
    use super::*;

    // 3-row fixture with an integer and a float column.
    fn create_frame() -> DataFrame {
        let s0 = Column::new("days".into(), [0, 1, 2].as_ref());
        let s1 = Column::new("temp".into(), [22.1, 19.9, 7.].as_ref());
        DataFrame::new_infer_height(vec![s0, s1]).unwrap()
    }

    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_recordbatch_iterator() {
        let df = df!(
            "foo" => [1, 2, 3, 4, 5]
        )
        .unwrap();
        let mut iter = df.iter_chunks(CompatLevel::newest(), false);
        // A single-chunk frame yields exactly one batch with all rows.
        assert_eq!(5, iter.next().unwrap().len());
        assert!(iter.next().is_none());
    }

    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_select() {
        let df = create_frame();
        // Exactly one row in "days" equals 1.
        assert_eq!(
            df.column("days")
                .unwrap()
                .as_series()
                .unwrap()
                .equal(1)
                .unwrap()
                .sum(),
            Some(1)
        );
    }

    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_filter_broadcast_on_string_col() {
        let col_name = "some_col";
        let v = vec!["test".to_string()];
        let s0 = Column::new(PlSmallStr::from_str(col_name), v);
        let mut df = DataFrame::new_infer_height(vec![s0]).unwrap();

        df = df
            .filter(
                &df.column(col_name)
                    .unwrap()
                    .as_materialized_series()
                    .equal("")
                    .unwrap(),
            )
            .unwrap();
        // Filtering down to zero rows must still leave a single (empty) chunk.
        assert_eq!(
            df.column(col_name)
                .unwrap()
                .as_materialized_series()
                .n_chunks(),
            1
        );
    }

    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_filter_broadcast_on_list_col() {
        let s1 = Series::new(PlSmallStr::EMPTY, [true, false, true]);
        let ll: ListChunked = [&s1].iter().copied().collect();

        let mask = BooleanChunked::from_slice(PlSmallStr::EMPTY, &[false]);
        let new = ll.filter(&mask).unwrap();

        // Filtering a list column to zero rows keeps a single empty chunk.
        assert_eq!(new.chunks.len(), 1);
        assert_eq!(new.len(), 0);
    }

    #[test]
    fn slice() {
        let df = create_frame();
        let sliced_df = df.slice(0, 2);
        assert_eq!(sliced_df.shape(), (2, 2));
    }

    #[test]
    fn rechunk_false() {
        // A freshly built frame has aligned chunks.
        let df = create_frame();
        assert!(!df.should_rechunk())
    }

    #[test]
    fn rechunk_true() -> PolarsResult<()> {
        let mut base = df!(
            "a" => [1, 2, 3],
            "b" => [1, 2, 3]
        )?;

        // Build a series consisting of two chunks via append.
        let mut s = Series::new("foo".into(), 0..2);
        let s2 = Series::new("bar".into(), 0..1);
        s.append(&s2)?;

        let out = base.with_column(s.into_column())?;

        // The new column's chunking differs from the frame's.
        assert!(out.should_rechunk());
        Ok(())
    }

    #[test]
    fn test_duplicate_column() {
        let mut df = df! {
            "foo" => [1, 2, 3]
        }
        .unwrap();
        // NOTE(review): adding a column under an existing name succeeds here
        // (presumably replacing it in place) — confirm `with_column` semantics.
        assert!(
            df.with_column(Column::new("foo".into(), &[1, 2, 3]))
                .is_ok()
        );
        assert!(
            df.with_column(Column::new("bar".into(), &[1, 2, 3]))
                .is_ok()
        );
        assert!(df.column("bar").is_ok())
    }

    #[test]
    #[cfg_attr(miri, ignore)]
    fn distinct() {
        let df = df! {
            "flt" => [1., 1., 2., 2., 3., 3.],
            "int" => [1, 1, 2, 2, 3, 3, ],
            "str" => ["a", "a", "b", "b", "c", "c"]
        }
        .unwrap();
        let df = df
            .unique_stable(None, UniqueKeepStrategy::First, None)
            .unwrap()
            .sort(["flt"], SortMultipleOptions::default())
            .unwrap();
        let valid = df! {
            "flt" => [1., 2., 3.],
            "int" => [1, 2, 3],
            "str" => ["a", "b", "c"]
        }
        .unwrap();
        assert!(df.equals(&valid));
    }

    #[test]
    fn test_vstack() {
        let mut df = df! {
            "flt" => [1., 1., 2., 2., 3., 3.],
            "int" => [1, 1, 2, 2, 3, 3, ],
            "str" => ["a", "a", "b", "b", "c", "c"]
        }
        .unwrap();

        // vstack appends as a new chunk rather than rechunking.
        df.vstack_mut(&df.slice(0, 3)).unwrap();
        assert_eq!(df.first_col_n_chunks(), 2)
    }

    #[test]
    fn test_vstack_on_empty_dataframe() {
        let mut df = DataFrame::empty();

        let df_data = df! {
            "flt" => [1., 1., 2., 2., 3., 3.],
            "int" => [1, 1, 2, 2, 3, 3, ],
            "str" => ["a", "a", "b", "b", "c", "c"]
        }
        .unwrap();

        df.vstack_mut(&df_data).unwrap();
        assert_eq!(df.height(), 6)
    }

    #[test]
    fn test_unique_keep_none_with_slice() {
        let df = df! {
            "x" => [1, 2, 3, 2, 1]
        }
        .unwrap();
        // keep=None drops all duplicated rows (only 3 survives); the slice is
        // applied after filtering.
        let out = df
            .unique_stable(
                Some(&["x".to_string()][..]),
                UniqueKeepStrategy::None,
                Some((0, 2)),
            )
            .unwrap();
        let expected = df! {
            "x" => [3]
        }
        .unwrap();
        assert!(out.equals(&expected));
    }

    #[test]
    #[cfg(feature = "dtype-i8")]
    fn test_apply_result_schema() {
        let mut df = df! {
            "x" => [1, 2, 3, 2, 1]
        }
        .unwrap();

        let schema_before = df.schema().clone();
        // Casting changes the dtype, so the cached schema must be refreshed.
        df.apply("x", |f| f.cast(&DataType::Int8).unwrap()).unwrap();
        assert_ne!(&schema_before, df.schema());
    }
}