1#![allow(unsafe_op_in_unsafe_fn)]
2use arrow::datatypes::ArrowSchemaRef;
4use polars_row::ArrayRef;
5use polars_utils::UnitVec;
6use polars_utils::ideal_morsel_size::get_ideal_morsel_size;
7use polars_utils::itertools::Itertools;
8use rayon::prelude::*;
9
10use crate::chunked_array::flags::StatisticsFlags;
11#[cfg(feature = "algorithm_group_by")]
12use crate::chunked_array::ops::unique::is_unique_helper;
13use crate::prelude::gather::check_bounds_ca;
14use crate::prelude::*;
15#[cfg(feature = "row_hash")]
16use crate::utils::split_df;
17use crate::utils::{Container, NoNull, slice_offsets, try_get_supertype};
18use crate::{HEAD_DEFAULT_LENGTH, TAIL_DEFAULT_LENGTH};
19
20#[cfg(feature = "dataframe_arithmetic")]
21mod arithmetic;
22pub mod builder;
23mod chunks;
24pub use chunks::chunk_df_for_writing;
25mod broadcast;
26pub mod column;
27mod dataframe;
28mod filter;
29mod projection;
30pub use dataframe::DataFrame;
31use filter::filter_zero_width;
32use projection::{AmortizedColumnSelector, LINEAR_SEARCH_LIMIT};
33
34pub mod explode;
35mod from;
36#[cfg(feature = "algorithm_group_by")]
37pub mod group_by;
38pub(crate) mod horizontal;
39#[cfg(feature = "proptest")]
40pub mod proptest;
41#[cfg(any(feature = "rows", feature = "object"))]
42pub mod row;
43mod top_k;
44mod upstream_traits;
45mod validation;
46
47use arrow::record_batch::{RecordBatch, RecordBatchT};
48use polars_utils::pl_str::PlSmallStr;
49#[cfg(feature = "serde")]
50use serde::{Deserialize, Serialize};
51use strum_macros::IntoStaticStr;
52
53use crate::POOL;
54#[cfg(feature = "row_hash")]
55use crate::hashing::_df_rows_to_hashes_threaded_vertical;
56use crate::prelude::sort::arg_sort;
57use crate::series::IsSorted;
58
59#[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Hash, IntoStaticStr)]
60#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
61#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
62#[strum(serialize_all = "snake_case")]
63 pub enum UniqueKeepStrategy {
/// Keep the first unique row.
64 First,
/// Keep the last unique row.
66 Last,
/// Keep none of the duplicated rows.
68 None,
/// Keep any of the duplicated rows; this allows more optimizations.
70 #[default]
73 Any,
74}
75
76impl DataFrame {
77 pub fn materialized_column_iter(&self) -> impl ExactSizeIterator<Item = &Series> {
78 self.columns().iter().map(Column::as_materialized_series)
79 }
80
81 pub fn estimated_size(&self) -> usize {
94 self.columns().iter().map(Column::estimated_size).sum()
95 }
96
97 pub fn try_apply_columns(
98 &self,
99 func: impl Fn(&Column) -> PolarsResult<Column> + Send + Sync,
100 ) -> PolarsResult<Vec<Column>> {
101 return inner(self, &func);
102
103 fn inner(
104 slf: &DataFrame,
105 func: &(dyn Fn(&Column) -> PolarsResult<Column> + Send + Sync),
106 ) -> PolarsResult<Vec<Column>> {
107 slf.columns().iter().map(func).collect()
108 }
109 }
110
111 pub fn apply_columns(&self, func: impl Fn(&Column) -> Column + Send + Sync) -> Vec<Column> {
112 return inner(self, &func);
113
114 fn inner(slf: &DataFrame, func: &(dyn Fn(&Column) -> Column + Send + Sync)) -> Vec<Column> {
115 slf.columns().iter().map(func).collect()
116 }
117 }
118
119 pub fn try_apply_columns_par(
120 &self,
121 func: impl Fn(&Column) -> PolarsResult<Column> + Send + Sync,
122 ) -> PolarsResult<Vec<Column>> {
123 return inner(self, &func);
124
125 fn inner(
126 slf: &DataFrame,
127 func: &(dyn Fn(&Column) -> PolarsResult<Column> + Send + Sync),
128 ) -> PolarsResult<Vec<Column>> {
129 POOL.install(|| slf.columns().par_iter().map(func).collect())
130 }
131 }
132
133 pub fn apply_columns_par(&self, func: impl Fn(&Column) -> Column + Send + Sync) -> Vec<Column> {
134 return inner(self, &func);
135
136 fn inner(slf: &DataFrame, func: &(dyn Fn(&Column) -> Column + Send + Sync)) -> Vec<Column> {
137 POOL.install(|| slf.columns().par_iter().map(func).collect())
138 }
139 }
140
141 pub(crate) fn reserve_chunks(&mut self, additional: usize) {
143 for s in unsafe { self.columns_mut_retain_schema() } {
144 if let Column::Series(s) = s {
145 unsafe { s.chunks_mut().reserve(additional) }
148 }
149 }
150 }
151 pub fn new_from_index(&self, index: usize, height: usize) -> Self {
152 let new_cols = self.apply_columns(|c| c.new_from_index(index, height));
153
154 unsafe { Self::_new_unchecked_impl(height, new_cols).with_schema_from(self) }
155 }
156
157 pub fn full_null(schema: &Schema, height: usize) -> Self {
159 let columns = schema
160 .iter_fields()
161 .map(|f| Column::full_null(f.name().clone(), height, f.dtype()))
162 .collect();
163
164 unsafe { DataFrame::_new_unchecked_impl(height, columns) }
165 }
166
167 pub fn ensure_matches_schema(&mut self, schema: &Schema) -> PolarsResult<()> {
170 let mut did_cast = false;
171 let cached_schema = self.cached_schema().cloned();
172
173 for (col, (name, dt)) in unsafe { self.columns_mut() }.iter_mut().zip(schema.iter()) {
174 polars_ensure!(
175 col.name() == name,
176 SchemaMismatch: "column name mismatch: expected {:?}, found {:?}",
177 name,
178 col.name()
179 );
180
181 let needs_cast = col.dtype().matches_schema_type(dt)?;
182
183 if needs_cast {
184 *col = col.cast(dt)?;
185 did_cast = true;
186 }
187 }
188
189 if !did_cast {
190 unsafe { self.set_opt_schema(cached_schema) };
191 }
192
193 Ok(())
194 }
195
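/// Return a new `DataFrame` with a row-index column named `name` inserted at position 0,
/// counting from `offset` (or from 0 when `offset` is `None`).
///
/// Example (an illustrative sketch marked `ignore`, not a compiled doctest; it assumes the
/// `df!` macro and `polars_core::prelude` re-exports that the tests in this module use):
///
/// ```ignore
/// use polars_core::prelude::*;
///
/// let df = df!("x" => ["a", "b", "c"])?;
/// let with_idx = df.with_row_index("idx".into(), Some(10))?;
/// // "idx" is now the first column and holds [10, 11, 12].
/// assert_eq!(with_idx.width(), df.width() + 1);
/// ```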
196 pub fn with_row_index(&self, name: PlSmallStr, offset: Option<IdxSize>) -> PolarsResult<Self> {
231 let mut new_columns = Vec::with_capacity(self.width() + 1);
232 let offset = offset.unwrap_or(0);
233
234 if self.get_column_index(&name).is_some() {
235 polars_bail!(duplicate = name)
236 }
237
238 let col = Column::new_row_index(name, offset, self.height())?;
239 new_columns.push(col);
240 new_columns.extend_from_slice(self.columns());
241
242 Ok(unsafe { DataFrame::new_unchecked(self.height(), new_columns) })
243 }
244
245 pub unsafe fn with_row_index_mut(
253 &mut self,
254 name: PlSmallStr,
255 offset: Option<IdxSize>,
256 ) -> &mut Self {
257 debug_assert!(
258 self.get_column_index(&name).is_none(),
259 "with_row_index_mut(): column with name {} already exists",
260 &name
261 );
262
263 let offset = offset.unwrap_or(0);
264 let col = Column::new_row_index(name, offset, self.height()).unwrap();
265
266 unsafe { self.columns_mut() }.insert(0, col);
267 self
268 }
269
270 pub fn shrink_to_fit(&mut self) {
272 for s in unsafe { self.columns_mut_retain_schema() } {
274 s.shrink_to_fit();
275 }
276 }
277
278 pub fn rechunk_mut_par(&mut self) -> &mut Self {
281 if self.columns().iter().any(|c| c.n_chunks() > 1) {
282 POOL.install(|| {
283 unsafe { self.columns_mut_retain_schema() }
284 .par_iter_mut()
285 .for_each(|c| *c = c.rechunk());
286 })
287 }
288
289 self
290 }
291
292 pub fn rechunk_mut(&mut self) -> &mut Self {
294 let columns = unsafe { self.columns_mut() };
296
297 for col in columns.iter_mut().filter(|c| c.n_chunks() > 1) {
298 *col = col.rechunk();
299 }
300
301 self
302 }
303
304 pub fn should_rechunk(&self) -> bool {
306 if !self
309 .columns()
310 .iter()
311 .filter_map(|c| c.as_series().map(|s| s.n_chunks()))
312 .all_equal()
313 {
314 return true;
315 }
316
317 let mut chunk_lengths = self.materialized_column_iter().map(|s| s.chunk_lengths());
319 match chunk_lengths.next() {
320 None => false,
321 Some(first_column_chunk_lengths) => {
// Fast path: if the first column has exactly one chunk, we only have to check
// whether any other column has more than one chunk.
322 if first_column_chunk_lengths.size_hint().0 == 1 {
324 return chunk_lengths.any(|cl| cl.size_hint().0 != 1);
325 }
// Always rechunk when a column has more chunks than rows, except for the empty
// frame that consists of a single (empty) chunk.
326 let height = self.height();
329 let n_chunks = first_column_chunk_lengths.size_hint().0;
330 if n_chunks > height && !(height == 0 && n_chunks == 1) {
331 return true;
332 }
// Slow path: compare every column's chunk lengths against the first column's.
333 let v: Vec<_> = first_column_chunk_lengths.collect();
335 for cl in chunk_lengths {
336 if cl.enumerate().any(|(idx, el)| Some(&el) != v.get(idx)) {
337 return true;
338 }
339 }
340 false
341 },
342 }
343 }
344
345 pub fn align_chunks_par(&mut self) -> &mut Self {
347 if self.should_rechunk() {
348 self.rechunk_mut_par()
349 } else {
350 self
351 }
352 }
353
354 pub fn align_chunks(&mut self) -> &mut Self {
356 if self.should_rechunk() {
357 self.rechunk_mut()
358 } else {
359 self
360 }
361 }
362
363 pub fn get_column_names(&self) -> Vec<&PlSmallStr> {
374 self.columns().iter().map(|s| s.name()).collect()
375 }
376
377 pub fn get_column_names_owned(&self) -> Vec<PlSmallStr> {
379 self.columns().iter().map(|s| s.name().clone()).collect()
380 }
381
382 pub fn set_column_names<T>(&mut self, new_names: &[T]) -> PolarsResult<()>
394 where
395 T: AsRef<str>,
396 {
397 polars_ensure!(
398 new_names.len() == self.width(),
399 ShapeMismatch: "{} column names provided for a DataFrame of width {}",
400 new_names.len(), self.width()
401 );
402
403 validation::ensure_names_unique(new_names)?;
404
405 *unsafe { self.columns_mut() } = std::mem::take(unsafe { self.columns_mut() })
406 .into_iter()
407 .zip(new_names)
408 .map(|(c, name)| c.with_name(PlSmallStr::from_str(name.as_ref())))
409 .collect();
410
411 Ok(())
412 }
413
414 pub fn dtypes(&self) -> Vec<DataType> {
427 self.columns().iter().map(|s| s.dtype().clone()).collect()
428 }
429
430 pub fn first_col_n_chunks(&self) -> usize {
432 match self.columns().iter().find_map(|col| col.as_series()) {
433 None if self.width() == 0 => 0,
434 None => 1,
435 Some(s) => s.n_chunks(),
436 }
437 }
438
439 pub fn max_n_chunks(&self) -> usize {
441 self.columns()
442 .iter()
443 .map(|s| s.as_series().map(|s| s.n_chunks()).unwrap_or(1))
444 .max()
445 .unwrap_or(0)
446 }
447
448 pub fn fields(&self) -> Vec<Field> {
464 self.columns()
465 .iter()
466 .map(|s| s.field().into_owned())
467 .collect()
468 }
469
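/// Return a new `DataFrame` with `columns` appended horizontally; the result is built with
/// the checked [`DataFrame::new`] constructor.
///
/// Example (illustrative sketch marked `ignore`, not a compiled doctest; assumes the `df!`
/// macro from this crate's prelude):
///
/// ```ignore
/// use polars_core::prelude::*;
///
/// let df = df!("a" => [1, 2, 3])?;
/// let extra = Column::new("b".into(), ["x", "y", "z"].as_ref());
/// let wide = df.hstack(&[extra])?;
/// assert_eq!(wide.shape(), (3, 2));
/// ```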
470 pub fn hstack(&self, columns: &[Column]) -> PolarsResult<Self> {
504 let mut new_cols = Vec::with_capacity(self.width() + columns.len());
505
506 new_cols.extend(self.columns().iter().cloned());
507 new_cols.extend_from_slice(columns);
508
509 DataFrame::new(self.height(), new_cols)
510 }
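
/// Return a new `DataFrame` with the rows of `other` stacked below the rows of `self`.
///
/// The appended data is added as extra chunks (see the `test_vstack` test below), so a
/// later [`DataFrame::align_chunks`] or [`DataFrame::rechunk_mut`] may be worthwhile.
/// Example (illustrative sketch marked `ignore`, not a compiled doctest):
///
/// ```ignore
/// use polars_core::prelude::*;
///
/// let df1 = df!("a" => [1, 2, 3])?;
/// let df2 = df!("a" => [4, 5, 6])?;
/// let stacked = df1.vstack(&df2)?;
/// assert_eq!(stacked.height(), 6);
/// ```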
511 pub fn vstack(&self, other: &DataFrame) -> PolarsResult<Self> {
552 let mut df = self.clone();
553 df.vstack_mut(other)?;
554 Ok(df)
555 }
556
557 pub fn vstack_mut(&mut self, other: &DataFrame) -> PolarsResult<&mut Self> {
598 if self.width() != other.width() {
599 polars_ensure!(
600 self.shape() == (0, 0),
601 ShapeMismatch:
602 "unable to append to a DataFrame of shape {:?} with a DataFrame of width {}",
603 self.shape(), other.width(),
604 );
605
606 self.clone_from(other);
607
608 return Ok(self);
609 }
610
611 let new_height = usize::checked_add(self.height(), other.height()).unwrap();
612
613 unsafe { self.columns_mut_retain_schema() }
614 .iter_mut()
615 .zip(other.columns())
616 .try_for_each::<_, PolarsResult<_>>(|(left, right)| {
617 ensure_can_extend(&*left, right)?;
618 left.append(right).map_err(|e| {
619 e.context(format!("failed to vstack column '{}'", right.name()).into())
620 })?;
621 Ok(())
622 })?;
623
624 unsafe { self.set_height(new_height) };
625
626 Ok(self)
627 }
628
629 pub fn vstack_mut_owned(&mut self, other: DataFrame) -> PolarsResult<&mut Self> {
630 if self.width() != other.width() {
631 polars_ensure!(
632 self.shape() == (0, 0),
633 ShapeMismatch:
634 "unable to append to a DataFrame of width {} with a DataFrame of width {}",
635 self.width(), other.width(),
636 );
637
638 *self = other;
639
640 return Ok(self);
641 }
642
643 let new_height = usize::checked_add(self.height(), other.height()).unwrap();
644
645 unsafe { self.columns_mut_retain_schema() }
646 .iter_mut()
647 .zip(other.into_columns())
648 .try_for_each::<_, PolarsResult<_>>(|(left, right)| {
649 ensure_can_extend(&*left, &right)?;
650 let right_name = right.name().clone();
651 left.append_owned(right).map_err(|e| {
652 e.context(format!("failed to vstack column '{right_name}'").into())
653 })?;
654 Ok(())
655 })?;
656
657 unsafe { self.set_height(new_height) };
658
659 Ok(self)
660 }
661
662 pub fn vstack_mut_unchecked(&mut self, other: &DataFrame) -> &mut Self {
669 let new_height = usize::checked_add(self.height(), other.height()).unwrap();
670
671 unsafe { self.columns_mut_retain_schema() }
672 .iter_mut()
673 .zip(other.columns())
674 .for_each(|(left, right)| {
675 left.append(right)
676 .map_err(|e| {
677 e.context(format!("failed to vstack column '{}'", right.name()).into())
678 })
679 .expect("should not fail");
680 });
681
682 unsafe { self.set_height(new_height) };
683
684 self
685 }
686
687 pub fn vstack_mut_owned_unchecked(&mut self, other: DataFrame) -> &mut Self {
694 let new_height = usize::checked_add(self.height(), other.height()).unwrap();
695
696 unsafe { self.columns_mut_retain_schema() }
697 .iter_mut()
698 .zip(other.into_columns())
699 .for_each(|(left, right)| {
700 left.append_owned(right).expect("should not fail");
701 });
702
703 unsafe { self.set_height(new_height) };
704
705 self
706 }
707
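/// Extend the columns of this `DataFrame` in place with the values of `other`.
///
/// Unlike [`DataFrame::vstack_mut`], which appends the chunks of `other`, this calls
/// [`Column::extend`] and so grows the existing buffers of each column. Example
/// (illustrative sketch marked `ignore`, not a compiled doctest):
///
/// ```ignore
/// use polars_core::prelude::*;
///
/// let mut df1 = df!("a" => [1, 2, 3])?;
/// let df2 = df!("a" => [4, 5, 6])?;
/// df1.extend(&df2)?;
/// assert_eq!(df1.height(), 6);
/// ```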
708 pub fn extend(&mut self, other: &DataFrame) -> PolarsResult<()> {
723 polars_ensure!(
724 self.width() == other.width(),
725 ShapeMismatch:
726 "unable to extend a DataFrame of width {} with a DataFrame of width {}",
727 self.width(), other.width(),
728 );
729
730 let new_height = usize::checked_add(self.height(), other.height()).unwrap();
731
732 unsafe { self.columns_mut_retain_schema() }
733 .iter_mut()
734 .zip(other.columns())
735 .try_for_each::<_, PolarsResult<_>>(|(left, right)| {
736 ensure_can_extend(&*left, right)?;
737 left.extend(right).map_err(|e| {
738 e.context(format!("failed to extend column '{}'", right.name()).into())
739 })?;
740 Ok(())
741 })?;
742
743 unsafe { self.set_height(new_height) };
744
745 Ok(())
746 }
747
748 pub fn drop_in_place(&mut self, name: &str) -> PolarsResult<Column> {
765 let idx = self.try_get_column_index(name)?;
766 Ok(unsafe { self.columns_mut() }.remove(idx))
767 }
768
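/// Return a new `DataFrame` without the rows that contain null values, optionally only
/// considering the columns in `subset`.
///
/// Example (illustrative sketch marked `ignore`, not a compiled doctest):
///
/// ```ignore
/// use polars_core::prelude::*;
///
/// let df = df!(
///     "a" => [Some(1), None, Some(3)],
///     "b" => [Some("x"), Some("y"), None]
/// )?;
/// // Only the first row is null-free across all columns.
/// assert_eq!(df.drop_nulls(Some(&["a", "b"]))?.height(), 1);
/// // Restricting the subset to "a" keeps the row where only "b" is null.
/// assert_eq!(df.drop_nulls(Some(&["a"]))?.height(), 2);
/// ```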
769 pub fn drop_nulls<S>(&self, subset: Option<&[S]>) -> PolarsResult<Self>
798 where
799 for<'a> &'a S: AsRef<str>,
800 {
801 if let Some(v) = subset {
802 let v = self.select_to_vec(v)?;
803 self._drop_nulls_impl(v.as_slice())
804 } else {
805 self._drop_nulls_impl(self.columns())
806 }
807 }
808
809 fn _drop_nulls_impl(&self, subset: &[Column]) -> PolarsResult<Self> {
810 if subset.iter().all(|s| !s.has_nulls()) {
812 return Ok(self.clone());
813 }
814
815 let mut iter = subset.iter();
816
817 let mask = iter
818 .next()
819 .ok_or_else(|| polars_err!(NoData: "no data to drop nulls from"))?;
820 let mut mask = mask.is_not_null();
821
822 for c in iter {
823 mask = mask & c.is_not_null();
824 }
825 self.filter(&mask)
826 }
827
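/// Return a new `DataFrame` with the column `name` removed; errors if the column does not
/// exist.
///
/// Example (illustrative sketch marked `ignore`, not a compiled doctest):
///
/// ```ignore
/// use polars_core::prelude::*;
///
/// let df = df!("a" => [1, 2, 3], "b" => ["x", "y", "z"])?;
/// let dropped = df.drop("b")?;
/// assert_eq!(dropped.width(), 1);
/// assert!(dropped.column("b").is_err());
/// ```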
828 pub fn drop(&self, name: &str) -> PolarsResult<Self> {
843 let idx = self.try_get_column_index(name)?;
844 let mut new_cols = Vec::with_capacity(self.width() - 1);
845
846 self.columns().iter().enumerate().for_each(|(i, s)| {
847 if i != idx {
848 new_cols.push(s.clone())
849 }
850 });
851
852 Ok(unsafe { DataFrame::_new_unchecked_impl(self.height(), new_cols) })
853 }
854
855 pub fn drop_many<I, S>(&self, names: I) -> Self
857 where
858 I: IntoIterator<Item = S>,
859 S: Into<PlSmallStr>,
860 {
861 let names: PlHashSet<PlSmallStr> = names.into_iter().map(|s| s.into()).collect();
862 self.drop_many_amortized(&names)
863 }
864
865 pub fn drop_many_amortized(&self, names: &PlHashSet<PlSmallStr>) -> DataFrame {
867 if names.is_empty() {
868 return self.clone();
869 }
870 let mut new_cols = Vec::with_capacity(self.width().saturating_sub(names.len()));
871 self.columns().iter().for_each(|s| {
872 if !names.contains(s.name()) {
873 new_cols.push(s.clone())
874 }
875 });
876
877 unsafe { DataFrame::new_unchecked(self.height(), new_cols) }
878 }
879
880 fn insert_column_no_namecheck(
883 &mut self,
884 index: usize,
885 column: Column,
886 ) -> PolarsResult<&mut Self> {
887 if self.shape() == (0, 0) {
888 unsafe { self.set_height(column.len()) };
889 }
890
891 polars_ensure!(
892 column.len() == self.height(),
893 ShapeMismatch:
894 "unable to add a column of length {} to a DataFrame of height {}",
895 column.len(), self.height(),
896 );
897
898 unsafe { self.columns_mut() }.insert(index, column);
899 Ok(self)
900 }
901
902 pub fn insert_column(&mut self, index: usize, column: Column) -> PolarsResult<&mut Self> {
904 let name = column.name();
905
906 polars_ensure!(
907 self.get_column_index(name).is_none(),
908 Duplicate:
909 "column with name {:?} is already present in the DataFrame", name
910 );
911
912 self.insert_column_no_namecheck(index, column)
913 }
914
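/// Add a column to this `DataFrame`, replacing an existing column with the same name; a
/// column of length 1 is broadcast to the height of the frame.
///
/// Example (illustrative sketch marked `ignore`, not a compiled doctest):
///
/// ```ignore
/// use polars_core::prelude::*;
///
/// let mut df = df!("a" => [1, 2, 3])?;
/// // A unit-length column is broadcast to the full height.
/// df.with_column(Column::new("b".into(), ["only"].as_ref()))?;
/// assert_eq!(df.column("b")?.len(), 3);
/// ```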
915 pub fn with_column(&mut self, mut column: Column) -> PolarsResult<&mut Self> {
918 if self.shape() == (0, 0) {
919 unsafe { self.set_height(column.len()) };
920 }
921
922 if column.len() != self.height() && column.len() == 1 {
923 column = column.new_from_index(0, self.height());
924 }
925
926 polars_ensure!(
927 column.len() == self.height(),
928 ShapeMismatch: "unable to add a column of length {} to a DataFrame of height {}",
929 column.len(), self.height(),
930 );
931
932 if let Some(i) = self.get_column_index(column.name()) {
933 *unsafe { self.columns_mut() }.get_mut(i).unwrap() = column
934 } else {
935 unsafe { self.columns_mut() }.push(column)
936 };
937
938 Ok(self)
939 }
940
941 pub unsafe fn push_column_unchecked(&mut self, column: Column) -> &mut Self {
947 unsafe { self.columns_mut() }.push(column);
948 self
949 }
950
951 pub fn with_columns_mut(
954 &mut self,
955 columns: impl IntoIterator<Item = Column>,
956 output_schema: &Schema,
957 ) -> PolarsResult<()> {
958 let columns = columns.into_iter();
959
960 unsafe {
961 self.columns_mut_retain_schema()
962 .reserve(columns.size_hint().0)
963 }
964
965 for c in columns {
966 self.with_column_and_schema_mut(c, output_schema)?;
967 }
968
969 Ok(())
970 }
971
972 fn with_column_and_schema_mut(
973 &mut self,
974 mut column: Column,
975 output_schema: &Schema,
976 ) -> PolarsResult<&mut Self> {
977 if self.shape() == (0, 0) {
978 unsafe { self.set_height(column.len()) };
979 }
980
981 if column.len() != self.height() && column.len() == 1 {
982 column = column.new_from_index(0, self.height());
983 }
984
985 polars_ensure!(
986 column.len() == self.height(),
987 ShapeMismatch:
988 "unable to add a column of length {} to a DataFrame of height {}",
989 column.len(), self.height(),
990 );
991
992 let i = output_schema
993 .index_of(column.name())
994 .or_else(|| self.get_column_index(column.name()))
995 .unwrap_or(self.width());
996
997 if i < self.width() {
998 *unsafe { self.columns_mut() }.get_mut(i).unwrap() = column
999 } else if i == self.width() {
1000 unsafe { self.columns_mut() }.push(column)
1001 } else {
1002 panic!()
1004 }
1005
1006 Ok(self)
1007 }
1008
1009 pub fn get(&self, idx: usize) -> Option<Vec<AnyValue<'_>>> {
1020 (idx < self.height()).then(|| self.columns().iter().map(|c| c.get(idx).unwrap()).collect())
1021 }
1022
1023 pub fn select_at_idx(&self, idx: usize) -> Option<&Column> {
1039 self.columns().get(idx)
1040 }
1041
1042 pub fn get_column_index(&self, name: &str) -> Option<usize> {
1060 if let Some(schema) = self.cached_schema() {
1061 schema.index_of(name)
1062 } else if self.width() <= LINEAR_SEARCH_LIMIT {
1063 self.columns().iter().position(|s| s.name() == name)
1064 } else {
1065 self.schema().index_of(name)
1066 }
1067 }
1068
1069 pub fn try_get_column_index(&self, name: &str) -> PolarsResult<usize> {
1071 self.get_column_index(name)
1072 .ok_or_else(|| polars_err!(col_not_found = name))
1073 }
1074
1075 pub fn column(&self, name: &str) -> PolarsResult<&Column> {
1089 let idx = self.try_get_column_index(name)?;
1090 Ok(self.select_at_idx(idx).unwrap())
1091 }
1092
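/// Select columns by name into a new `DataFrame`, in the order they are given.
///
/// Example (illustrative sketch marked `ignore`, not a compiled doctest):
///
/// ```ignore
/// use polars_core::prelude::*;
///
/// let df = df!("a" => [1, 2], "b" => ["x", "y"], "c" => [true, false])?;
/// let selected = df.select(["c", "a"])?;
/// assert_eq!(selected.shape(), (2, 2));
/// ```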
1093 pub fn select<I, S>(&self, names: I) -> PolarsResult<Self>
1104 where
1105 I: IntoIterator<Item = S>,
1106 S: AsRef<str>,
1107 {
1108 DataFrame::new(self.height(), self.select_to_vec(names)?)
1109 }
1110
1111 pub unsafe fn select_unchecked<I, S>(&self, names: I) -> PolarsResult<Self>
1116 where
1117 I: IntoIterator<Item = S>,
1118 S: AsRef<str>,
1119 {
1120 Ok(unsafe { DataFrame::new_unchecked(self.height(), self.select_to_vec(names)?) })
1121 }
1122
1123 pub fn select_to_vec(
1141 &self,
1142 selection: impl IntoIterator<Item = impl AsRef<str>>,
1143 ) -> PolarsResult<Vec<Column>> {
1144 AmortizedColumnSelector::new(self).select_multiple(selection)
1145 }
1146
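/// Return the rows of this `DataFrame` where `mask` is `true`.
///
/// Example (illustrative sketch marked `ignore`, not a compiled doctest; the comparison on
/// the materialized series mirrors what the tests in this module do):
///
/// ```ignore
/// use polars_core::prelude::*;
///
/// let df = df!("a" => [1, 2, 3], "b" => ["x", "y", "z"])?;
/// let mask = df.column("a")?.as_materialized_series().equal(2)?;
/// let filtered = df.filter(&mask)?;
/// assert_eq!(filtered.height(), 1);
/// ```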
1147 pub fn filter(&self, mask: &BooleanChunked) -> PolarsResult<Self> {
1159 if self.width() == 0 {
1160 filter_zero_width(self.height(), mask)
1161 } else {
1162 let new_columns: Vec<Column> = self.try_apply_columns_par(|s| s.filter(mask))?;
1163 let out = unsafe {
1164 DataFrame::new_unchecked(new_columns[0].len(), new_columns).with_schema_from(self)
1165 };
1166
1167 Ok(out)
1168 }
1169 }
1170
1171 pub fn filter_seq(&self, mask: &BooleanChunked) -> PolarsResult<Self> {
1173 if self.width() == 0 {
1174 filter_zero_width(self.height(), mask)
1175 } else {
1176 let new_columns: Vec<Column> = self.try_apply_columns(|s| s.filter(mask))?;
1177 let out = unsafe {
1178 DataFrame::new_unchecked(new_columns[0].len(), new_columns).with_schema_from(self)
1179 };
1180
1181 Ok(out)
1182 }
1183 }
1184
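/// Gather rows by the indices in `indices`, with bounds checking.
///
/// Example (illustrative sketch marked `ignore`, not a compiled doctest; `IdxCa::from_slice`
/// is assumed to be available from the prelude):
///
/// ```ignore
/// use polars_core::prelude::*;
///
/// let df = df!("a" => [10, 20, 30, 40])?;
/// let idx = IdxCa::from_slice(PlSmallStr::EMPTY, &[3, 0]);
/// let gathered = df.take(&idx)?;
/// assert_eq!(gathered.height(), 2);
/// ```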
1185 pub fn take(&self, indices: &IdxCa) -> PolarsResult<Self> {
1197 check_bounds_ca(indices, self.height().try_into().unwrap_or(IdxSize::MAX))?;
1198
1199 let new_cols = self.apply_columns_par(|c| {
1200 assert_eq!(c.len(), self.height());
1201 unsafe { c.take_unchecked(indices) }
1202 });
1203
1204 Ok(unsafe { DataFrame::new_unchecked(indices.len(), new_cols).with_schema_from(self) })
1205 }
1206
1207 pub unsafe fn take_unchecked(&self, idx: &IdxCa) -> Self {
1210 self.take_unchecked_impl(idx, true)
1211 }
1212
1213 #[cfg(feature = "algorithm_group_by")]
1216 pub unsafe fn gather_group_unchecked(&self, group: &GroupsIndicator) -> Self {
1217 match group {
1218 GroupsIndicator::Idx((_, indices)) => unsafe {
1219 self.take_slice_unchecked_impl(indices.as_slice(), false)
1220 },
1221 GroupsIndicator::Slice([offset, len]) => self.slice(*offset as i64, *len as usize),
1222 }
1223 }
1224
1225 pub unsafe fn take_unchecked_impl(&self, idx: &IdxCa, allow_threads: bool) -> Self {
1228 let cols = if allow_threads && POOL.current_num_threads() > 1 {
1229 POOL.install(|| {
1230 if POOL.current_num_threads() > self.width() {
1231 let stride = usize::max(idx.len().div_ceil(POOL.current_num_threads()), 256);
1232 if self.height() / stride >= 2 {
1233 self.apply_columns_par(|c| {
1234 let c = if c.dtype().is_nested() {
1237 &c.rechunk()
1238 } else {
1239 c
1240 };
1241
1242 (0..idx.len().div_ceil(stride))
1243 .into_par_iter()
1244 .map(|i| c.take_unchecked(&idx.slice((i * stride) as i64, stride)))
1245 .reduce(
1246 || Column::new_empty(c.name().clone(), c.dtype()),
1247 |mut a, b| {
1248 a.append_owned(b).unwrap();
1249 a
1250 },
1251 )
1252 })
1253 } else {
1254 self.apply_columns_par(|c| c.take_unchecked(idx))
1255 }
1256 } else {
1257 self.apply_columns_par(|c| c.take_unchecked(idx))
1258 }
1259 })
1260 } else {
1261 self.apply_columns(|s| s.take_unchecked(idx))
1262 };
1263
1264 unsafe { DataFrame::new_unchecked(idx.len(), cols).with_schema_from(self) }
1265 }
1266
1267 pub unsafe fn take_slice_unchecked(&self, idx: &[IdxSize]) -> Self {
1270 self.take_slice_unchecked_impl(idx, true)
1271 }
1272
1273 pub unsafe fn take_slice_unchecked_impl(&self, idx: &[IdxSize], allow_threads: bool) -> Self {
1276 let cols = if allow_threads && POOL.current_num_threads() > 1 {
1277 POOL.install(|| {
1278 if POOL.current_num_threads() > self.width() {
1279 let stride = usize::max(idx.len().div_ceil(POOL.current_num_threads()), 256);
1280 if self.height() / stride >= 2 {
1281 self.apply_columns_par(|c| {
1282 let c = if c.dtype().is_nested() {
1285 &c.rechunk()
1286 } else {
1287 c
1288 };
1289
1290 (0..idx.len().div_ceil(stride))
1291 .into_par_iter()
1292 .map(|i| {
1293 let idx = &idx[i * stride..];
1294 let idx = &idx[..idx.len().min(stride)];
1295 c.take_slice_unchecked(idx)
1296 })
1297 .reduce(
1298 || Column::new_empty(c.name().clone(), c.dtype()),
1299 |mut a, b| {
1300 a.append_owned(b).unwrap();
1301 a
1302 },
1303 )
1304 })
1305 } else {
1306 self.apply_columns_par(|s| s.take_slice_unchecked(idx))
1307 }
1308 } else {
1309 self.apply_columns_par(|s| s.take_slice_unchecked(idx))
1310 }
1311 })
1312 } else {
1313 self.apply_columns(|s| s.take_slice_unchecked(idx))
1314 };
1315 unsafe { DataFrame::new_unchecked(idx.len(), cols).with_schema_from(self) }
1316 }
1317
1318 pub fn rename(&mut self, column: &str, name: PlSmallStr) -> PolarsResult<&mut Self> {
1333 if column == name.as_str() {
1334 return Ok(self);
1335 }
1336 polars_ensure!(
1337 !self.schema().contains(&name),
1338 Duplicate: "column rename attempted with already existing name \"{name}\""
1339 );
1340
1341 self.get_column_index(column)
1342 .and_then(|idx| unsafe { self.columns_mut() }.get_mut(idx))
1343 .ok_or_else(|| polars_err!(col_not_found = column))
1344 .map(|c| c.rename(name))?;
1345
1346 Ok(self)
1347 }
1348
1349 pub fn rename_many<'a>(
1350 &mut self,
1351 renames: impl Iterator<Item = (&'a str, PlSmallStr)>,
1352 ) -> PolarsResult<&mut Self> {
1353 let mut schema_arc = self.schema().clone();
1354 let schema = Arc::make_mut(&mut schema_arc);
1355
1356 for (from, to) in renames {
1357 if from == to.as_str() {
1358 continue;
1359 }
1360
1361 polars_ensure!(
1362 !schema.contains(&to),
1363 Duplicate: "column rename attempted with already existing name \"{to}\""
1364 );
1365
1366 match schema.get_full(from) {
1367 None => polars_bail!(col_not_found = from),
1368 Some((idx, _, _)) => {
1369 let (n, _) = schema.get_at_index_mut(idx).unwrap();
1370 *n = to.clone();
1371 unsafe { self.columns_mut() }
1372 .get_mut(idx)
1373 .unwrap()
1374 .rename(to);
1375 },
1376 }
1377 }
1378
1379 unsafe { self.set_schema(schema_arc) };
1380
1381 Ok(self)
1382 }
1383
1384 pub fn sort_in_place(
1388 &mut self,
1389 by: impl IntoIterator<Item = impl AsRef<str>>,
1390 sort_options: SortMultipleOptions,
1391 ) -> PolarsResult<&mut Self> {
1392 let by_column = self.select_to_vec(by)?;
1393
1394 let mut out = self.sort_impl(by_column, sort_options, None)?;
1395 unsafe { out.set_schema_from(self) };
1396
1397 *self = out;
1398
1399 Ok(self)
1400 }
1401
1402 #[doc(hidden)]
1403 pub fn sort_impl(
1405 &self,
1406 by_column: Vec<Column>,
1407 sort_options: SortMultipleOptions,
1408 slice: Option<(i64, usize)>,
1409 ) -> PolarsResult<Self> {
1410 if by_column.is_empty() {
1411 return if let Some((offset, len)) = slice {
1413 Ok(self.slice(offset, len))
1414 } else {
1415 Ok(self.clone())
1416 };
1417 }
1418
1419 let first_descending = sort_options.descending[0];
1424 let first_by_column = by_column[0].name().to_string();
1425
1426 let set_sorted = |df: &mut DataFrame| {
1427 let _ = df.apply(&first_by_column, |s| {
1430 let mut s = s.clone();
1431 if first_descending {
1432 s.set_sorted_flag(IsSorted::Descending)
1433 } else {
1434 s.set_sorted_flag(IsSorted::Ascending)
1435 }
1436 s
1437 });
1438 };
1439
1440 if self.shape_has_zero() {
1441 let mut out = self.clone();
1442 set_sorted(&mut out);
1443 return Ok(out);
1444 }
1445
1446 if let Some((0, k)) = slice {
1447 if k < self.height() {
1448 return self.bottom_k_impl(k, by_column, sort_options);
1449 }
1450 }
1451 #[cfg(feature = "dtype-categorical")]
1455 let is_not_categorical_enum =
1456 !(matches!(by_column[0].dtype(), DataType::Categorical(_, _))
1457 || matches!(by_column[0].dtype(), DataType::Enum(_, _)));
1458
1459 #[cfg(not(feature = "dtype-categorical"))]
1460 #[allow(non_upper_case_globals)]
1461 const is_not_categorical_enum: bool = true;
1462
1463 if by_column.len() == 1 && is_not_categorical_enum {
1464 let required_sorting = if sort_options.descending[0] {
1465 IsSorted::Descending
1466 } else {
1467 IsSorted::Ascending
1468 };
1469 let no_sorting_required = (by_column[0].is_sorted_flag() == required_sorting)
1472 && ((by_column[0].null_count() == 0)
1473 || by_column[0].get(by_column[0].len() - 1).unwrap().is_null()
1474 == sort_options.nulls_last[0]);
1475
1476 if no_sorting_required {
1477 return if let Some((offset, len)) = slice {
1478 Ok(self.slice(offset, len))
1479 } else {
1480 Ok(self.clone())
1481 };
1482 }
1483 }
1484
1485 let has_nested = by_column.iter().any(|s| s.dtype().is_nested());
1486 let allow_threads = sort_options.multithreaded;
1487
1488 let mut df = self.clone();
1490 let df = df.rechunk_mut_par();
1491 let mut take = match (by_column.len(), has_nested) {
1492 (1, false) => {
1493 let s = &by_column[0];
1494 let options = SortOptions {
1495 descending: sort_options.descending[0],
1496 nulls_last: sort_options.nulls_last[0],
1497 multithreaded: sort_options.multithreaded,
1498 maintain_order: sort_options.maintain_order,
1499 limit: sort_options.limit,
1500 };
1501 if df.width() == 1 && df.try_get_column_index(s.name().as_str()).is_ok() {
1505 let mut out = s.sort_with(options)?;
1506 if let Some((offset, len)) = slice {
1507 out = out.slice(offset, len);
1508 }
1509 return Ok(out.into_frame());
1510 }
1511 s.arg_sort(options)
1512 },
1513 _ => arg_sort(&by_column, sort_options)?,
1514 };
1515
1516 if let Some((offset, len)) = slice {
1517 take = take.slice(offset, len);
1518 }
1519
1520 let mut df = unsafe { df.take_unchecked_impl(&take, allow_threads) };
1523 set_sorted(&mut df);
1524 Ok(df)
1525 }
1526
1527 pub fn _to_metadata(&self) -> DataFrame {
1532 let num_columns = self.width();
1533
1534 let mut column_names =
1535 StringChunkedBuilder::new(PlSmallStr::from_static("column_name"), num_columns);
1536 let mut repr_ca = StringChunkedBuilder::new(PlSmallStr::from_static("repr"), num_columns);
1537 let mut sorted_asc_ca =
1538 BooleanChunkedBuilder::new(PlSmallStr::from_static("sorted_asc"), num_columns);
1539 let mut sorted_dsc_ca =
1540 BooleanChunkedBuilder::new(PlSmallStr::from_static("sorted_dsc"), num_columns);
1541 let mut fast_explode_list_ca =
1542 BooleanChunkedBuilder::new(PlSmallStr::from_static("fast_explode_list"), num_columns);
1543 let mut materialized_at_ca =
1544 StringChunkedBuilder::new(PlSmallStr::from_static("materialized_at"), num_columns);
1545
1546 for col in self.columns() {
1547 let flags = col.get_flags();
1548
1549 let (repr, materialized_at) = match col {
1550 Column::Series(s) => ("series", s.materialized_at()),
1551 Column::Scalar(_) => ("scalar", None),
1552 };
1553 let sorted_asc = flags.contains(StatisticsFlags::IS_SORTED_ASC);
1554 let sorted_dsc = flags.contains(StatisticsFlags::IS_SORTED_DSC);
1555 let fast_explode_list = flags.contains(StatisticsFlags::CAN_FAST_EXPLODE_LIST);
1556
1557 column_names.append_value(col.name().clone());
1558 repr_ca.append_value(repr);
1559 sorted_asc_ca.append_value(sorted_asc);
1560 sorted_dsc_ca.append_value(sorted_dsc);
1561 fast_explode_list_ca.append_value(fast_explode_list);
1562 materialized_at_ca.append_option(materialized_at.map(|v| format!("{v:#?}")));
1563 }
1564
1565 unsafe {
1566 DataFrame::new_unchecked(
1567 self.width(),
1568 vec![
1569 column_names.finish().into_column(),
1570 repr_ca.finish().into_column(),
1571 sorted_asc_ca.finish().into_column(),
1572 sorted_dsc_ca.finish().into_column(),
1573 fast_explode_list_ca.finish().into_column(),
1574 materialized_at_ca.finish().into_column(),
1575 ],
1576 )
1577 }
1578 }
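
/// Return a new `DataFrame` sorted by the given columns.
///
/// Example (illustrative sketch marked `ignore`, not a compiled doctest):
///
/// ```ignore
/// use polars_core::prelude::*;
///
/// let df = df!("a" => [3, 1, 2], "b" => ["c", "a", "b"])?;
/// // Ascending sort on "a"; use a customised `SortMultipleOptions` for descending
/// // order, null placement, multithreading, etc.
/// let sorted = df.sort(["a"], SortMultipleOptions::default())?;
/// assert_eq!(sorted.height(), 3);
/// ```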
1579 pub fn sort(
1617 &self,
1618 by: impl IntoIterator<Item = impl AsRef<str>>,
1619 sort_options: SortMultipleOptions,
1620 ) -> PolarsResult<Self> {
1621 let mut df = self.clone();
1622 df.sort_in_place(by, sort_options)?;
1623 Ok(df)
1624 }
1625
1626 pub fn replace(&mut self, column: &str, new_col: Column) -> PolarsResult<&mut Self> {
1641 self.apply(column, |_| new_col)
1642 }
1643
1644 pub fn replace_column(&mut self, index: usize, new_column: Column) -> PolarsResult<&mut Self> {
1659 polars_ensure!(
1660 index < self.width(),
1661 ShapeMismatch:
1662 "unable to replace at index {}, the DataFrame has only {} columns",
1663 index, self.width(),
1664 );
1665
1666 polars_ensure!(
1667 new_column.len() == self.height(),
1668 ShapeMismatch:
1669 "unable to replace a column, series length {} doesn't match the DataFrame height {}",
1670 new_column.len(), self.height(),
1671 );
1672
1673 unsafe { *self.columns_mut().get_mut(index).unwrap() = new_column };
1674
1675 Ok(self)
1676 }
1677
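/// Apply a closure to the column `name` and replace the column with the result; the new
/// column keeps the original name.
///
/// Example (illustrative sketch marked `ignore`, not a compiled doctest):
///
/// ```ignore
/// use polars_core::prelude::*;
///
/// let mut df = df!("a" => [1, 2, 3])?;
/// df.apply("a", |c| c.cast(&DataType::Float64).unwrap())?;
/// assert_eq!(df.column("a")?.dtype(), &DataType::Float64);
/// ```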
1678 pub fn apply<F, C>(&mut self, name: &str, f: F) -> PolarsResult<&mut Self>
1719 where
1720 F: FnOnce(&Column) -> C,
1721 C: IntoColumn,
1722 {
1723 let idx = self.try_get_column_index(name)?;
1724 self.apply_at_idx(idx, f)?;
1725 Ok(self)
1726 }
1727
1728 pub fn apply_at_idx<F, C>(&mut self, idx: usize, f: F) -> PolarsResult<&mut Self>
1759 where
1760 F: FnOnce(&Column) -> C,
1761 C: IntoColumn,
1762 {
1763 let df_height = self.height();
1764 let width = self.width();
1765
1766 let cached_schema = self.cached_schema().cloned();
1767
1768 let col = unsafe { self.columns_mut() }.get_mut(idx).ok_or_else(|| {
1769 polars_err!(
1770 ComputeError: "invalid column index: {} for a DataFrame with {} columns",
1771 idx, width
1772 )
1773 })?;
1774
1775 let mut new_col = f(col).into_column();
1776
1777 if new_col.len() != df_height && new_col.len() == 1 {
1778 new_col = new_col.new_from_index(0, df_height);
1779 }
1780
1781 polars_ensure!(
1782 new_col.len() == df_height,
1783 ShapeMismatch:
1784 "apply_at_idx: resulting Series has length {} while the DataFrame has height {}",
1785 new_col.len(), df_height
1786 );
1787
1788 new_col = new_col.with_name(col.name().clone());
1789 let col_before = std::mem::replace(col, new_col);
1790
1791 if col.dtype() == col_before.dtype() {
1792 unsafe { self.set_opt_schema(cached_schema) };
1793 }
1794
1795 Ok(self)
1796 }
1797
1798 pub fn try_apply_at_idx<F, C>(&mut self, idx: usize, f: F) -> PolarsResult<&mut Self>
1839 where
1840 F: FnOnce(&Column) -> PolarsResult<C>,
1841 C: IntoColumn,
1842 {
1843 let df_height = self.height();
1844 let width = self.width();
1845
1846 let cached_schema = self.cached_schema().cloned();
1847
1848 let col = unsafe { self.columns_mut() }.get_mut(idx).ok_or_else(|| {
1849 polars_err!(
1850 ComputeError: "invalid column index: {} for a DataFrame with {} columns",
1851 idx, width
1852 )
1853 })?;
1854
1855 let mut new_col = f(col).map(|c| c.into_column())?;
1856
1857 polars_ensure!(
1858 new_col.len() == df_height,
1859 ShapeMismatch:
1860 "try_apply_at_idx: resulting Series has length {} while the DataFrame has height {}",
1861 new_col.len(), df_height
1862 );
1863
1864 new_col = new_col.with_name(col.name().clone());
1866 let col_before = std::mem::replace(col, new_col);
1867
1868 if col.dtype() == col_before.dtype() {
1869 unsafe { self.set_opt_schema(cached_schema) };
1870 }
1871
1872 Ok(self)
1873 }
1874
1875 pub fn try_apply<F, C>(&mut self, column: &str, f: F) -> PolarsResult<&mut Self>
1918 where
1919 F: FnOnce(&Series) -> PolarsResult<C>,
1920 C: IntoColumn,
1921 {
1922 let idx = self.try_get_column_index(column)?;
1923 self.try_apply_at_idx(idx, |c| f(c.as_materialized_series()))
1924 }
1925
1926 #[must_use]
1956 pub fn slice(&self, offset: i64, length: usize) -> Self {
1957 if offset == 0 && length == self.height() {
1958 return self.clone();
1959 }
1960
1961 if length == 0 {
1962 return self.clear();
1963 }
1964
1965 let cols = self.apply_columns(|s| s.slice(offset, length));
1966
1967 let height = if let Some(fst) = cols.first() {
1968 fst.len()
1969 } else {
1970 let (_, length) = slice_offsets(offset, length, self.height());
1971 length
1972 };
1973
1974 unsafe { DataFrame::_new_unchecked_impl(height, cols).with_schema_from(self) }
1975 }
1976
1977 pub fn split_at(&self, offset: i64) -> (Self, Self) {
1979 let (a, b) = self.columns().iter().map(|s| s.split_at(offset)).unzip();
1980
1981 let (idx, _) = slice_offsets(offset, 0, self.height());
1982
1983 let a = unsafe { DataFrame::new_unchecked(idx, a).with_schema_from(self) };
1984 let b = unsafe { DataFrame::new_unchecked(self.height() - idx, b).with_schema_from(self) };
1985 (a, b)
1986 }
1987
1988 #[must_use]
1989 pub fn clear(&self) -> Self {
1990 let cols = self.columns().iter().map(|s| s.clear()).collect::<Vec<_>>();
1991 unsafe { DataFrame::_new_unchecked_impl(0, cols).with_schema_from(self) }
1992 }
1993
1994 #[must_use]
1995 pub fn slice_par(&self, offset: i64, length: usize) -> Self {
1996 if offset == 0 && length == self.height() {
1997 return self.clone();
1998 }
1999 let columns = self.apply_columns_par(|s| s.slice(offset, length));
2000 unsafe { DataFrame::new_unchecked(length, columns).with_schema_from(self) }
2001 }
2002
2003 #[must_use]
2004 pub fn _slice_and_realloc(&self, offset: i64, length: usize) -> Self {
2005 if offset == 0 && length == self.height() {
2006 return self.clone();
2007 }
2008 let columns = self.apply_columns(|s| {
2010 let mut out = s.slice(offset, length);
2011 out.shrink_to_fit();
2012 out
2013 });
2014 unsafe { DataFrame::new_unchecked(length, columns).with_schema_from(self) }
2015 }
2016
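/// Get the first `length` rows (or `HEAD_DEFAULT_LENGTH` rows when `length` is `None`),
/// clamped to the height of the frame.
///
/// Example (illustrative sketch marked `ignore`, not a compiled doctest) covering both
/// `head` and `tail`:
///
/// ```ignore
/// use polars_core::prelude::*;
///
/// let df = df!("a" => [1, 2, 3, 4, 5])?;
/// assert_eq!(df.head(Some(2)).height(), 2);
/// assert_eq!(df.tail(Some(2)).height(), 2);
/// ```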
2017 #[must_use]
2051 pub fn head(&self, length: Option<usize>) -> Self {
2052 let new_height = usize::min(self.height(), length.unwrap_or(HEAD_DEFAULT_LENGTH));
2053 let new_cols = self.apply_columns(|c| c.head(Some(new_height)));
2054
2055 unsafe { DataFrame::new_unchecked(new_height, new_cols).with_schema_from(self) }
2056 }
2057
2058 #[must_use]
2089 pub fn tail(&self, length: Option<usize>) -> Self {
2090 let new_height = usize::min(self.height(), length.unwrap_or(TAIL_DEFAULT_LENGTH));
2091 let new_cols = self.apply_columns(|c| c.tail(Some(new_height)));
2092
2093 unsafe { DataFrame::new_unchecked(new_height, new_cols).with_schema_from(self) }
2094 }
2095
2096 pub fn iter_chunks(
2106 &self,
2107 compat_level: CompatLevel,
2108 parallel: bool,
2109 ) -> impl Iterator<Item = RecordBatch> + '_ {
2110 debug_assert!(!self.should_rechunk(), "expected equal chunks");
2111
2112 if self.width() == 0 {
2113 return RecordBatchIterWrap::new_zero_width(self.height());
2114 }
2115
2116 let must_convert = compat_level.0 == 0;
2119 let parallel = parallel
2120 && must_convert
2121 && self.width() > 1
2122 && self
2123 .columns()
2124 .iter()
2125 .any(|s| matches!(s.dtype(), DataType::String | DataType::Binary));
2126
2127 RecordBatchIterWrap::Batches(RecordBatchIter {
2128 df: self,
2129 schema: Arc::new(
2130 self.columns()
2131 .iter()
2132 .map(|c| c.field().to_arrow(compat_level))
2133 .collect(),
2134 ),
2135 idx: 0,
2136 n_chunks: usize::max(1, self.first_col_n_chunks()),
2137 compat_level,
2138 parallel,
2139 })
2140 }
2141
2142 pub fn iter_chunks_physical(&self) -> impl Iterator<Item = RecordBatch> + '_ {
2152 debug_assert!(!self.should_rechunk());
2153
2154 if self.width() == 0 {
2155 return RecordBatchIterWrap::new_zero_width(self.height());
2156 }
2157
2158 RecordBatchIterWrap::PhysicalBatches(PhysRecordBatchIter {
2159 schema: Arc::new(
2160 self.columns()
2161 .iter()
2162 .map(|c| c.field().to_arrow(CompatLevel::newest()))
2163 .collect(),
2164 ),
2165 arr_iters: self
2166 .materialized_column_iter()
2167 .map(|s| s.chunks().iter())
2168 .collect(),
2169 })
2170 }
2171
2172 #[must_use]
2174 pub fn reverse(&self) -> Self {
2175 let new_cols = self.apply_columns(Column::reverse);
2176 unsafe { DataFrame::new_unchecked(self.height(), new_cols).with_schema_from(self) }
2177 }
2178
2179 #[must_use]
2184 pub fn shift(&self, periods: i64) -> Self {
2185 let col = self.apply_columns_par(|s| s.shift(periods));
2186 unsafe { DataFrame::new_unchecked(self.height(), col).with_schema_from(self) }
2187 }
2188
2189 pub fn fill_null(&self, strategy: FillNullStrategy) -> PolarsResult<Self> {
2198 let col = self.try_apply_columns_par(|s| s.fill_null(strategy))?;
2199
2200 Ok(unsafe { DataFrame::new_unchecked(self.height(), col) })
2201 }
2202
2203 pub fn pipe<F, B>(self, f: F) -> PolarsResult<B>
2205 where
2206 F: Fn(DataFrame) -> PolarsResult<B>,
2207 {
2208 f(self)
2209 }
2210
2211 pub fn pipe_mut<F, B>(&mut self, f: F) -> PolarsResult<B>
2213 where
2214 F: Fn(&mut DataFrame) -> PolarsResult<B>,
2215 {
2216 f(self)
2217 }
2218
2219 pub fn pipe_with_args<F, B, Args>(self, f: F, args: Args) -> PolarsResult<B>
2221 where
2222 F: Fn(DataFrame, Args) -> PolarsResult<B>,
2223 {
2224 f(self, args)
2225 }
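
/// Drop duplicate rows according to the `keep` strategy while maintaining the original row
/// order (see [`DataFrame::unique`] for the unordered variant).
///
/// Example (illustrative sketch marked `ignore`, not a compiled doctest; mirrors the
/// `distinct` test below):
///
/// ```ignore
/// use polars_core::prelude::*;
///
/// let df = df!("a" => [1, 1, 2, 2], "b" => ["x", "x", "y", "y"])?;
/// let unique = df.unique_stable(None, UniqueKeepStrategy::First, None)?;
/// assert_eq!(unique.height(), 2);
/// ```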
2226 #[cfg(feature = "algorithm_group_by")]
2260 pub fn unique_stable(
2261 &self,
2262 subset: Option<&[String]>,
2263 keep: UniqueKeepStrategy,
2264 slice: Option<(i64, usize)>,
2265 ) -> PolarsResult<DataFrame> {
2266 self.unique_impl(
2267 true,
2268 subset.map(|v| v.iter().map(|x| PlSmallStr::from_str(x.as_str())).collect()),
2269 keep,
2270 slice,
2271 )
2272 }
2273
2274 #[cfg(feature = "algorithm_group_by")]
2276 pub fn unique<I, S>(
2277 &self,
2278 subset: Option<&[String]>,
2279 keep: UniqueKeepStrategy,
2280 slice: Option<(i64, usize)>,
2281 ) -> PolarsResult<DataFrame> {
2282 self.unique_impl(
2283 false,
2284 subset.map(|v| v.iter().map(|x| PlSmallStr::from_str(x.as_str())).collect()),
2285 keep,
2286 slice,
2287 )
2288 }
2289
2290 #[cfg(feature = "algorithm_group_by")]
2291 pub fn unique_impl(
2292 &self,
2293 maintain_order: bool,
2294 subset: Option<Vec<PlSmallStr>>,
2295 keep: UniqueKeepStrategy,
2296 slice: Option<(i64, usize)>,
2297 ) -> PolarsResult<Self> {
2298 if self.width() == 0 {
2299 let height = usize::min(self.height(), 1);
2300 return Ok(DataFrame::empty_with_height(height));
2301 }
2302
2303 let names = subset.unwrap_or_else(|| self.get_column_names_owned());
2304 let mut df = self.clone();
2305 df.rechunk_mut_par();
2307
2308 let columns = match (keep, maintain_order) {
2309 (UniqueKeepStrategy::First | UniqueKeepStrategy::Any, true) => {
2310 let gb = df.group_by_stable(names)?;
2311 let groups = gb.get_groups();
2312 let (offset, len) = slice.unwrap_or((0, groups.len()));
2313 let groups = groups.slice(offset, len);
2314 df.apply_columns_par(|s| unsafe { s.agg_first(&groups) })
2315 },
2316 (UniqueKeepStrategy::Last, true) => {
2317 let gb = df.group_by_stable(names)?;
2320 let groups = gb.get_groups();
2321
2322 let last_idx: NoNull<IdxCa> = groups
2323 .iter()
2324 .map(|g| match g {
2325 GroupsIndicator::Idx((_first, idx)) => idx[idx.len() - 1],
2326 GroupsIndicator::Slice([first, len]) => first + len - 1,
2327 })
2328 .collect();
2329
2330 let mut last_idx = last_idx.into_inner().sort(false);
2331
2332 if let Some((offset, len)) = slice {
2333 last_idx = last_idx.slice(offset, len);
2334 }
2335
2336 let last_idx = NoNull::new(last_idx);
2337 let out = unsafe { df.take_unchecked(&last_idx) };
2338 return Ok(out);
2339 },
2340 (UniqueKeepStrategy::First | UniqueKeepStrategy::Any, false) => {
2341 let gb = df.group_by(names)?;
2342 let groups = gb.get_groups();
2343 let (offset, len) = slice.unwrap_or((0, groups.len()));
2344 let groups = groups.slice(offset, len);
2345 df.apply_columns_par(|s| unsafe { s.agg_first(&groups) })
2346 },
2347 (UniqueKeepStrategy::Last, false) => {
2348 let gb = df.group_by(names)?;
2349 let groups = gb.get_groups();
2350 let (offset, len) = slice.unwrap_or((0, groups.len()));
2351 let groups = groups.slice(offset, len);
2352 df.apply_columns_par(|s| unsafe { s.agg_last(&groups) })
2353 },
2354 (UniqueKeepStrategy::None, _) => {
2355 let df_part = df.select(names)?;
2356 let mask = df_part.is_unique()?;
2357 let mut filtered = df.filter(&mask)?;
2358
2359 if let Some((offset, len)) = slice {
2360 filtered = filtered.slice(offset, len);
2361 }
2362 return Ok(filtered);
2363 },
2364 };
2365 Ok(unsafe { DataFrame::new_unchecked_infer_height(columns).with_schema_from(self) })
2366 }
2367
2368 #[cfg(feature = "algorithm_group_by")]
2382 pub fn is_unique(&self) -> PolarsResult<BooleanChunked> {
2383 let gb = self.group_by(self.get_column_names_owned())?;
2384 let groups = gb.get_groups();
2385 Ok(is_unique_helper(
2386 groups,
2387 self.height() as IdxSize,
2388 true,
2389 false,
2390 ))
2391 }
2392
2393 #[cfg(feature = "algorithm_group_by")]
2407 pub fn is_duplicated(&self) -> PolarsResult<BooleanChunked> {
2408 let gb = self.group_by(self.get_column_names_owned())?;
2409 let groups = gb.get_groups();
2410 Ok(is_unique_helper(
2411 groups,
2412 self.height() as IdxSize,
2413 false,
2414 true,
2415 ))
2416 }
2417
2418 #[must_use]
2420 pub fn null_count(&self) -> Self {
2421 let cols =
2422 self.apply_columns(|c| Column::new(c.name().clone(), [c.null_count() as IdxSize]));
2423 unsafe { Self::new_unchecked(1, cols) }
2424 }
2425
2426 #[cfg(feature = "row_hash")]
2428 pub fn hash_rows(
2429 &mut self,
2430 hasher_builder: Option<PlSeedableRandomStateQuality>,
2431 ) -> PolarsResult<UInt64Chunked> {
2432 let dfs = split_df(self, POOL.current_num_threads(), false);
2433 let (cas, _) = _df_rows_to_hashes_threaded_vertical(&dfs, hasher_builder)?;
2434
2435 let mut iter = cas.into_iter();
2436 let mut acc_ca = iter.next().unwrap();
2437 for ca in iter {
2438 acc_ca.append(&ca)?;
2439 }
2440 Ok(acc_ca.rechunk().into_owned())
2441 }
2442
2443 pub fn get_supertype(&self) -> Option<PolarsResult<DataType>> {
2445 self.columns()
2446 .iter()
2447 .map(|s| Ok(s.dtype().clone()))
2448 .reduce(|acc, b| try_get_supertype(&acc?, &b.unwrap()))
2449 }
2450
2451 #[doc(hidden)]
2456 pub unsafe fn _take_unchecked_slice(&self, idx: &[IdxSize], allow_threads: bool) -> Self {
2457 self._take_unchecked_slice_sorted(idx, allow_threads, IsSorted::Not)
2458 }
2459
2460 #[doc(hidden)]
2467 pub unsafe fn _take_unchecked_slice_sorted(
2468 &self,
2469 idx: &[IdxSize],
2470 allow_threads: bool,
2471 sorted: IsSorted,
2472 ) -> Self {
2473 #[cfg(debug_assertions)]
2474 {
2475 if idx.len() > 2 {
2476 use crate::series::IsSorted;
2477
2478 match sorted {
2479 IsSorted::Ascending => {
2480 assert!(idx[0] <= idx[idx.len() - 1]);
2481 },
2482 IsSorted::Descending => {
2483 assert!(idx[0] >= idx[idx.len() - 1]);
2484 },
2485 _ => {},
2486 }
2487 }
2488 }
2489 let mut ca = IdxCa::mmap_slice(PlSmallStr::EMPTY, idx);
2490 ca.set_sorted_flag(sorted);
2491 self.take_unchecked_impl(&ca, allow_threads)
2492 }
2493 #[cfg(all(feature = "partition_by", feature = "algorithm_group_by"))]
2494 #[doc(hidden)]
2495 pub fn _partition_by_impl(
2496 &self,
2497 cols: &[PlSmallStr],
2498 stable: bool,
2499 include_key: bool,
2500 parallel: bool,
2501 ) -> PolarsResult<Vec<DataFrame>> {
2502 let selected_keys = self.select_to_vec(cols.iter().cloned())?;
2503 let groups = self.group_by_with_series(selected_keys, parallel, stable)?;
2504 let groups = groups.into_groups();
2505
2506 let df = if include_key {
2508 self.clone()
2509 } else {
2510 self.drop_many(cols.iter().cloned())
2511 };
2512
2513 if parallel {
2514 POOL.install(|| {
2517 match groups.as_ref() {
2518 GroupsType::Idx(idx) => {
2519 let mut df = df.clone();
2521 df.rechunk_mut_par();
2522 Ok(idx
2523 .into_par_iter()
2524 .map(|(_, group)| {
2525 unsafe {
2527 df._take_unchecked_slice_sorted(
2528 group,
2529 false,
2530 IsSorted::Ascending,
2531 )
2532 }
2533 })
2534 .collect())
2535 },
2536 GroupsType::Slice { groups, .. } => Ok(groups
2537 .into_par_iter()
2538 .map(|[first, len]| df.slice(*first as i64, *len as usize))
2539 .collect()),
2540 }
2541 })
2542 } else {
2543 match groups.as_ref() {
2544 GroupsType::Idx(idx) => {
2545 let mut df = df;
2547 df.rechunk_mut();
2548 Ok(idx
2549 .into_iter()
2550 .map(|(_, group)| {
2551 unsafe {
2553 df._take_unchecked_slice_sorted(group, false, IsSorted::Ascending)
2554 }
2555 })
2556 .collect())
2557 },
2558 GroupsType::Slice { groups, .. } => Ok(groups
2559 .iter()
2560 .map(|[first, len]| df.slice(*first as i64, *len as usize))
2561 .collect()),
2562 }
2563 }
2564 }
2565
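/// Split this `DataFrame` into one frame per unique combination of the key columns `cols`;
/// the order of the partitions is not guaranteed (use [`DataFrame::partition_by_stable`]
/// for a stable order).
///
/// Example (illustrative sketch marked `ignore`, not a compiled doctest):
///
/// ```ignore
/// use polars_core::prelude::*;
///
/// let df = df!("key" => ["a", "a", "b"], "val" => [1, 2, 3])?;
/// let parts = df.partition_by(["key"], true)?;
/// assert_eq!(parts.len(), 2);
/// ```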
2566 #[cfg(feature = "partition_by")]
2568 pub fn partition_by<I, S>(&self, cols: I, include_key: bool) -> PolarsResult<Vec<DataFrame>>
2569 where
2570 I: IntoIterator<Item = S>,
2571 S: Into<PlSmallStr>,
2572 {
2573 let cols: UnitVec<PlSmallStr> = cols.into_iter().map(Into::into).collect();
2574 self._partition_by_impl(cols.as_slice(), false, include_key, true)
2575 }
2576
2577 #[cfg(feature = "partition_by")]
2580 pub fn partition_by_stable<I, S>(
2581 &self,
2582 cols: I,
2583 include_key: bool,
2584 ) -> PolarsResult<Vec<DataFrame>>
2585 where
2586 I: IntoIterator<Item = S>,
2587 S: Into<PlSmallStr>,
2588 {
2589 let cols: UnitVec<PlSmallStr> = cols.into_iter().map(Into::into).collect();
2590 self._partition_by_impl(cols.as_slice(), true, include_key, true)
2591 }
2592
2593 #[cfg(feature = "dtype-struct")]
2596 pub fn unnest(
2597 &self,
2598 cols: impl IntoIterator<Item = impl Into<PlSmallStr>>,
2599 separator: Option<&str>,
2600 ) -> PolarsResult<DataFrame> {
2601 self.unnest_impl(cols.into_iter().map(Into::into).collect(), separator)
2602 }
2603
2604 #[cfg(feature = "dtype-struct")]
2605 fn unnest_impl(
2606 &self,
2607 cols: PlHashSet<PlSmallStr>,
2608 separator: Option<&str>,
2609 ) -> PolarsResult<DataFrame> {
2610 let mut new_cols = Vec::with_capacity(std::cmp::min(self.width() * 2, self.width() + 128));
2611 let mut count = 0;
2612 for s in self.columns() {
2613 if cols.contains(s.name()) {
2614 let ca = s.struct_()?.clone();
2615 new_cols.extend(ca.fields_as_series().into_iter().map(|mut f| {
2616 if let Some(separator) = &separator {
2617 f.rename(polars_utils::format_pl_smallstr!(
2618 "{}{}{}",
2619 s.name(),
2620 separator,
2621 f.name()
2622 ));
2623 }
2624 Column::from(f)
2625 }));
2626 count += 1;
2627 } else {
2628 new_cols.push(s.clone())
2629 }
2630 }
2631 if count != cols.len() {
2632 let schema = self.schema();
2635 for col in cols {
2636 let _ = schema
2637 .get(col.as_str())
2638 .ok_or_else(|| polars_err!(col_not_found = col))?;
2639 }
2640 }
2641
2642 DataFrame::new_infer_height(new_cols)
2643 }
2644
2645 pub fn append_record_batch(&mut self, rb: RecordBatchT<ArrayRef>) -> PolarsResult<()> {
2646 let df = DataFrame::from(rb);
2649 polars_ensure!(
2650 self.schema() == df.schema(),
2651 SchemaMismatch: "cannot append record batch with a different schema\ngot: {:?}\nexpected: {:?}",
2652 df.schema(), self.schema(),
2653 );
2654 self.vstack_mut_owned_unchecked(df);
2655 Ok(())
2656 }
2657}
2658
2659pub struct RecordBatchIter<'a> {
2660 df: &'a DataFrame,
2661 schema: ArrowSchemaRef,
2662 idx: usize,
2663 n_chunks: usize,
2664 compat_level: CompatLevel,
2665 parallel: bool,
2666}
2667
2668impl Iterator for RecordBatchIter<'_> {
2669 type Item = RecordBatch;
2670
2671 fn next(&mut self) -> Option<Self::Item> {
2672 if self.idx >= self.n_chunks {
2673 return None;
2674 }
2675
2676 let batch_cols: Vec<ArrayRef> = if self.parallel {
2678 let iter = self
2679 .df
2680 .columns()
2681 .par_iter()
2682 .map(Column::as_materialized_series)
2683 .map(|s| s.to_arrow(self.idx, self.compat_level));
2684 POOL.install(|| iter.collect())
2685 } else {
2686 self.df
2687 .columns()
2688 .iter()
2689 .map(Column::as_materialized_series)
2690 .map(|s| s.to_arrow(self.idx, self.compat_level))
2691 .collect()
2692 };
2693
2694 let length = batch_cols.first().map_or(0, |arr| arr.len());
2695
2696 self.idx += 1;
2697
2698 Some(RecordBatch::new(length, self.schema.clone(), batch_cols))
2699 }
2700
2701 fn size_hint(&self) -> (usize, Option<usize>) {
2702 let n = self.n_chunks - self.idx;
2703 (n, Some(n))
2704 }
2705}
2706
2707pub struct PhysRecordBatchIter<'a> {
2708 schema: ArrowSchemaRef,
2709 arr_iters: Vec<std::slice::Iter<'a, ArrayRef>>,
2710}
2711
2712impl Iterator for PhysRecordBatchIter<'_> {
2713 type Item = RecordBatch;
2714
2715 fn next(&mut self) -> Option<Self::Item> {
2716 let arrs = self
2717 .arr_iters
2718 .iter_mut()
2719 .map(|phys_iter| phys_iter.next().cloned())
2720 .collect::<Option<Vec<_>>>()?;
2721
2722 let length = arrs.first().map_or(0, |arr| arr.len());
2723 Some(RecordBatch::new(length, self.schema.clone(), arrs))
2724 }
2725
2726 fn size_hint(&self) -> (usize, Option<usize>) {
2727 if let Some(iter) = self.arr_iters.first() {
2728 iter.size_hint()
2729 } else {
2730 (0, None)
2731 }
2732 }
2733}
2734
2735pub enum RecordBatchIterWrap<'a> {
2736 ZeroWidth {
2737 remaining_height: usize,
2738 chunk_size: usize,
2739 },
2740 Batches(RecordBatchIter<'a>),
2741 PhysicalBatches(PhysRecordBatchIter<'a>),
2742}
2743
2744impl<'a> RecordBatchIterWrap<'a> {
2745 fn new_zero_width(height: usize) -> Self {
2746 Self::ZeroWidth {
2747 remaining_height: height,
2748 chunk_size: get_ideal_morsel_size().get(),
2749 }
2750 }
2751}
2752
2753impl Iterator for RecordBatchIterWrap<'_> {
2754 type Item = RecordBatch;
2755
2756 fn next(&mut self) -> Option<Self::Item> {
2757 match self {
2758 Self::ZeroWidth {
2759 remaining_height,
2760 chunk_size,
2761 } => {
2762 let n = usize::min(*remaining_height, *chunk_size);
2763 *remaining_height -= n;
2764
2765 (n > 0).then(|| RecordBatch::new(n, ArrowSchemaRef::default(), vec![]))
2766 },
2767 Self::Batches(v) => v.next(),
2768 Self::PhysicalBatches(v) => v.next(),
2769 }
2770 }
2771
2772 fn size_hint(&self) -> (usize, Option<usize>) {
2773 match self {
2774 Self::ZeroWidth {
2775 remaining_height,
2776 chunk_size,
2777 } => {
2778 let n = remaining_height.div_ceil(*chunk_size);
2779 (n, Some(n))
2780 },
2781 Self::Batches(v) => v.size_hint(),
2782 Self::PhysicalBatches(v) => v.size_hint(),
2783 }
2784 }
2785}
2786
2787fn ensure_can_extend(left: &Column, right: &Column) -> PolarsResult<()> {
2789 polars_ensure!(
2790 left.name() == right.name(),
2791 ShapeMismatch: "unable to vstack, column names don't match: {:?} and {:?}",
2792 left.name(), right.name(),
2793 );
2794 Ok(())
2795}
2796
2797#[cfg(test)]
2798mod test {
2799 use super::*;
2800
2801 fn create_frame() -> DataFrame {
2802 let s0 = Column::new("days".into(), [0, 1, 2].as_ref());
2803 let s1 = Column::new("temp".into(), [22.1, 19.9, 7.].as_ref());
2804 DataFrame::new_infer_height(vec![s0, s1]).unwrap()
2805 }
2806
2807 #[test]
2808 #[cfg_attr(miri, ignore)]
2809 fn test_recordbatch_iterator() {
2810 let df = df!(
2811 "foo" => [1, 2, 3, 4, 5]
2812 )
2813 .unwrap();
2814 let mut iter = df.iter_chunks(CompatLevel::newest(), false);
2815 assert_eq!(5, iter.next().unwrap().len());
2816 assert!(iter.next().is_none());
2817 }
2818
2819 #[test]
2820 #[cfg_attr(miri, ignore)]
2821 fn test_select() {
2822 let df = create_frame();
2823 assert_eq!(
2824 df.column("days")
2825 .unwrap()
2826 .as_series()
2827 .unwrap()
2828 .equal(1)
2829 .unwrap()
2830 .sum(),
2831 Some(1)
2832 );
2833 }
2834
2835 #[test]
2836 #[cfg_attr(miri, ignore)]
2837 fn test_filter_broadcast_on_string_col() {
2838 let col_name = "some_col";
2839 let v = vec!["test".to_string()];
2840 let s0 = Column::new(PlSmallStr::from_str(col_name), v);
2841 let mut df = DataFrame::new_infer_height(vec![s0]).unwrap();
2842
2843 df = df
2844 .filter(
2845 &df.column(col_name)
2846 .unwrap()
2847 .as_materialized_series()
2848 .equal("")
2849 .unwrap(),
2850 )
2851 .unwrap();
2852 assert_eq!(
2853 df.column(col_name)
2854 .unwrap()
2855 .as_materialized_series()
2856 .n_chunks(),
2857 1
2858 );
2859 }
2860
2861 #[test]
2862 #[cfg_attr(miri, ignore)]
2863 fn test_filter_broadcast_on_list_col() {
2864 let s1 = Series::new(PlSmallStr::EMPTY, [true, false, true]);
2865 let ll: ListChunked = [&s1].iter().copied().collect();
2866
2867 let mask = BooleanChunked::from_slice(PlSmallStr::EMPTY, &[false]);
2868 let new = ll.filter(&mask).unwrap();
2869
2870 assert_eq!(new.chunks.len(), 1);
2871 assert_eq!(new.len(), 0);
2872 }
2873
2874 #[test]
2875 fn slice() {
2876 let df = create_frame();
2877 let sliced_df = df.slice(0, 2);
2878 assert_eq!(sliced_df.shape(), (2, 2));
2879 }
2880
2881 #[test]
2882 fn rechunk_false() {
2883 let df = create_frame();
2884 assert!(!df.should_rechunk())
2885 }
2886
2887 #[test]
2888 fn rechunk_true() -> PolarsResult<()> {
2889 let mut base = df!(
2890 "a" => [1, 2, 3],
2891 "b" => [1, 2, 3]
2892 )?;
2893
2894 let mut s = Series::new("foo".into(), 0..2);
2896 let s2 = Series::new("bar".into(), 0..1);
2897 s.append(&s2)?;
2898
2899 let out = base.with_column(s.into_column())?;
2901
2902 assert!(out.should_rechunk());
2904 Ok(())
2905 }
2906
2907 #[test]
2908 fn test_duplicate_column() {
2909 let mut df = df! {
2910 "foo" => [1, 2, 3]
2911 }
2912 .unwrap();
2913 assert!(
2915 df.with_column(Column::new("foo".into(), &[1, 2, 3]))
2916 .is_ok()
2917 );
2918 assert!(
2919 df.with_column(Column::new("bar".into(), &[1, 2, 3]))
2920 .is_ok()
2921 );
2922 assert!(df.column("bar").is_ok())
2923 }
2924
2925 #[test]
2926 #[cfg_attr(miri, ignore)]
2927 fn distinct() {
2928 let df = df! {
2929 "flt" => [1., 1., 2., 2., 3., 3.],
2930 "int" => [1, 1, 2, 2, 3, 3, ],
2931 "str" => ["a", "a", "b", "b", "c", "c"]
2932 }
2933 .unwrap();
2934 let df = df
2935 .unique_stable(None, UniqueKeepStrategy::First, None)
2936 .unwrap()
2937 .sort(["flt"], SortMultipleOptions::default())
2938 .unwrap();
2939 let valid = df! {
2940 "flt" => [1., 2., 3.],
2941 "int" => [1, 2, 3],
2942 "str" => ["a", "b", "c"]
2943 }
2944 .unwrap();
2945 assert!(df.equals(&valid));
2946 }
2947
2948 #[test]
2949 fn test_vstack() {
2950 let mut df = df! {
2952 "flt" => [1., 1., 2., 2., 3., 3.],
2953 "int" => [1, 1, 2, 2, 3, 3, ],
2954 "str" => ["a", "a", "b", "b", "c", "c"]
2955 }
2956 .unwrap();
2957
2958 df.vstack_mut(&df.slice(0, 3)).unwrap();
2959 assert_eq!(df.first_col_n_chunks(), 2)
2960 }
2961
2962 #[test]
2963 fn test_vstack_on_empty_dataframe() {
2964 let mut df = DataFrame::empty();
2965
2966 let df_data = df! {
2967 "flt" => [1., 1., 2., 2., 3., 3.],
2968 "int" => [1, 1, 2, 2, 3, 3, ],
2969 "str" => ["a", "a", "b", "b", "c", "c"]
2970 }
2971 .unwrap();
2972
2973 df.vstack_mut(&df_data).unwrap();
2974 assert_eq!(df.height(), 6)
2975 }
2976
2977 #[test]
2978 fn test_unique_keep_none_with_slice() {
2979 let df = df! {
2980 "x" => [1, 2, 3, 2, 1]
2981 }
2982 .unwrap();
2983 let out = df
2984 .unique_stable(
2985 Some(&["x".to_string()][..]),
2986 UniqueKeepStrategy::None,
2987 Some((0, 2)),
2988 )
2989 .unwrap();
2990 let expected = df! {
2991 "x" => [3]
2992 }
2993 .unwrap();
2994 assert!(out.equals(&expected));
2995 }
2996
2997 #[test]
2998 #[cfg(feature = "dtype-i8")]
2999 fn test_apply_result_schema() {
3000 let mut df = df! {
3001 "x" => [1, 2, 3, 2, 1]
3002 }
3003 .unwrap();
3004
3005 let schema_before = df.schema().clone();
3006 df.apply("x", |f| f.cast(&DataType::Int8).unwrap()).unwrap();
3007 assert_ne!(&schema_before, df.schema());
3008 }
3009}