1#![allow(unsafe_op_in_unsafe_fn)]
2use arrow::datatypes::ArrowSchemaRef;
4use polars_row::ArrayRef;
5use polars_utils::UnitVec;
6use polars_utils::ideal_morsel_size::get_ideal_morsel_size;
7use polars_utils::itertools::Itertools;
8use rayon::prelude::*;
9
10use crate::chunked_array::flags::StatisticsFlags;
11#[cfg(feature = "algorithm_group_by")]
12use crate::chunked_array::ops::unique::is_unique_helper;
13use crate::prelude::gather::check_bounds_ca;
14use crate::prelude::*;
15#[cfg(feature = "row_hash")]
16use crate::utils::split_df;
17use crate::utils::{Container, NoNull, slice_offsets, try_get_supertype};
18use crate::{HEAD_DEFAULT_LENGTH, TAIL_DEFAULT_LENGTH};
19
20#[cfg(feature = "dataframe_arithmetic")]
21mod arithmetic;
22pub mod builder;
23mod chunks;
24pub use chunks::chunk_df_for_writing;
25mod broadcast;
26pub mod column;
27mod dataframe;
28mod filter;
29mod projection;
30pub use dataframe::DataFrame;
31use filter::filter_zero_width;
32use projection::{AmortizedColumnSelector, LINEAR_SEARCH_LIMIT};
33
34pub mod explode;
35mod from;
36#[cfg(feature = "algorithm_group_by")]
37pub mod group_by;
38pub(crate) mod horizontal;
39#[cfg(feature = "proptest")]
40pub mod proptest;
41#[cfg(any(feature = "rows", feature = "object"))]
42pub mod row;
43mod top_k;
44mod upstream_traits;
45mod validation;
46
47use arrow::record_batch::{RecordBatch, RecordBatchT};
48use polars_utils::pl_str::PlSmallStr;
49#[cfg(feature = "serde")]
50use serde::{Deserialize, Serialize};
51use strum_macros::IntoStaticStr;
52
53use crate::POOL;
54#[cfg(feature = "row_hash")]
55use crate::hashing::_df_rows_to_hashes_threaded_vertical;
56use crate::prelude::sort::arg_sort;
57use crate::series::IsSorted;
58
59#[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Hash, IntoStaticStr)]
60#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
61#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
62#[strum(serialize_all = "snake_case")]
63pub enum UniqueKeepStrategy {
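/// Keep the first occurrence of each duplicated row.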
64 First,
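/// Keep the last occurrence of each duplicated row.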
66 Last,
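/// Keep none of the duplicated rows; only rows that occur exactly once survive.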
68 None,
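/// Keep any one of the duplicated rows; this allows the fastest implementation and is the default.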
70 #[default]
73 Any,
74}
75
76impl DataFrame {
77 pub fn materialized_column_iter(&self) -> impl ExactSizeIterator<Item = &Series> {
78 self.columns().iter().map(Column::as_materialized_series)
79 }
80
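/// Returns an estimate of the total (heap) allocated size of this `DataFrame`, in bytes,
/// computed as the sum of the per-column estimates.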
81 pub fn estimated_size(&self) -> usize {
94 self.columns().iter().map(Column::estimated_size).sum()
95 }
96
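/// Applies a fallible function to every column and collects the results. The `*_par`
/// variants below run the closure over the columns in parallel on the global thread pool.
/// The closures are forwarded through small non-generic inner functions so the bodies are
/// not re-monomorphized for every closure type.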
97 pub fn try_apply_columns(
98 &self,
99 func: impl Fn(&Column) -> PolarsResult<Column> + Send + Sync,
100 ) -> PolarsResult<Vec<Column>> {
101 return inner(self, &func);
102
103 fn inner(
104 slf: &DataFrame,
105 func: &(dyn Fn(&Column) -> PolarsResult<Column> + Send + Sync),
106 ) -> PolarsResult<Vec<Column>> {
107 slf.columns().iter().map(func).collect()
108 }
109 }
110
111 pub fn apply_columns(&self, func: impl Fn(&Column) -> Column + Send + Sync) -> Vec<Column> {
112 return inner(self, &func);
113
114 fn inner(slf: &DataFrame, func: &(dyn Fn(&Column) -> Column + Send + Sync)) -> Vec<Column> {
115 slf.columns().iter().map(func).collect()
116 }
117 }
118
119 pub fn try_apply_columns_par(
120 &self,
121 func: impl Fn(&Column) -> PolarsResult<Column> + Send + Sync,
122 ) -> PolarsResult<Vec<Column>> {
123 return inner(self, &func);
124
125 fn inner(
126 slf: &DataFrame,
127 func: &(dyn Fn(&Column) -> PolarsResult<Column> + Send + Sync),
128 ) -> PolarsResult<Vec<Column>> {
129 POOL.install(|| slf.columns().par_iter().map(func).collect())
130 }
131 }
132
133 pub fn apply_columns_par(&self, func: impl Fn(&Column) -> Column + Send + Sync) -> Vec<Column> {
134 return inner(self, &func);
135
136 fn inner(slf: &DataFrame, func: &(dyn Fn(&Column) -> Column + Send + Sync)) -> Vec<Column> {
137 POOL.install(|| slf.columns().par_iter().map(func).collect())
138 }
139 }
140
141 pub(crate) fn reserve_chunks(&mut self, additional: usize) {
143 for s in unsafe { self.columns_mut_retain_schema() } {
144 if let Column::Series(s) = s {
145 unsafe { s.chunks_mut().reserve(additional) }
148 }
149 }
150 }
151 pub fn new_from_index(&self, index: usize, height: usize) -> Self {
152 let new_cols = self.apply_columns(|c| c.new_from_index(index, height));
153
154 unsafe { Self::_new_unchecked_impl(height, new_cols).with_schema_from(self) }
155 }
156
157 pub fn full_null(schema: &Schema, height: usize) -> Self {
159 let columns = schema
160 .iter_fields()
161 .map(|f| Column::full_null(f.name().clone(), height, f.dtype()))
162 .collect();
163
164 unsafe { DataFrame::_new_unchecked_impl(height, columns) }
165 }
166
167 pub fn ensure_matches_schema(&mut self, schema: &Schema) -> PolarsResult<()> {
170 let mut did_cast = false;
171 let cached_schema = self.cached_schema().cloned();
172
173 for (col, (name, dt)) in unsafe { self.columns_mut() }.iter_mut().zip(schema.iter()) {
174 polars_ensure!(
175 col.name() == name,
176 SchemaMismatch: "column name mismatch: expected {:?}, found {:?}",
177 name,
178 col.name()
179 );
180
181 let needs_cast = col.dtype().matches_schema_type(dt)?;
182
183 if needs_cast {
184 *col = col.cast(dt)?;
185 did_cast = true;
186 }
187 }
188
189 if !did_cast {
190 unsafe { self.set_opt_schema(cached_schema) };
191 }
192
193 Ok(())
194 }
195
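/// Returns a new `DataFrame` with a row-index column prepended, counting up from `offset`
/// (or from 0 when `offset` is `None`). Errors if a column with `name` already exists.
///
/// # Example
///
/// A minimal sketch (illustrative; assumes the `df!` macro from the prelude and default
/// crate features):
///
/// ```
/// # use polars_core::prelude::*;
/// let df = df!("x" => [10, 20, 30])?;
/// let with_idx = df.with_row_index("row_nr".into(), None)?;
/// assert_eq!(with_idx.get_column_names()[0].as_str(), "row_nr");
/// assert_eq!(with_idx.width(), 2);
/// # Ok::<(), PolarsError>(())
/// ```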
196 pub fn with_row_index(&self, name: PlSmallStr, offset: Option<IdxSize>) -> PolarsResult<Self> {
231 let mut new_columns = Vec::with_capacity(self.width() + 1);
232 let offset = offset.unwrap_or(0);
233
234 if self.get_column_index(&name).is_some() {
235 polars_bail!(duplicate = name)
236 }
237
238 let col = Column::new_row_index(name, offset, self.height())?;
239 new_columns.push(col);
240 new_columns.extend_from_slice(self.columns());
241
242 Ok(unsafe { DataFrame::new_unchecked(self.height(), new_columns) })
243 }
244
245 pub unsafe fn with_row_index_mut(
253 &mut self,
254 name: PlSmallStr,
255 offset: Option<IdxSize>,
256 ) -> &mut Self {
257 debug_assert!(
258 self.get_column_index(&name).is_none(),
259 "with_row_index_mut(): column with name {} already exists",
260 &name
261 );
262
263 let offset = offset.unwrap_or(0);
264 let col = Column::new_row_index(name, offset, self.height()).unwrap();
265
266 unsafe { self.columns_mut() }.insert(0, col);
267 self
268 }
269
270 pub fn shrink_to_fit(&mut self) {
272 for s in unsafe { self.columns_mut_retain_schema() } {
274 s.shrink_to_fit();
275 }
276 }
277
278 pub fn rechunk_mut_par(&mut self) -> &mut Self {
281 if self.columns().iter().any(|c| c.n_chunks() > 1) {
282 POOL.install(|| {
283 unsafe { self.columns_mut_retain_schema() }
284 .par_iter_mut()
285 .for_each(|c| *c = c.rechunk());
286 })
287 }
288
289 self
290 }
291
292 pub fn rechunk_mut(&mut self) -> &mut Self {
294 let columns = unsafe { self.columns_mut() };
296
297 for col in columns.iter_mut().filter(|c| c.n_chunks() > 1) {
298 *col = col.rechunk();
299 }
300
301 self
302 }
303
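/// Returns `true` when the columns of this `DataFrame` do not share an identical chunk
/// layout (same number of chunks with the same lengths). In that case chunk-wise
/// iteration such as [`DataFrame::iter_chunks`] requires aligning or rechunking first.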
304 pub fn should_rechunk(&self) -> bool {
306 if !self
309 .columns()
310 .iter()
311 .filter_map(|c| c.as_series().map(|s| s.n_chunks()))
312 .all_equal()
313 {
314 return true;
315 }
316
317 let mut chunk_lengths = self.materialized_column_iter().map(|s| s.chunk_lengths());
319 match chunk_lengths.next() {
320 None => false,
321 Some(first_column_chunk_lengths) => {
322 if first_column_chunk_lengths.size_hint().0 == 1 {
324 return chunk_lengths.any(|cl| cl.size_hint().0 != 1);
325 }
326 let height = self.height();
329 let n_chunks = first_column_chunk_lengths.size_hint().0;
330 if n_chunks > height && !(height == 0 && n_chunks == 1) {
331 return true;
332 }
333 let v: Vec<_> = first_column_chunk_lengths.collect();
335 for cl in chunk_lengths {
336 if cl.enumerate().any(|(idx, el)| Some(&el) != v.get(idx)) {
337 return true;
338 }
339 }
340 false
341 },
342 }
343 }
344
345 pub fn align_chunks_par(&mut self) -> &mut Self {
347 if self.should_rechunk() {
348 self.rechunk_mut_par()
349 } else {
350 self
351 }
352 }
353
354 pub fn align_chunks(&mut self) -> &mut Self {
356 if self.should_rechunk() {
357 self.rechunk_mut()
358 } else {
359 self
360 }
361 }
362
363 pub fn get_column_names(&self) -> Vec<&PlSmallStr> {
374 self.columns().iter().map(|s| s.name()).collect()
375 }
376
377 pub fn get_column_names_owned(&self) -> Vec<PlSmallStr> {
379 self.columns().iter().map(|s| s.name().clone()).collect()
380 }
381
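/// Renames all columns at once. The number of names must match the width of the
/// `DataFrame` and the names must be unique.
///
/// # Example
///
/// A minimal sketch (illustrative; assumes the `df!` macro):
///
/// ```
/// # use polars_core::prelude::*;
/// let mut df = df!("a" => [1, 2], "b" => [3, 4])?;
/// df.set_column_names(&["x", "y"])?;
/// assert_eq!(df.get_column_names()[0].as_str(), "x");
/// # Ok::<(), PolarsError>(())
/// ```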
382 pub fn set_column_names<T>(&mut self, new_names: &[T]) -> PolarsResult<()>
394 where
395 T: AsRef<str>,
396 {
397 polars_ensure!(
398 new_names.len() == self.width(),
399 ShapeMismatch: "{} column names provided for a DataFrame of width {}",
400 new_names.len(), self.width()
401 );
402
403 validation::ensure_names_unique(new_names)?;
404
405 *unsafe { self.columns_mut() } = std::mem::take(unsafe { self.columns_mut() })
406 .into_iter()
407 .zip(new_names)
408 .map(|(c, name)| c.with_name(PlSmallStr::from_str(name.as_ref())))
409 .collect();
410
411 Ok(())
412 }
413
414 pub fn dtypes(&self) -> Vec<DataType> {
427 self.columns().iter().map(|s| s.dtype().clone()).collect()
428 }
429
430 pub fn first_col_n_chunks(&self) -> usize {
432 match self.columns().iter().find_map(|col| col.as_series()) {
433 None if self.width() == 0 => 0,
434 None => 1,
435 Some(s) => s.n_chunks(),
436 }
437 }
438
439 pub fn max_n_chunks(&self) -> usize {
441 self.columns()
442 .iter()
443 .map(|s| s.as_series().map(|s| s.n_chunks()).unwrap_or(1))
444 .max()
445 .unwrap_or(0)
446 }
447
448 pub fn fields(&self) -> Vec<Field> {
464 self.columns()
465 .iter()
466 .map(|s| s.field().into_owned())
467 .collect()
468 }
469
470 pub fn hstack(&self, columns: &[Column]) -> PolarsResult<Self> {
504 let mut new_cols = Vec::with_capacity(self.width() + columns.len());
505
506 new_cols.extend(self.columns().iter().cloned());
507 new_cols.extend_from_slice(columns);
508
509 DataFrame::new(self.height(), new_cols)
510 }
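/// Concatenates the rows of `other` below the rows of `self`, returning a new
/// `DataFrame`. Column names and dtypes must match. The appended data keeps its own
/// chunks; call [`DataFrame::align_chunks`] or rechunk afterwards if a single chunk per
/// column is required.
///
/// # Example
///
/// A minimal sketch (illustrative; assumes the `df!` macro):
///
/// ```
/// # use polars_core::prelude::*;
/// let df1 = df!("a" => [1, 2])?;
/// let df2 = df!("a" => [3, 4])?;
/// let stacked = df1.vstack(&df2)?;
/// assert_eq!(stacked.height(), 4);
/// # Ok::<(), PolarsError>(())
/// ```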
511 pub fn vstack(&self, other: &DataFrame) -> PolarsResult<Self> {
552 let mut df = self.clone();
553 df.vstack_mut(other)?;
554 Ok(df)
555 }
556
557 pub fn vstack_mut(&mut self, other: &DataFrame) -> PolarsResult<&mut Self> {
598 if self.width() != other.width() {
599 polars_ensure!(
600 self.shape() == (0, 0),
601 ShapeMismatch:
602 "unable to append to a DataFrame of shape {:?} with a DataFrame of width {}",
603 self.shape(), other.width(),
604 );
605
606 self.clone_from(other);
607
608 return Ok(self);
609 }
610
611 let new_height = usize::checked_add(self.height(), other.height()).unwrap();
612
613 unsafe { self.columns_mut_retain_schema() }
614 .iter_mut()
615 .zip(other.columns())
616 .try_for_each::<_, PolarsResult<_>>(|(left, right)| {
617 ensure_can_extend(&*left, right)?;
618 left.append(right).map_err(|e| {
619 e.context(format!("failed to vstack column '{}'", right.name()).into())
620 })?;
621 Ok(())
622 })?;
623
624 unsafe { self.set_height(new_height) };
625
626 Ok(self)
627 }
628
629 pub fn vstack_mut_owned(&mut self, other: DataFrame) -> PolarsResult<&mut Self> {
630 if self.width() != other.width() {
631 polars_ensure!(
632 self.shape() == (0, 0),
633 ShapeMismatch:
634 "unable to append to a DataFrame of width {} with a DataFrame of width {}",
635 self.width(), other.width(),
636 );
637
638 *self = other;
639
640 return Ok(self);
641 }
642
643 let new_height = usize::checked_add(self.height(), other.height()).unwrap();
644
645 unsafe { self.columns_mut_retain_schema() }
646 .iter_mut()
647 .zip(other.into_columns())
648 .try_for_each::<_, PolarsResult<_>>(|(left, right)| {
649 ensure_can_extend(&*left, &right)?;
650 let right_name = right.name().clone();
651 left.append_owned(right).map_err(|e| {
652 e.context(format!("failed to vstack column '{right_name}'").into())
653 })?;
654 Ok(())
655 })?;
656
657 unsafe { self.set_height(new_height) };
658
659 Ok(self)
660 }
661
662 pub fn vstack_mut_unchecked(&mut self, other: &DataFrame) -> &mut Self {
669 let new_height = usize::checked_add(self.height(), other.height()).unwrap();
670
671 unsafe { self.columns_mut_retain_schema() }
672 .iter_mut()
673 .zip(other.columns())
674 .for_each(|(left, right)| {
675 left.append(right)
676 .map_err(|e| {
677 e.context(format!("failed to vstack column '{}'", right.name()).into())
678 })
679 .expect("should not fail");
680 });
681
682 unsafe { self.set_height(new_height) };
683
684 self
685 }
686
687 pub fn vstack_mut_owned_unchecked(&mut self, other: DataFrame) -> &mut Self {
694 let new_height = usize::checked_add(self.height(), other.height()).unwrap();
695
696 unsafe { self.columns_mut_retain_schema() }
697 .iter_mut()
698 .zip(other.into_columns())
699 .for_each(|(left, right)| {
700 left.append_owned(right).expect("should not fail");
701 });
702
703 unsafe { self.set_height(new_height) };
704
705 self
706 }
707
708 pub fn extend(&mut self, other: &DataFrame) -> PolarsResult<()> {
723 polars_ensure!(
724 self.width() == other.width(),
725 ShapeMismatch:
726 "unable to extend a DataFrame of width {} with a DataFrame of width {}",
727 self.width(), other.width(),
728 );
729
730 let new_height = usize::checked_add(self.height(), other.height()).unwrap();
731
732 unsafe { self.columns_mut_retain_schema() }
733 .iter_mut()
734 .zip(other.columns())
735 .try_for_each::<_, PolarsResult<_>>(|(left, right)| {
736 ensure_can_extend(&*left, right)?;
737 left.extend(right).map_err(|e| {
738 e.context(format!("failed to extend column '{}'", right.name()).into())
739 })?;
740 Ok(())
741 })?;
742
743 unsafe { self.set_height(new_height) };
744
745 Ok(())
746 }
747
748 pub fn drop_in_place(&mut self, name: &str) -> PolarsResult<Column> {
765 let idx = self.try_get_column_index(name)?;
766 Ok(unsafe { self.columns_mut() }.remove(idx))
767 }
768
769 pub fn drop_nulls<S>(&self, subset: Option<&[S]>) -> PolarsResult<Self>
798 where
799 for<'a> &'a S: AsRef<str>,
800 {
801 if let Some(v) = subset {
802 let v = self.select_to_vec(v)?;
803 self._drop_nulls_impl(v.as_slice())
804 } else {
805 self._drop_nulls_impl(self.columns())
806 }
807 }
808
809 fn _drop_nulls_impl(&self, subset: &[Column]) -> PolarsResult<Self> {
810 if subset.iter().all(|s| !s.has_nulls()) {
812 return Ok(self.clone());
813 }
814
815 let mut iter = subset.iter();
816
817 let mask = iter
818 .next()
819 .ok_or_else(|| polars_err!(NoData: "no data to drop nulls from"))?;
820 let mut mask = mask.is_not_null();
821
822 for c in iter {
823 mask = mask & c.is_not_null();
824 }
825 self.filter(&mask)
826 }
827
828 pub fn drop(&self, name: &str) -> PolarsResult<Self> {
843 let idx = self.try_get_column_index(name)?;
844 let mut new_cols = Vec::with_capacity(self.width() - 1);
845
846 self.columns().iter().enumerate().for_each(|(i, s)| {
847 if i != idx {
848 new_cols.push(s.clone())
849 }
850 });
851
852 Ok(unsafe { DataFrame::_new_unchecked_impl(self.height(), new_cols) })
853 }
854
855 pub fn drop_many<I, S>(&self, names: I) -> Self
857 where
858 I: IntoIterator<Item = S>,
859 S: Into<PlSmallStr>,
860 {
861 let names: PlHashSet<PlSmallStr> = names.into_iter().map(|s| s.into()).collect();
862 self.drop_many_amortized(&names)
863 }
864
865 pub fn drop_many_amortized(&self, names: &PlHashSet<PlSmallStr>) -> DataFrame {
867 if names.is_empty() {
868 return self.clone();
869 }
870 let mut new_cols = Vec::with_capacity(self.width().saturating_sub(names.len()));
871 self.columns().iter().for_each(|s| {
872 if !names.contains(s.name()) {
873 new_cols.push(s.clone())
874 }
875 });
876
877 unsafe { DataFrame::new_unchecked(self.height(), new_cols) }
878 }
879
880 fn insert_column_no_namecheck(
883 &mut self,
884 index: usize,
885 column: Column,
886 ) -> PolarsResult<&mut Self> {
887 if self.shape() == (0, 0) {
888 unsafe { self.set_height(column.len()) };
889 }
890
891 polars_ensure!(
892 column.len() == self.height(),
893 ShapeMismatch:
894 "unable to add a column of length {} to a DataFrame of height {}",
895 column.len(), self.height(),
896 );
897
898 unsafe { self.columns_mut() }.insert(index, column);
899 Ok(self)
900 }
901
902 pub fn insert_column(&mut self, index: usize, column: Column) -> PolarsResult<&mut Self> {
904 let name = column.name();
905
906 polars_ensure!(
907 self.get_column_index(name).is_none(),
908 Duplicate:
909 "column with name {:?} is already present in the DataFrame", name
910 );
911
912 self.insert_column_no_namecheck(index, column)
913 }
914
915 pub fn with_column(&mut self, mut column: Column) -> PolarsResult<&mut Self> {
918 if self.shape() == (0, 0) {
919 unsafe { self.set_height(column.len()) };
920 }
921
922 if column.len() != self.height() && column.len() == 1 {
923 column = column.new_from_index(0, self.height());
924 }
925
926 polars_ensure!(
927 column.len() == self.height(),
928 ShapeMismatch: "unable to add a column of length {} to a DataFrame of height {}",
929 column.len(), self.height(),
930 );
931
932 if let Some(i) = self.get_column_index(column.name()) {
933 *unsafe { self.columns_mut() }.get_mut(i).unwrap() = column
934 } else {
935 unsafe { self.columns_mut() }.push(column)
936 };
937
938 Ok(self)
939 }
940
941 pub unsafe fn push_column_unchecked(&mut self, column: Column) -> &mut Self {
947 unsafe { self.columns_mut() }.push(column);
948 self
949 }
950
951 pub fn with_columns_mut(
954 &mut self,
955 columns: impl IntoIterator<Item = Column>,
956 output_schema: &Schema,
957 ) -> PolarsResult<()> {
958 let columns = columns.into_iter();
959
960 unsafe {
961 self.columns_mut_retain_schema()
962 .reserve(columns.size_hint().0)
963 }
964
965 for c in columns {
966 self.with_column_and_schema_mut(c, output_schema)?;
967 }
968
969 Ok(())
970 }
971
972 fn with_column_and_schema_mut(
973 &mut self,
974 mut column: Column,
975 output_schema: &Schema,
976 ) -> PolarsResult<&mut Self> {
977 if self.shape() == (0, 0) {
978 unsafe { self.set_height(column.len()) };
979 }
980
981 if column.len() != self.height() && column.len() == 1 {
982 column = column.new_from_index(0, self.height());
983 }
984
985 polars_ensure!(
986 column.len() == self.height(),
987 ShapeMismatch:
988 "unable to add a column of length {} to a DataFrame of height {}",
989 column.len(), self.height(),
990 );
991
992 let i = output_schema
993 .index_of(column.name())
994 .or_else(|| self.get_column_index(column.name()))
995 .unwrap_or(self.width());
996
997 if i < self.width() {
998 *unsafe { self.columns_mut() }.get_mut(i).unwrap() = column
999 } else if i == self.width() {
1000 unsafe { self.columns_mut() }.push(column)
1001 } else {
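// The index coming from `output_schema` points past the current width, i.e. the
// columns are not being inserted in schema order.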
1002 panic!()
1004 }
1005
1006 Ok(self)
1007 }
1008
1009 pub fn get(&self, idx: usize) -> Option<Vec<AnyValue<'_>>> {
1020 (idx < self.height()).then(|| self.columns().iter().map(|c| c.get(idx).unwrap()).collect())
1021 }
1022
1023 pub fn select_at_idx(&self, idx: usize) -> Option<&Column> {
1039 self.columns().get(idx)
1040 }
1041
1042 pub fn get_column_index(&self, name: &str) -> Option<usize> {
1060 if let Some(schema) = self.cached_schema() {
1061 schema.index_of(name)
1062 } else if self.width() <= LINEAR_SEARCH_LIMIT {
1063 self.columns().iter().position(|s| s.name() == name)
1064 } else {
1065 self.schema().index_of(name)
1066 }
1067 }
1068
1069 pub fn try_get_column_index(&self, name: &str) -> PolarsResult<usize> {
1071 self.get_column_index(name)
1072 .ok_or_else(|| polars_err!(col_not_found = name))
1073 }
1074
1075 pub fn column(&self, name: &str) -> PolarsResult<&Column> {
1089 let idx = self.try_get_column_index(name)?;
1090 Ok(self.select_at_idx(idx).unwrap())
1091 }
1092
1093 pub fn select<I, S>(&self, names: I) -> PolarsResult<Self>
1104 where
1105 I: IntoIterator<Item = S>,
1106 S: AsRef<str>,
1107 {
1108 DataFrame::new(self.height(), self.select_to_vec(names)?)
1109 }
1110
1111 pub unsafe fn select_unchecked<I, S>(&self, names: I) -> PolarsResult<Self>
1116 where
1117 I: IntoIterator<Item = S>,
1118 S: AsRef<str>,
1119 {
1120 Ok(unsafe { DataFrame::new_unchecked(self.height(), self.select_to_vec(names)?) })
1121 }
1122
1123 pub fn select_to_vec(
1141 &self,
1142 selection: impl IntoIterator<Item = impl AsRef<str>>,
1143 ) -> PolarsResult<Vec<Column>> {
1144 AmortizedColumnSelector::new(self).select_multiple(selection)
1145 }
1146
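/// Keeps only the rows for which `mask` is `true`, evaluating the columns in parallel.
/// Use [`DataFrame::filter_seq`] to stay on the current thread.
///
/// # Example
///
/// A minimal sketch (illustrative; assumes the `df!` macro):
///
/// ```
/// # use polars_core::prelude::*;
/// let df = df!("a" => [1, 2, 3])?;
/// let mask = df.column("a")?.as_materialized_series().equal(2)?;
/// let filtered = df.filter(&mask)?;
/// assert_eq!(filtered.height(), 1);
/// # Ok::<(), PolarsError>(())
/// ```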
1147 pub fn filter(&self, mask: &BooleanChunked) -> PolarsResult<Self> {
1159 if self.width() == 0 {
1160 filter_zero_width(self.height(), mask)
1161 } else {
1162 let new_columns: Vec<Column> = self.try_apply_columns_par(|s| s.filter(mask))?;
1163 let out = unsafe {
1164 DataFrame::new_unchecked(new_columns[0].len(), new_columns).with_schema_from(self)
1165 };
1166
1167 Ok(out)
1168 }
1169 }
1170
1171 pub fn filter_seq(&self, mask: &BooleanChunked) -> PolarsResult<Self> {
1173 if self.width() == 0 {
1174 filter_zero_width(self.height(), mask)
1175 } else {
1176 let new_columns: Vec<Column> = self.try_apply_columns(|s| s.filter(mask))?;
1177 let out = unsafe {
1178 DataFrame::new_unchecked(new_columns[0].len(), new_columns).with_schema_from(self)
1179 };
1180
1181 Ok(out)
1182 }
1183 }
1184
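/// Gathers the rows at `indices` into a new `DataFrame`, checking that every index is in
/// bounds before delegating to the unchecked, parallel implementation.
///
/// # Example
///
/// A minimal sketch (illustrative; assumes `IdxCa::new` accepts a slice of indices, as
/// used elsewhere in this crate):
///
/// ```
/// # use polars_core::prelude::*;
/// let df = df!("a" => [10, 20, 30])?;
/// let idx = IdxCa::new("idx".into(), &[0 as IdxSize, 2]);
/// let taken = df.take(&idx)?;
/// assert_eq!(taken.height(), 2);
/// # Ok::<(), PolarsError>(())
/// ```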
1185 pub fn take(&self, indices: &IdxCa) -> PolarsResult<Self> {
1197 check_bounds_ca(indices, self.height().try_into().unwrap_or(IdxSize::MAX))?;
1198
1199 let new_cols = self.apply_columns_par(|c| {
1200 assert_eq!(c.len(), self.height());
1201 unsafe { c.take_unchecked(indices) }
1202 });
1203
1204 Ok(unsafe { DataFrame::new_unchecked(indices.len(), new_cols).with_schema_from(self) })
1205 }
1206
1207 pub unsafe fn take_unchecked(&self, idx: &IdxCa) -> Self {
1210 self.take_unchecked_impl(idx, true)
1211 }
1212
1213 pub unsafe fn gather_group_unchecked(&self, group: &GroupsIndicator) -> Self {
1216 match group {
1217 GroupsIndicator::Idx((_, indices)) => unsafe {
1218 self.take_slice_unchecked_impl(indices.as_slice(), false)
1219 },
1220 GroupsIndicator::Slice([offset, len]) => self.slice(*offset as i64, *len as usize),
1221 }
1222 }
1223
1224 pub unsafe fn take_unchecked_impl(&self, idx: &IdxCa, allow_threads: bool) -> Self {
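// Strategy: when the thread pool has more threads than there are columns, additionally
// split the gather *within* each column into `stride`-sized index slices and reduce the
// partial results with `append_owned`; otherwise parallelism over whole columns suffices.
// Nested dtypes are rechunked first so the per-slice gathers avoid chunked access costs.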
1227 let cols = if allow_threads && POOL.current_num_threads() > 1 {
1228 POOL.install(|| {
1229 if POOL.current_num_threads() > self.width() {
1230 let stride = usize::max(idx.len().div_ceil(POOL.current_num_threads()), 256);
1231 if self.height() / stride >= 2 {
1232 self.apply_columns_par(|c| {
1233 let c = if c.dtype().is_nested() {
1236 &c.rechunk()
1237 } else {
1238 c
1239 };
1240
1241 (0..idx.len().div_ceil(stride))
1242 .into_par_iter()
1243 .map(|i| c.take_unchecked(&idx.slice((i * stride) as i64, stride)))
1244 .reduce(
1245 || Column::new_empty(c.name().clone(), c.dtype()),
1246 |mut a, b| {
1247 a.append_owned(b).unwrap();
1248 a
1249 },
1250 )
1251 })
1252 } else {
1253 self.apply_columns_par(|c| c.take_unchecked(idx))
1254 }
1255 } else {
1256 self.apply_columns_par(|c| c.take_unchecked(idx))
1257 }
1258 })
1259 } else {
1260 self.apply_columns(|s| s.take_unchecked(idx))
1261 };
1262
1263 unsafe { DataFrame::new_unchecked(idx.len(), cols).with_schema_from(self) }
1264 }
1265
1266 pub unsafe fn take_slice_unchecked(&self, idx: &[IdxSize]) -> Self {
1269 self.take_slice_unchecked_impl(idx, true)
1270 }
1271
1272 pub unsafe fn take_slice_unchecked_impl(&self, idx: &[IdxSize], allow_threads: bool) -> Self {
1275 let cols = if allow_threads && POOL.current_num_threads() > 1 {
1276 POOL.install(|| {
1277 if POOL.current_num_threads() > self.width() {
1278 let stride = usize::max(idx.len().div_ceil(POOL.current_num_threads()), 256);
1279 if self.height() / stride >= 2 {
1280 self.apply_columns_par(|c| {
1281 let c = if c.dtype().is_nested() {
1284 &c.rechunk()
1285 } else {
1286 c
1287 };
1288
1289 (0..idx.len().div_ceil(stride))
1290 .into_par_iter()
1291 .map(|i| {
1292 let idx = &idx[i * stride..];
1293 let idx = &idx[..idx.len().min(stride)];
1294 c.take_slice_unchecked(idx)
1295 })
1296 .reduce(
1297 || Column::new_empty(c.name().clone(), c.dtype()),
1298 |mut a, b| {
1299 a.append_owned(b).unwrap();
1300 a
1301 },
1302 )
1303 })
1304 } else {
1305 self.apply_columns_par(|s| s.take_slice_unchecked(idx))
1306 }
1307 } else {
1308 self.apply_columns_par(|s| s.take_slice_unchecked(idx))
1309 }
1310 })
1311 } else {
1312 self.apply_columns(|s| s.take_slice_unchecked(idx))
1313 };
1314 unsafe { DataFrame::new_unchecked(idx.len(), cols).with_schema_from(self) }
1315 }
1316
1317 pub fn rename(&mut self, column: &str, name: PlSmallStr) -> PolarsResult<&mut Self> {
1332 if column == name.as_str() {
1333 return Ok(self);
1334 }
1335 polars_ensure!(
1336 !self.schema().contains(&name),
1337 Duplicate: "column rename attempted with already existing name \"{name}\""
1338 );
1339
1340 self.get_column_index(column)
1341 .and_then(|idx| unsafe { self.columns_mut() }.get_mut(idx))
1342 .ok_or_else(|| polars_err!(col_not_found = column))
1343 .map(|c| c.rename(name))?;
1344
1345 Ok(self)
1346 }
1347
1348 pub fn rename_many<'a>(
1349 &mut self,
1350 renames: impl Iterator<Item = (&'a str, PlSmallStr)>,
1351 ) -> PolarsResult<&mut Self> {
1352 let mut schema_arc = self.schema().clone();
1353 let schema = Arc::make_mut(&mut schema_arc);
1354
1355 for (from, to) in renames {
1356 if from == to.as_str() {
1357 continue;
1358 }
1359
1360 polars_ensure!(
1361 !schema.contains(&to),
1362 Duplicate: "column rename attempted with already existing name \"{to}\""
1363 );
1364
1365 match schema.get_full(from) {
1366 None => polars_bail!(col_not_found = from),
1367 Some((idx, _, _)) => {
1368 let (n, _) = schema.get_at_index_mut(idx).unwrap();
1369 *n = to.clone();
1370 unsafe { self.columns_mut() }
1371 .get_mut(idx)
1372 .unwrap()
1373 .rename(to);
1374 },
1375 }
1376 }
1377
1378 unsafe { self.set_schema(schema_arc) };
1379
1380 Ok(self)
1381 }
1382
1383 pub fn sort_in_place(
1387 &mut self,
1388 by: impl IntoIterator<Item = impl AsRef<str>>,
1389 sort_options: SortMultipleOptions,
1390 ) -> PolarsResult<&mut Self> {
1391 let by_column = self.select_to_vec(by)?;
1392
1393 let mut out = self.sort_impl(by_column, sort_options, None)?;
1394 unsafe { out.set_schema_from(self) };
1395
1396 *self = out;
1397
1398 Ok(self)
1399 }
1400
1401 #[doc(hidden)]
1402 pub fn sort_impl(
1404 &self,
1405 by_column: Vec<Column>,
1406 sort_options: SortMultipleOptions,
1407 slice: Option<(i64, usize)>,
1408 ) -> PolarsResult<Self> {
1409 if by_column.is_empty() {
1410 return if let Some((offset, len)) = slice {
1412 Ok(self.slice(offset, len))
1413 } else {
1414 Ok(self.clone())
1415 };
1416 }
1417
1418 let first_descending = sort_options.descending[0];
1423 let first_by_column = by_column[0].name().to_string();
1424
1425 let set_sorted = |df: &mut DataFrame| {
1426 let _ = df.apply(&first_by_column, |s| {
1429 let mut s = s.clone();
1430 if first_descending {
1431 s.set_sorted_flag(IsSorted::Descending)
1432 } else {
1433 s.set_sorted_flag(IsSorted::Ascending)
1434 }
1435 s
1436 });
1437 };
1438
1439 if self.shape_has_zero() {
1440 let mut out = self.clone();
1441 set_sorted(&mut out);
1442 return Ok(out);
1443 }
1444
1445 if let Some((0, k)) = slice {
1446 if k < self.height() {
1447 return self.bottom_k_impl(k, by_column, sort_options);
1448 }
1449 }
1450 #[cfg(feature = "dtype-categorical")]
1454 let is_not_categorical_enum =
1455 !(matches!(by_column[0].dtype(), DataType::Categorical(_, _))
1456 || matches!(by_column[0].dtype(), DataType::Enum(_, _)));
1457
1458 #[cfg(not(feature = "dtype-categorical"))]
1459 #[allow(non_upper_case_globals)]
1460 const is_not_categorical_enum: bool = true;
1461
1462 if by_column.len() == 1 && is_not_categorical_enum {
1463 let required_sorting = if sort_options.descending[0] {
1464 IsSorted::Descending
1465 } else {
1466 IsSorted::Ascending
1467 };
1468 let no_sorting_required = (by_column[0].is_sorted_flag() == required_sorting)
1471 && ((by_column[0].null_count() == 0)
1472 || by_column[0].get(by_column[0].len() - 1).unwrap().is_null()
1473 == sort_options.nulls_last[0]);
1474
1475 if no_sorting_required {
1476 return if let Some((offset, len)) = slice {
1477 Ok(self.slice(offset, len))
1478 } else {
1479 Ok(self.clone())
1480 };
1481 }
1482 }
1483
1484 let has_nested = by_column.iter().any(|s| s.dtype().is_nested());
1485 let allow_threads = sort_options.multithreaded;
1486
1487 let mut df = self.clone();
1489 let df = df.rechunk_mut_par();
1490 let mut take = match (by_column.len(), has_nested) {
1491 (1, false) => {
1492 let s = &by_column[0];
1493 let options = SortOptions {
1494 descending: sort_options.descending[0],
1495 nulls_last: sort_options.nulls_last[0],
1496 multithreaded: sort_options.multithreaded,
1497 maintain_order: sort_options.maintain_order,
1498 limit: sort_options.limit,
1499 };
1500 if df.width() == 1 && df.try_get_column_index(s.name().as_str()).is_ok() {
1504 let mut out = s.sort_with(options)?;
1505 if let Some((offset, len)) = slice {
1506 out = out.slice(offset, len);
1507 }
1508 return Ok(out.into_frame());
1509 }
1510 s.arg_sort(options)
1511 },
1512 _ => arg_sort(&by_column, sort_options)?,
1513 };
1514
1515 if let Some((offset, len)) = slice {
1516 take = take.slice(offset, len);
1517 }
1518
1519 let mut df = unsafe { df.take_unchecked_impl(&take, allow_threads) };
1522 set_sorted(&mut df);
1523 Ok(df)
1524 }
1525
1526 pub fn _to_metadata(&self) -> DataFrame {
1531 let num_columns = self.width();
1532
1533 let mut column_names =
1534 StringChunkedBuilder::new(PlSmallStr::from_static("column_name"), num_columns);
1535 let mut repr_ca = StringChunkedBuilder::new(PlSmallStr::from_static("repr"), num_columns);
1536 let mut sorted_asc_ca =
1537 BooleanChunkedBuilder::new(PlSmallStr::from_static("sorted_asc"), num_columns);
1538 let mut sorted_dsc_ca =
1539 BooleanChunkedBuilder::new(PlSmallStr::from_static("sorted_dsc"), num_columns);
1540 let mut fast_explode_list_ca =
1541 BooleanChunkedBuilder::new(PlSmallStr::from_static("fast_explode_list"), num_columns);
1542 let mut materialized_at_ca =
1543 StringChunkedBuilder::new(PlSmallStr::from_static("materialized_at"), num_columns);
1544
1545 for col in self.columns() {
1546 let flags = col.get_flags();
1547
1548 let (repr, materialized_at) = match col {
1549 Column::Series(s) => ("series", s.materialized_at()),
1550 Column::Scalar(_) => ("scalar", None),
1551 };
1552 let sorted_asc = flags.contains(StatisticsFlags::IS_SORTED_ASC);
1553 let sorted_dsc = flags.contains(StatisticsFlags::IS_SORTED_DSC);
1554 let fast_explode_list = flags.contains(StatisticsFlags::CAN_FAST_EXPLODE_LIST);
1555
1556 column_names.append_value(col.name().clone());
1557 repr_ca.append_value(repr);
1558 sorted_asc_ca.append_value(sorted_asc);
1559 sorted_dsc_ca.append_value(sorted_dsc);
1560 fast_explode_list_ca.append_value(fast_explode_list);
1561 materialized_at_ca.append_option(materialized_at.map(|v| format!("{v:#?}")));
1562 }
1563
1564 unsafe {
1565 DataFrame::new_unchecked(
1566 self.width(),
1567 vec![
1568 column_names.finish().into_column(),
1569 repr_ca.finish().into_column(),
1570 sorted_asc_ca.finish().into_column(),
1571 sorted_dsc_ca.finish().into_column(),
1572 fast_explode_list_ca.finish().into_column(),
1573 materialized_at_ca.finish().into_column(),
1574 ],
1575 )
1576 }
1577 }
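/// Returns a new `DataFrame` sorted by the given columns. See [`SortMultipleOptions`]
/// for the per-column descending and nulls-last settings.
///
/// # Example
///
/// A minimal sketch (illustrative; assumes the `df!` macro):
///
/// ```
/// # use polars_core::prelude::*;
/// let df = df!("a" => [3, 1, 2])?;
/// let sorted = df.sort(["a"], SortMultipleOptions::default())?;
/// let expected = df!("a" => [1, 2, 3])?;
/// assert!(sorted.equals(&expected));
/// # Ok::<(), PolarsError>(())
/// ```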
1578 pub fn sort(
1616 &self,
1617 by: impl IntoIterator<Item = impl AsRef<str>>,
1618 sort_options: SortMultipleOptions,
1619 ) -> PolarsResult<Self> {
1620 let mut df = self.clone();
1621 df.sort_in_place(by, sort_options)?;
1622 Ok(df)
1623 }
1624
1625 pub fn replace(&mut self, column: &str, new_col: Column) -> PolarsResult<&mut Self> {
1640 self.apply(column, |_| new_col)
1641 }
1642
1643 pub fn replace_column(&mut self, index: usize, new_column: Column) -> PolarsResult<&mut Self> {
1658 polars_ensure!(
1659 index < self.width(),
1660 ShapeMismatch:
1661 "unable to replace at index {}, the DataFrame has only {} columns",
1662 index, self.width(),
1663 );
1664
1665 polars_ensure!(
1666 new_column.len() == self.height(),
1667 ShapeMismatch:
1668 "unable to replace a column, series length {} doesn't match the DataFrame height {}",
1669 new_column.len(), self.height(),
1670 );
1671
1672 unsafe { *self.columns_mut().get_mut(index).unwrap() = new_column };
1673
1674 Ok(self)
1675 }
1676
1677 pub fn apply<F, C>(&mut self, name: &str, f: F) -> PolarsResult<&mut Self>
1718 where
1719 F: FnOnce(&Column) -> C,
1720 C: IntoColumn,
1721 {
1722 let idx = self.try_get_column_index(name)?;
1723 self.apply_at_idx(idx, f)?;
1724 Ok(self)
1725 }
1726
1727 pub fn apply_at_idx<F, C>(&mut self, idx: usize, f: F) -> PolarsResult<&mut Self>
1758 where
1759 F: FnOnce(&Column) -> C,
1760 C: IntoColumn,
1761 {
1762 let df_height = self.height();
1763 let width = self.width();
1764
1765 let cached_schema = self.cached_schema().cloned();
1766
1767 let col = unsafe { self.columns_mut() }.get_mut(idx).ok_or_else(|| {
1768 polars_err!(
1769 ComputeError: "invalid column index: {} for a DataFrame with {} columns",
1770 idx, width
1771 )
1772 })?;
1773
1774 let mut new_col = f(col).into_column();
1775
1776 if new_col.len() != df_height && new_col.len() == 1 {
1777 new_col = new_col.new_from_index(0, df_height);
1778 }
1779
1780 polars_ensure!(
1781 new_col.len() == df_height,
1782 ShapeMismatch:
1783 "apply_at_idx: resulting Series has length {} while the DataFrame has height {}",
1784 new_col.len(), df_height
1785 );
1786
1787 new_col = new_col.with_name(col.name().clone());
1788 let col_before = std::mem::replace(col, new_col);
1789
1790 if col.dtype() == col_before.dtype() {
1791 unsafe { self.set_opt_schema(cached_schema) };
1792 }
1793
1794 Ok(self)
1795 }
1796
1797 pub fn try_apply_at_idx<F, C>(&mut self, idx: usize, f: F) -> PolarsResult<&mut Self>
1838 where
1839 F: FnOnce(&Column) -> PolarsResult<C>,
1840 C: IntoColumn,
1841 {
1842 let df_height = self.height();
1843 let width = self.width();
1844
1845 let cached_schema = self.cached_schema().cloned();
1846
1847 let col = unsafe { self.columns_mut() }.get_mut(idx).ok_or_else(|| {
1848 polars_err!(
1849 ComputeError: "invalid column index: {} for a DataFrame with {} columns",
1850 idx, width
1851 )
1852 })?;
1853
1854 let mut new_col = f(col).map(|c| c.into_column())?;
1855
1856 polars_ensure!(
1857 new_col.len() == df_height,
1858 ShapeMismatch:
1859 "try_apply_at_idx: resulting Series has length {} while the DataFrame has height {}",
1860 new_col.len(), df_height
1861 );
1862
1863 new_col = new_col.with_name(col.name().clone());
1865 let col_before = std::mem::replace(col, new_col);
1866
1867 if col.dtype() == col_before.dtype() {
1868 unsafe { self.set_opt_schema(cached_schema) };
1869 }
1870
1871 Ok(self)
1872 }
1873
1874 pub fn try_apply<F, C>(&mut self, column: &str, f: F) -> PolarsResult<&mut Self>
1917 where
1918 F: FnOnce(&Series) -> PolarsResult<C>,
1919 C: IntoColumn,
1920 {
1921 let idx = self.try_get_column_index(column)?;
1922 self.try_apply_at_idx(idx, |c| f(c.as_materialized_series()))
1923 }
1924
1925 #[must_use]
1955 pub fn slice(&self, offset: i64, length: usize) -> Self {
1956 if offset == 0 && length == self.height() {
1957 return self.clone();
1958 }
1959
1960 if length == 0 {
1961 return self.clear();
1962 }
1963
1964 let cols = self.apply_columns(|s| s.slice(offset, length));
1965
1966 let height = if let Some(fst) = cols.first() {
1967 fst.len()
1968 } else {
1969 let (_, length) = slice_offsets(offset, length, self.height());
1970 length
1971 };
1972
1973 unsafe { DataFrame::_new_unchecked_impl(height, cols).with_schema_from(self) }
1974 }
1975
1976 pub fn split_at(&self, offset: i64) -> (Self, Self) {
1978 let (a, b) = self.columns().iter().map(|s| s.split_at(offset)).unzip();
1979
1980 let (idx, _) = slice_offsets(offset, 0, self.height());
1981
1982 let a = unsafe { DataFrame::new_unchecked(idx, a).with_schema_from(self) };
1983 let b = unsafe { DataFrame::new_unchecked(self.height() - idx, b).with_schema_from(self) };
1984 (a, b)
1985 }
1986
1987 #[must_use]
1988 pub fn clear(&self) -> Self {
1989 let cols = self.columns().iter().map(|s| s.clear()).collect::<Vec<_>>();
1990 unsafe { DataFrame::_new_unchecked_impl(0, cols).with_schema_from(self) }
1991 }
1992
1993 #[must_use]
1994 pub fn slice_par(&self, offset: i64, length: usize) -> Self {
1995 if offset == 0 && length == self.height() {
1996 return self.clone();
1997 }
1998 let columns = self.apply_columns_par(|s| s.slice(offset, length));
1999 unsafe { DataFrame::new_unchecked(length, columns).with_schema_from(self) }
2000 }
2001
2002 #[must_use]
2003 pub fn _slice_and_realloc(&self, offset: i64, length: usize) -> Self {
2004 if offset == 0 && length == self.height() {
2005 return self.clone();
2006 }
2007 let columns = self.apply_columns(|s| {
2009 let mut out = s.slice(offset, length);
2010 out.shrink_to_fit();
2011 out
2012 });
2013 unsafe { DataFrame::new_unchecked(length, columns).with_schema_from(self) }
2014 }
2015
2016 #[must_use]
2050 pub fn head(&self, length: Option<usize>) -> Self {
2051 let new_height = usize::min(self.height(), length.unwrap_or(HEAD_DEFAULT_LENGTH));
2052 let new_cols = self.apply_columns(|c| c.head(Some(new_height)));
2053
2054 unsafe { DataFrame::new_unchecked(new_height, new_cols).with_schema_from(self) }
2055 }
2056
2057 #[must_use]
2088 pub fn tail(&self, length: Option<usize>) -> Self {
2089 let new_height = usize::min(self.height(), length.unwrap_or(TAIL_DEFAULT_LENGTH));
2090 let new_cols = self.apply_columns(|c| c.tail(Some(new_height)));
2091
2092 unsafe { DataFrame::new_unchecked(new_height, new_cols).with_schema_from(self) }
2093 }
2094
2095 pub fn iter_chunks(
2105 &self,
2106 compat_level: CompatLevel,
2107 parallel: bool,
2108 ) -> impl Iterator<Item = RecordBatch> + '_ {
2109 debug_assert!(!self.should_rechunk(), "expected equal chunks");
2110
2111 if self.width() == 0 {
2112 return RecordBatchIterWrap::new_zero_width(self.height());
2113 }
2114
2115 let must_convert = compat_level.0 == 0;
2118 let parallel = parallel
2119 && must_convert
2120 && self.width() > 1
2121 && self
2122 .columns()
2123 .iter()
2124 .any(|s| matches!(s.dtype(), DataType::String | DataType::Binary));
2125
2126 RecordBatchIterWrap::Batches(RecordBatchIter {
2127 df: self,
2128 schema: Arc::new(
2129 self.columns()
2130 .iter()
2131 .map(|c| c.field().to_arrow(compat_level))
2132 .collect(),
2133 ),
2134 idx: 0,
2135 n_chunks: usize::max(1, self.first_col_n_chunks()),
2136 compat_level,
2137 parallel,
2138 })
2139 }
2140
2141 pub fn iter_chunks_physical(&self) -> impl Iterator<Item = RecordBatch> + '_ {
2151 debug_assert!(!self.should_rechunk());
2152
2153 if self.width() == 0 {
2154 return RecordBatchIterWrap::new_zero_width(self.height());
2155 }
2156
2157 RecordBatchIterWrap::PhysicalBatches(PhysRecordBatchIter {
2158 schema: Arc::new(
2159 self.columns()
2160 .iter()
2161 .map(|c| c.field().to_arrow(CompatLevel::newest()))
2162 .collect(),
2163 ),
2164 arr_iters: self
2165 .materialized_column_iter()
2166 .map(|s| s.chunks().iter())
2167 .collect(),
2168 })
2169 }
2170
2171 #[must_use]
2173 pub fn reverse(&self) -> Self {
2174 let new_cols = self.apply_columns(Column::reverse);
2175 unsafe { DataFrame::new_unchecked(self.height(), new_cols).with_schema_from(self) }
2176 }
2177
2178 #[must_use]
2183 pub fn shift(&self, periods: i64) -> Self {
2184 let col = self.apply_columns_par(|s| s.shift(periods));
2185 unsafe { DataFrame::new_unchecked(self.height(), col).with_schema_from(self) }
2186 }
2187
2188 pub fn fill_null(&self, strategy: FillNullStrategy) -> PolarsResult<Self> {
2197 let col = self.try_apply_columns_par(|s| s.fill_null(strategy))?;
2198
2199 Ok(unsafe { DataFrame::new_unchecked(self.height(), col) })
2200 }
2201
2202 pub fn pipe<F, B>(self, f: F) -> PolarsResult<B>
2204 where
2205 F: Fn(DataFrame) -> PolarsResult<B>,
2206 {
2207 f(self)
2208 }
2209
2210 pub fn pipe_mut<F, B>(&mut self, f: F) -> PolarsResult<B>
2212 where
2213 F: Fn(&mut DataFrame) -> PolarsResult<B>,
2214 {
2215 f(self)
2216 }
2217
2218 pub fn pipe_with_args<F, B, Args>(self, f: F, args: Args) -> PolarsResult<B>
2220 where
2221 F: Fn(DataFrame, Args) -> PolarsResult<B>,
2222 {
2223 f(self, args)
2224 }
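/// Drops duplicate rows (judged by the `subset` columns, or all columns when `None`)
/// while keeping the original row order. `keep` picks which duplicate survives and
/// `slice` optionally restricts the output. The unordered counterpart
/// [`DataFrame::unique`] is faster when order does not matter.
///
/// # Example
///
/// A minimal sketch (illustrative; assumes the `df!` macro and the
/// `algorithm_group_by` feature):
///
/// ```
/// # use polars_core::prelude::*;
/// let df = df!("a" => [1, 1, 2])?;
/// let out = df.unique_stable(None, UniqueKeepStrategy::First, None)?;
/// assert_eq!(out.height(), 2);
/// # Ok::<(), PolarsError>(())
/// ```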
2225 #[cfg(feature = "algorithm_group_by")]
2259 pub fn unique_stable(
2260 &self,
2261 subset: Option<&[String]>,
2262 keep: UniqueKeepStrategy,
2263 slice: Option<(i64, usize)>,
2264 ) -> PolarsResult<DataFrame> {
2265 self.unique_impl(
2266 true,
2267 subset.map(|v| v.iter().map(|x| PlSmallStr::from_str(x.as_str())).collect()),
2268 keep,
2269 slice,
2270 )
2271 }
2272
2273 #[cfg(feature = "algorithm_group_by")]
2275 pub fn unique<I, S>(
2276 &self,
2277 subset: Option<&[String]>,
2278 keep: UniqueKeepStrategy,
2279 slice: Option<(i64, usize)>,
2280 ) -> PolarsResult<DataFrame> {
2281 self.unique_impl(
2282 false,
2283 subset.map(|v| v.iter().map(|x| PlSmallStr::from_str(x.as_str())).collect()),
2284 keep,
2285 slice,
2286 )
2287 }
2288
2289 #[cfg(feature = "algorithm_group_by")]
2290 pub fn unique_impl(
2291 &self,
2292 maintain_order: bool,
2293 subset: Option<Vec<PlSmallStr>>,
2294 keep: UniqueKeepStrategy,
2295 slice: Option<(i64, usize)>,
2296 ) -> PolarsResult<Self> {
2297 if self.width() == 0 {
2298 let height = usize::min(self.height(), 1);
2299 return Ok(DataFrame::empty_with_height(height));
2300 }
2301
2302 let names = subset.unwrap_or_else(|| self.get_column_names_owned());
2303 let mut df = self.clone();
2304 df.rechunk_mut_par();
2306
2307 let columns = match (keep, maintain_order) {
2308 (UniqueKeepStrategy::First | UniqueKeepStrategy::Any, true) => {
2309 let gb = df.group_by_stable(names)?;
2310 let groups = gb.get_groups();
2311 let (offset, len) = slice.unwrap_or((0, groups.len()));
2312 let groups = groups.slice(offset, len);
2313 df.apply_columns_par(|s| unsafe { s.agg_first(&groups) })
2314 },
2315 (UniqueKeepStrategy::Last, true) => {
2316 let gb = df.group_by_stable(names)?;
2319 let groups = gb.get_groups();
2320
2321 let last_idx: NoNull<IdxCa> = groups
2322 .iter()
2323 .map(|g| match g {
2324 GroupsIndicator::Idx((_first, idx)) => idx[idx.len() - 1],
2325 GroupsIndicator::Slice([first, len]) => first + len - 1,
2326 })
2327 .collect();
2328
2329 let mut last_idx = last_idx.into_inner().sort(false);
2330
2331 if let Some((offset, len)) = slice {
2332 last_idx = last_idx.slice(offset, len);
2333 }
2334
2335 let last_idx = NoNull::new(last_idx);
2336 let out = unsafe { df.take_unchecked(&last_idx) };
2337 return Ok(out);
2338 },
2339 (UniqueKeepStrategy::First | UniqueKeepStrategy::Any, false) => {
2340 let gb = df.group_by(names)?;
2341 let groups = gb.get_groups();
2342 let (offset, len) = slice.unwrap_or((0, groups.len()));
2343 let groups = groups.slice(offset, len);
2344 df.apply_columns_par(|s| unsafe { s.agg_first(&groups) })
2345 },
2346 (UniqueKeepStrategy::Last, false) => {
2347 let gb = df.group_by(names)?;
2348 let groups = gb.get_groups();
2349 let (offset, len) = slice.unwrap_or((0, groups.len()));
2350 let groups = groups.slice(offset, len);
2351 df.apply_columns_par(|s| unsafe { s.agg_last(&groups) })
2352 },
2353 (UniqueKeepStrategy::None, _) => {
2354 let df_part = df.select(names)?;
2355 let mask = df_part.is_unique()?;
2356 let mut filtered = df.filter(&mask)?;
2357
2358 if let Some((offset, len)) = slice {
2359 filtered = filtered.slice(offset, len);
2360 }
2361 return Ok(filtered);
2362 },
2363 };
2364 Ok(unsafe { DataFrame::new_unchecked_infer_height(columns).with_schema_from(self) })
2365 }
2366
2367 #[cfg(feature = "algorithm_group_by")]
2381 pub fn is_unique(&self) -> PolarsResult<BooleanChunked> {
2382 let gb = self.group_by(self.get_column_names_owned())?;
2383 let groups = gb.get_groups();
2384 Ok(is_unique_helper(
2385 groups,
2386 self.height() as IdxSize,
2387 true,
2388 false,
2389 ))
2390 }
2391
2392 #[cfg(feature = "algorithm_group_by")]
2406 pub fn is_duplicated(&self) -> PolarsResult<BooleanChunked> {
2407 let gb = self.group_by(self.get_column_names_owned())?;
2408 let groups = gb.get_groups();
2409 Ok(is_unique_helper(
2410 groups,
2411 self.height() as IdxSize,
2412 false,
2413 true,
2414 ))
2415 }
2416
2417 #[must_use]
2419 pub fn null_count(&self) -> Self {
2420 let cols =
2421 self.apply_columns(|c| Column::new(c.name().clone(), [c.null_count() as IdxSize]));
2422 unsafe { Self::new_unchecked(1, cols) }
2423 }
2424
2425 #[cfg(feature = "row_hash")]
2427 pub fn hash_rows(
2428 &mut self,
2429 hasher_builder: Option<PlSeedableRandomStateQuality>,
2430 ) -> PolarsResult<UInt64Chunked> {
2431 let dfs = split_df(self, POOL.current_num_threads(), false);
2432 let (cas, _) = _df_rows_to_hashes_threaded_vertical(&dfs, hasher_builder)?;
2433
2434 let mut iter = cas.into_iter();
2435 let mut acc_ca = iter.next().unwrap();
2436 for ca in iter {
2437 acc_ca.append(&ca)?;
2438 }
2439 Ok(acc_ca.rechunk().into_owned())
2440 }
2441
2442 pub fn get_supertype(&self) -> Option<PolarsResult<DataType>> {
2444 self.columns()
2445 .iter()
2446 .map(|s| Ok(s.dtype().clone()))
2447 .reduce(|acc, b| try_get_supertype(&acc?, &b.unwrap()))
2448 }
2449
2450 #[doc(hidden)]
2455 pub unsafe fn _take_unchecked_slice(&self, idx: &[IdxSize], allow_threads: bool) -> Self {
2456 self._take_unchecked_slice_sorted(idx, allow_threads, IsSorted::Not)
2457 }
2458
2459 #[doc(hidden)]
2466 pub unsafe fn _take_unchecked_slice_sorted(
2467 &self,
2468 idx: &[IdxSize],
2469 allow_threads: bool,
2470 sorted: IsSorted,
2471 ) -> Self {
2472 #[cfg(debug_assertions)]
2473 {
2474 if idx.len() > 2 {
2475 use crate::series::IsSorted;
2476
2477 match sorted {
2478 IsSorted::Ascending => {
2479 assert!(idx[0] <= idx[idx.len() - 1]);
2480 },
2481 IsSorted::Descending => {
2482 assert!(idx[0] >= idx[idx.len() - 1]);
2483 },
2484 _ => {},
2485 }
2486 }
2487 }
2488 let mut ca = IdxCa::mmap_slice(PlSmallStr::EMPTY, idx);
2489 ca.set_sorted_flag(sorted);
2490 self.take_unchecked_impl(&ca, allow_threads)
2491 }
2492 #[cfg(all(feature = "partition_by", feature = "algorithm_group_by"))]
2493 #[doc(hidden)]
2494 pub fn _partition_by_impl(
2495 &self,
2496 cols: &[PlSmallStr],
2497 stable: bool,
2498 include_key: bool,
2499 parallel: bool,
2500 ) -> PolarsResult<Vec<DataFrame>> {
2501 let selected_keys = self.select_to_vec(cols.iter().cloned())?;
2502 let groups = self.group_by_with_series(selected_keys, parallel, stable)?;
2503 let groups = groups.into_groups();
2504
2505 let df = if include_key {
2507 self.clone()
2508 } else {
2509 self.drop_many(cols.iter().cloned())
2510 };
2511
2512 if parallel {
2513 POOL.install(|| {
2516 match groups.as_ref() {
2517 GroupsType::Idx(idx) => {
2518 let mut df = df.clone();
2520 df.rechunk_mut_par();
2521 Ok(idx
2522 .into_par_iter()
2523 .map(|(_, group)| {
2524 unsafe {
2526 df._take_unchecked_slice_sorted(
2527 group,
2528 false,
2529 IsSorted::Ascending,
2530 )
2531 }
2532 })
2533 .collect())
2534 },
2535 GroupsType::Slice { groups, .. } => Ok(groups
2536 .into_par_iter()
2537 .map(|[first, len]| df.slice(*first as i64, *len as usize))
2538 .collect()),
2539 }
2540 })
2541 } else {
2542 match groups.as_ref() {
2543 GroupsType::Idx(idx) => {
2544 let mut df = df;
2546 df.rechunk_mut();
2547 Ok(idx
2548 .into_iter()
2549 .map(|(_, group)| {
2550 unsafe {
2552 df._take_unchecked_slice_sorted(group, false, IsSorted::Ascending)
2553 }
2554 })
2555 .collect())
2556 },
2557 GroupsType::Slice { groups, .. } => Ok(groups
2558 .iter()
2559 .map(|[first, len]| df.slice(*first as i64, *len as usize))
2560 .collect()),
2561 }
2562 }
2563 }
2564
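/// Splits the `DataFrame` into one frame per unique combination of the key columns,
/// computed in parallel and without a stable group order (use
/// [`DataFrame::partition_by_stable`] for a deterministic order). `include_key`
/// controls whether the key columns appear in the output frames.
///
/// # Example
///
/// A minimal sketch (illustrative; assumes the `df!` macro and the `partition_by`
/// feature):
///
/// ```
/// # use polars_core::prelude::*;
/// let df = df!("key" => ["a", "a", "b"], "val" => [1, 2, 3])?;
/// let parts = df.partition_by(["key"], true)?;
/// assert_eq!(parts.len(), 2);
/// # Ok::<(), PolarsError>(())
/// ```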
2565 #[cfg(feature = "partition_by")]
2567 pub fn partition_by<I, S>(&self, cols: I, include_key: bool) -> PolarsResult<Vec<DataFrame>>
2568 where
2569 I: IntoIterator<Item = S>,
2570 S: Into<PlSmallStr>,
2571 {
2572 let cols: UnitVec<PlSmallStr> = cols.into_iter().map(Into::into).collect();
2573 self._partition_by_impl(cols.as_slice(), false, include_key, true)
2574 }
2575
2576 #[cfg(feature = "partition_by")]
2579 pub fn partition_by_stable<I, S>(
2580 &self,
2581 cols: I,
2582 include_key: bool,
2583 ) -> PolarsResult<Vec<DataFrame>>
2584 where
2585 I: IntoIterator<Item = S>,
2586 S: Into<PlSmallStr>,
2587 {
2588 let cols: UnitVec<PlSmallStr> = cols.into_iter().map(Into::into).collect();
2589 self._partition_by_impl(cols.as_slice(), true, include_key, true)
2590 }
2591
2592 #[cfg(feature = "dtype-struct")]
2595 pub fn unnest(
2596 &self,
2597 cols: impl IntoIterator<Item = impl Into<PlSmallStr>>,
2598 separator: Option<&str>,
2599 ) -> PolarsResult<DataFrame> {
2600 self.unnest_impl(cols.into_iter().map(Into::into).collect(), separator)
2601 }
2602
2603 #[cfg(feature = "dtype-struct")]
2604 fn unnest_impl(
2605 &self,
2606 cols: PlHashSet<PlSmallStr>,
2607 separator: Option<&str>,
2608 ) -> PolarsResult<DataFrame> {
2609 let mut new_cols = Vec::with_capacity(std::cmp::min(self.width() * 2, self.width() + 128));
2610 let mut count = 0;
2611 for s in self.columns() {
2612 if cols.contains(s.name()) {
2613 let ca = s.struct_()?.clone();
2614 new_cols.extend(ca.fields_as_series().into_iter().map(|mut f| {
2615 if let Some(separator) = &separator {
2616 f.rename(polars_utils::format_pl_smallstr!(
2617 "{}{}{}",
2618 s.name(),
2619 separator,
2620 f.name()
2621 ));
2622 }
2623 Column::from(f)
2624 }));
2625 count += 1;
2626 } else {
2627 new_cols.push(s.clone())
2628 }
2629 }
2630 if count != cols.len() {
2631 let schema = self.schema();
2634 for col in cols {
2635 let _ = schema
2636 .get(col.as_str())
2637 .ok_or_else(|| polars_err!(col_not_found = col))?;
2638 }
2639 }
2640
2641 DataFrame::new_infer_height(new_cols)
2642 }
2643
2644 pub fn append_record_batch(&mut self, rb: RecordBatchT<ArrayRef>) -> PolarsResult<()> {
2645 let df = DataFrame::from(rb);
2648 polars_ensure!(
2649 self.schema() == df.schema(),
SchemaMismatch: "cannot append record batch with different schema\ngot: {:?}\nexpected: {:?}",
df.schema(), self.schema(),
2652 );
2653 self.vstack_mut_owned_unchecked(df);
2654 Ok(())
2655 }
2656}
2657
2658pub struct RecordBatchIter<'a> {
2659 df: &'a DataFrame,
2660 schema: ArrowSchemaRef,
2661 idx: usize,
2662 n_chunks: usize,
2663 compat_level: CompatLevel,
2664 parallel: bool,
2665}
2666
2667impl Iterator for RecordBatchIter<'_> {
2668 type Item = RecordBatch;
2669
2670 fn next(&mut self) -> Option<Self::Item> {
2671 if self.idx >= self.n_chunks {
2672 return None;
2673 }
2674
2675 let batch_cols: Vec<ArrayRef> = if self.parallel {
2677 let iter = self
2678 .df
2679 .columns()
2680 .par_iter()
2681 .map(Column::as_materialized_series)
2682 .map(|s| s.to_arrow(self.idx, self.compat_level));
2683 POOL.install(|| iter.collect())
2684 } else {
2685 self.df
2686 .columns()
2687 .iter()
2688 .map(Column::as_materialized_series)
2689 .map(|s| s.to_arrow(self.idx, self.compat_level))
2690 .collect()
2691 };
2692
2693 let length = batch_cols.first().map_or(0, |arr| arr.len());
2694
2695 self.idx += 1;
2696
2697 Some(RecordBatch::new(length, self.schema.clone(), batch_cols))
2698 }
2699
2700 fn size_hint(&self) -> (usize, Option<usize>) {
2701 let n = self.n_chunks - self.idx;
2702 (n, Some(n))
2703 }
2704}
2705
2706pub struct PhysRecordBatchIter<'a> {
2707 schema: ArrowSchemaRef,
2708 arr_iters: Vec<std::slice::Iter<'a, ArrayRef>>,
2709}
2710
2711impl Iterator for PhysRecordBatchIter<'_> {
2712 type Item = RecordBatch;
2713
2714 fn next(&mut self) -> Option<Self::Item> {
2715 let arrs = self
2716 .arr_iters
2717 .iter_mut()
2718 .map(|phys_iter| phys_iter.next().cloned())
2719 .collect::<Option<Vec<_>>>()?;
2720
2721 let length = arrs.first().map_or(0, |arr| arr.len());
2722 Some(RecordBatch::new(length, self.schema.clone(), arrs))
2723 }
2724
2725 fn size_hint(&self) -> (usize, Option<usize>) {
2726 if let Some(iter) = self.arr_iters.first() {
2727 iter.size_hint()
2728 } else {
2729 (0, None)
2730 }
2731 }
2732}
2733
2734pub enum RecordBatchIterWrap<'a> {
2735 ZeroWidth {
2736 remaining_height: usize,
2737 chunk_size: usize,
2738 },
2739 Batches(RecordBatchIter<'a>),
2740 PhysicalBatches(PhysRecordBatchIter<'a>),
2741}
2742
2743impl<'a> RecordBatchIterWrap<'a> {
2744 fn new_zero_width(height: usize) -> Self {
2745 Self::ZeroWidth {
2746 remaining_height: height,
2747 chunk_size: get_ideal_morsel_size().get(),
2748 }
2749 }
2750}
2751
2752impl Iterator for RecordBatchIterWrap<'_> {
2753 type Item = RecordBatch;
2754
2755 fn next(&mut self) -> Option<Self::Item> {
2756 match self {
2757 Self::ZeroWidth {
2758 remaining_height,
2759 chunk_size,
2760 } => {
2761 let n = usize::min(*remaining_height, *chunk_size);
2762 *remaining_height -= n;
2763
2764 (n > 0).then(|| RecordBatch::new(n, ArrowSchemaRef::default(), vec![]))
2765 },
2766 Self::Batches(v) => v.next(),
2767 Self::PhysicalBatches(v) => v.next(),
2768 }
2769 }
2770
2771 fn size_hint(&self) -> (usize, Option<usize>) {
2772 match self {
2773 Self::ZeroWidth {
2774 remaining_height,
2775 chunk_size,
2776 } => {
2777 let n = remaining_height.div_ceil(*chunk_size);
2778 (n, Some(n))
2779 },
2780 Self::Batches(v) => v.size_hint(),
2781 Self::PhysicalBatches(v) => v.size_hint(),
2782 }
2783 }
2784}
2785
2786fn ensure_can_extend(left: &Column, right: &Column) -> PolarsResult<()> {
2788 polars_ensure!(
2789 left.name() == right.name(),
2790 ShapeMismatch: "unable to vstack, column names don't match: {:?} and {:?}",
2791 left.name(), right.name(),
2792 );
2793 Ok(())
2794}
2795
2796#[cfg(test)]
2797mod test {
2798 use super::*;
2799
2800 fn create_frame() -> DataFrame {
2801 let s0 = Column::new("days".into(), [0, 1, 2].as_ref());
2802 let s1 = Column::new("temp".into(), [22.1, 19.9, 7.].as_ref());
2803 DataFrame::new_infer_height(vec![s0, s1]).unwrap()
2804 }
2805
2806 #[test]
2807 #[cfg_attr(miri, ignore)]
2808 fn test_recordbatch_iterator() {
2809 let df = df!(
2810 "foo" => [1, 2, 3, 4, 5]
2811 )
2812 .unwrap();
2813 let mut iter = df.iter_chunks(CompatLevel::newest(), false);
2814 assert_eq!(5, iter.next().unwrap().len());
2815 assert!(iter.next().is_none());
2816 }
2817
2818 #[test]
2819 #[cfg_attr(miri, ignore)]
2820 fn test_select() {
2821 let df = create_frame();
2822 assert_eq!(
2823 df.column("days")
2824 .unwrap()
2825 .as_series()
2826 .unwrap()
2827 .equal(1)
2828 .unwrap()
2829 .sum(),
2830 Some(1)
2831 );
2832 }
2833
2834 #[test]
2835 #[cfg_attr(miri, ignore)]
2836 fn test_filter_broadcast_on_string_col() {
2837 let col_name = "some_col";
2838 let v = vec!["test".to_string()];
2839 let s0 = Column::new(PlSmallStr::from_str(col_name), v);
2840 let mut df = DataFrame::new_infer_height(vec![s0]).unwrap();
2841
2842 df = df
2843 .filter(
2844 &df.column(col_name)
2845 .unwrap()
2846 .as_materialized_series()
2847 .equal("")
2848 .unwrap(),
2849 )
2850 .unwrap();
2851 assert_eq!(
2852 df.column(col_name)
2853 .unwrap()
2854 .as_materialized_series()
2855 .n_chunks(),
2856 1
2857 );
2858 }
2859
2860 #[test]
2861 #[cfg_attr(miri, ignore)]
2862 fn test_filter_broadcast_on_list_col() {
2863 let s1 = Series::new(PlSmallStr::EMPTY, [true, false, true]);
2864 let ll: ListChunked = [&s1].iter().copied().collect();
2865
2866 let mask = BooleanChunked::from_slice(PlSmallStr::EMPTY, &[false]);
2867 let new = ll.filter(&mask).unwrap();
2868
2869 assert_eq!(new.chunks.len(), 1);
2870 assert_eq!(new.len(), 0);
2871 }
2872
2873 #[test]
2874 fn slice() {
2875 let df = create_frame();
2876 let sliced_df = df.slice(0, 2);
2877 assert_eq!(sliced_df.shape(), (2, 2));
2878 }
2879
2880 #[test]
2881 fn rechunk_false() {
2882 let df = create_frame();
2883 assert!(!df.should_rechunk())
2884 }
2885
2886 #[test]
2887 fn rechunk_true() -> PolarsResult<()> {
2888 let mut base = df!(
2889 "a" => [1, 2, 3],
2890 "b" => [1, 2, 3]
2891 )?;
2892
2893 let mut s = Series::new("foo".into(), 0..2);
2895 let s2 = Series::new("bar".into(), 0..1);
2896 s.append(&s2)?;
2897
2898 let out = base.with_column(s.into_column())?;
2900
2901 assert!(out.should_rechunk());
2903 Ok(())
2904 }
2905
2906 #[test]
2907 fn test_duplicate_column() {
2908 let mut df = df! {
2909 "foo" => [1, 2, 3]
2910 }
2911 .unwrap();
2912 assert!(
2914 df.with_column(Column::new("foo".into(), &[1, 2, 3]))
2915 .is_ok()
2916 );
2917 assert!(
2918 df.with_column(Column::new("bar".into(), &[1, 2, 3]))
2919 .is_ok()
2920 );
2921 assert!(df.column("bar").is_ok())
2922 }
2923
2924 #[test]
2925 #[cfg_attr(miri, ignore)]
2926 fn distinct() {
2927 let df = df! {
2928 "flt" => [1., 1., 2., 2., 3., 3.],
2929 "int" => [1, 1, 2, 2, 3, 3, ],
2930 "str" => ["a", "a", "b", "b", "c", "c"]
2931 }
2932 .unwrap();
2933 let df = df
2934 .unique_stable(None, UniqueKeepStrategy::First, None)
2935 .unwrap()
2936 .sort(["flt"], SortMultipleOptions::default())
2937 .unwrap();
2938 let valid = df! {
2939 "flt" => [1., 2., 3.],
2940 "int" => [1, 2, 3],
2941 "str" => ["a", "b", "c"]
2942 }
2943 .unwrap();
2944 assert!(df.equals(&valid));
2945 }
2946
2947 #[test]
2948 fn test_vstack() {
2949 let mut df = df! {
2951 "flt" => [1., 1., 2., 2., 3., 3.],
2952 "int" => [1, 1, 2, 2, 3, 3, ],
2953 "str" => ["a", "a", "b", "b", "c", "c"]
2954 }
2955 .unwrap();
2956
2957 df.vstack_mut(&df.slice(0, 3)).unwrap();
2958 assert_eq!(df.first_col_n_chunks(), 2)
2959 }
2960
2961 #[test]
2962 fn test_vstack_on_empty_dataframe() {
2963 let mut df = DataFrame::empty();
2964
2965 let df_data = df! {
2966 "flt" => [1., 1., 2., 2., 3., 3.],
2967 "int" => [1, 1, 2, 2, 3, 3, ],
2968 "str" => ["a", "a", "b", "b", "c", "c"]
2969 }
2970 .unwrap();
2971
2972 df.vstack_mut(&df_data).unwrap();
2973 assert_eq!(df.height(), 6)
2974 }
2975
2976 #[test]
2977 fn test_unique_keep_none_with_slice() {
2978 let df = df! {
2979 "x" => [1, 2, 3, 2, 1]
2980 }
2981 .unwrap();
2982 let out = df
2983 .unique_stable(
2984 Some(&["x".to_string()][..]),
2985 UniqueKeepStrategy::None,
2986 Some((0, 2)),
2987 )
2988 .unwrap();
2989 let expected = df! {
2990 "x" => [3]
2991 }
2992 .unwrap();
2993 assert!(out.equals(&expected));
2994 }
2995
2996 #[test]
2997 #[cfg(feature = "dtype-i8")]
2998 fn test_apply_result_schema() {
2999 let mut df = df! {
3000 "x" => [1, 2, 3, 2, 1]
3001 }
3002 .unwrap();
3003
3004 let schema_before = df.schema().clone();
3005 df.apply("x", |f| f.cast(&DataType::Int8).unwrap()).unwrap();
3006 assert_ne!(&schema_before, df.schema());
3007 }
3008}