1#![allow(unsafe_op_in_unsafe_fn)]
2use std::error::Error;
3
4use arrow::array::{Array, MutablePlString, StaticArray};
5use arrow::compute::utils::combine_validities_and;
6use polars_error::PolarsResult;
7use polars_utils::pl_str::PlSmallStr;
8
9use crate::chunked_array::flags::StatisticsFlags;
10use crate::datatypes::{ArrayCollectIterExt, ArrayFromIter};
11use crate::prelude::{ChunkedArray, CompatLevel, PolarsDataType, Series, StringChunked};
12use crate::utils::{align_chunks_binary, align_chunks_binary_owned, align_chunks_ternary};
13
/// Computes the output height of a binary operation whose operands may be
/// broadcast from length 1.
///
/// Expands to a `match` expression that evaluates to:
/// * `Ok(n)` when one operand has length 1 and the other has length `n`,
/// * `Ok(n)` when both operands have length `n`,
/// * `Err(polars_err!(ShapeMismatch: ...))` otherwise, with a message naming
///   `$op` and both operand expressions together with their lengths.
///
/// `$a` and `$b` only need a `.len()` method. Each operand expression is
/// evaluated exactly once, so expressions with side effects are safe to pass.
/// `polars_err!` is referenced unqualified and must be in scope at the call site.
#[macro_export]
macro_rules! binary_output_height {
    ($a:expr, $b:expr, op = $op:expr) => {
        match ($a.len(), $b.len()) {
            (len, 1) | (1, len) => Ok(len),
            (lhs_len, rhs_len) if lhs_len == rhs_len => Ok(lhs_len),
            // Reuse the lengths bound above rather than calling `.len()` again,
            // so the operands are not evaluated a second time on the error path.
            (lhs_len, rhs_len) => Err(polars_err!(
                ShapeMismatch:
                "{} got differing lengths \
                ({}: {}, {}: {})",
                $op,
                stringify!($a.len()), lhs_len,
                stringify!($b.len()), rhs_len,
            )),
        }
    }
}
31
/// Computes the output height of a ternary operation whose operands may be
/// broadcast from length 1.
///
/// Expands to a `match` expression that evaluates to:
/// * `Ok(n)` when two operands have length 1 and the third has length `n`,
/// * `Ok(n)` when one operand has length 1 and the other two share length `n`,
/// * `Ok(n)` when all three operands have length `n`,
/// * `Err(polars_err!(ShapeMismatch: ...))` otherwise, with a message naming
///   `$op` and all operand expressions together with their lengths.
///
/// `$a`, `$b` and `$c` only need a `.len()` method. Each operand expression is
/// evaluated exactly once, so expressions with side effects are safe to pass.
/// `polars_err!` is referenced unqualified and must be in scope at the call site.
#[macro_export]
macro_rules! ternary_output_height {
    ($a:expr, $b:expr, $c:expr, op = $op:expr) => {
        match ($a.len(), $b.len(), $c.len()) {
            (len, 1, 1) | (1, len, 1) | (1, 1, len) => Ok(len),
            (x, y, 1) | (x, 1, y) | (1, x, y) if x == y => Ok(x),
            (x, y, z) if x == y && y == z => Ok(x),
            // Reuse the lengths bound above rather than calling `.len()` again,
            // so the operands are not evaluated a second time on the error path.
            (x, y, z) => Err(polars_err!(
                ShapeMismatch:
                "{} got differing lengths \
                ({}: {}, {}: {}, {}: {})",
                $op,
                stringify!($a.len()), x,
                stringify!($b.len()), y,
                stringify!($c.len()), z,
            )),
        }
    }
}
51
/// Gives a name to the return type of a one-argument `FnMut`.
///
/// The associated type [`UnaryFnMut::Ret`] lets generic code mention the
/// closure's output in `where` clauses without an extra type parameter.
pub trait UnaryFnMut<Arg>: FnMut(Arg) -> Self::Ret {
    /// The type produced by calling the closure.
    type Ret;
}

/// Every `FnMut(Arg) -> Out` is a `UnaryFnMut<Arg>` with `Ret = Out`.
impl<Arg, Out, Func> UnaryFnMut<Arg> for Func
where
    Func: FnMut(Arg) -> Out,
{
    type Ret = Out;
}
61
/// Gives a name to the return type of a three-argument `FnMut`.
///
/// The associated type [`TernaryFnMut::Ret`] lets generic code mention the
/// closure's output in `where` clauses without an extra type parameter.
pub trait TernaryFnMut<Arg1, Arg2, Arg3>: FnMut(Arg1, Arg2, Arg3) -> Self::Ret {
    /// The type produced by calling the closure.
    type Ret;
}

/// Every `FnMut(Arg1, Arg2, Arg3) -> Out` is a `TernaryFnMut` with `Ret = Out`.
impl<Arg1, Arg2, Arg3, Out, Func> TernaryFnMut<Arg1, Arg2, Arg3> for Func
where
    Func: FnMut(Arg1, Arg2, Arg3) -> Out,
{
    type Ret = Out;
}
71
/// Gives a name to the return type of a two-argument `FnMut`.
///
/// The associated type [`BinaryFnMut::Ret`] lets generic code mention the
/// closure's output in `where` clauses without an extra type parameter.
pub trait BinaryFnMut<Arg1, Arg2>: FnMut(Arg1, Arg2) -> Self::Ret {
    /// The type produced by calling the closure.
    type Ret;
}

/// Every `FnMut(Arg1, Arg2) -> Out` is a `BinaryFnMut` with `Ret = Out`.
impl<Arg1, Arg2, Out, Func> BinaryFnMut<Arg1, Arg2> for Func
where
    Func: FnMut(Arg1, Arg2) -> Out,
{
    type Ret = Out;
}
81
82#[inline]
84pub fn unary_kernel<T, V, F, Arr>(ca: &ChunkedArray<T>, op: F) -> ChunkedArray<V>
85where
86 T: PolarsDataType,
87 V: PolarsDataType<Array = Arr>,
88 Arr: Array,
89 F: FnMut(&T::Array) -> Arr,
90{
91 let iter = ca.downcast_iter().map(op);
92 ChunkedArray::from_chunk_iter(ca.name().clone(), iter)
93}
94
95#[inline]
97pub fn unary_kernel_owned<T, V, F, Arr>(ca: ChunkedArray<T>, op: F) -> ChunkedArray<V>
98where
99 T: PolarsDataType,
100 V: PolarsDataType<Array = Arr>,
101 Arr: Array,
102 F: FnMut(T::Array) -> Arr,
103{
104 let name = ca.name().clone();
105 let iter = ca.downcast_into_iter().map(op);
106 ChunkedArray::from_chunk_iter(name, iter)
107}
108
109#[inline]
110pub fn unary_elementwise<'a, T, V, F>(ca: &'a ChunkedArray<T>, mut op: F) -> ChunkedArray<V>
111where
112 T: PolarsDataType,
113 V: PolarsDataType,
114 F: UnaryFnMut<Option<T::Physical<'a>>>,
115 V::Array: ArrayFromIter<<F as UnaryFnMut<Option<T::Physical<'a>>>>::Ret>,
116{
117 if ca.has_nulls() {
118 let iter = ca
119 .downcast_iter()
120 .map(|arr| arr.iter().map(&mut op).collect_arr());
121 ChunkedArray::from_chunk_iter(ca.name().clone(), iter)
122 } else {
123 let iter = ca
124 .downcast_iter()
125 .map(|arr| arr.values_iter().map(|x| op(Some(x))).collect_arr());
126 ChunkedArray::from_chunk_iter(ca.name().clone(), iter)
127 }
128}
129
130#[inline]
131pub fn try_unary_elementwise<'a, T, V, F, K, E>(
132 ca: &'a ChunkedArray<T>,
133 mut op: F,
134) -> Result<ChunkedArray<V>, E>
135where
136 T: PolarsDataType,
137 V: PolarsDataType,
138 F: FnMut(Option<T::Physical<'a>>) -> Result<Option<K>, E>,
139 V::Array: ArrayFromIter<Option<K>>,
140{
141 let iter = ca
142 .downcast_iter()
143 .map(|arr| arr.iter().map(&mut op).try_collect_arr());
144 ChunkedArray::try_from_chunk_iter(ca.name().clone(), iter)
145}
146
147#[inline]
148pub fn unary_elementwise_values<'a, T, V, F>(ca: &'a ChunkedArray<T>, mut op: F) -> ChunkedArray<V>
149where
150 T: PolarsDataType,
151 V: PolarsDataType,
152 F: UnaryFnMut<T::Physical<'a>>,
153 V::Array: ArrayFromIter<<F as UnaryFnMut<T::Physical<'a>>>::Ret>,
154{
155 if ca.null_count() == ca.len() {
156 let arr = V::Array::full_null(
157 ca.len(),
158 V::get_static_dtype().to_arrow(CompatLevel::newest()),
159 );
160 return ChunkedArray::with_chunk(ca.name().clone(), arr);
161 }
162
163 let iter = ca.downcast_iter().map(|arr| {
164 let validity = arr.validity().cloned();
165 let arr: V::Array = arr.values_iter().map(&mut op).collect_arr();
166 arr.with_validity_typed(validity)
167 });
168 ChunkedArray::from_chunk_iter(ca.name().clone(), iter)
169}
170
171#[inline]
172pub fn try_unary_elementwise_values<'a, T, V, F, K, E>(
173 ca: &'a ChunkedArray<T>,
174 mut op: F,
175) -> Result<ChunkedArray<V>, E>
176where
177 T: PolarsDataType,
178 V: PolarsDataType,
179 F: FnMut(T::Physical<'a>) -> Result<K, E>,
180 V::Array: ArrayFromIter<K>,
181{
182 if ca.null_count() == ca.len() {
183 let arr = V::Array::full_null(
184 ca.len(),
185 V::get_static_dtype().to_arrow(CompatLevel::newest()),
186 );
187 return Ok(ChunkedArray::with_chunk(ca.name().clone(), arr));
188 }
189
190 let iter = ca.downcast_iter().map(|arr| {
191 let validity = arr.validity().cloned();
192 let arr: V::Array = arr.values_iter().map(&mut op).try_collect_arr()?;
193 Ok(arr.with_validity_typed(validity))
194 });
195 ChunkedArray::try_from_chunk_iter(ca.name().clone(), iter)
196}
197
198#[inline]
203pub fn unary_mut_values<T, V, F, Arr>(ca: &ChunkedArray<T>, mut op: F) -> ChunkedArray<V>
204where
205 T: PolarsDataType,
206 V: PolarsDataType<Array = Arr>,
207 Arr: Array + StaticArray,
208 F: FnMut(&T::Array) -> Arr,
209{
210 let iter = ca
211 .downcast_iter()
212 .map(|arr| op(arr).with_validity_typed(arr.validity().cloned()));
213 ChunkedArray::from_chunk_iter(ca.name().clone(), iter)
214}
215
216#[inline]
218pub fn unary_mut_with_options<T, V, F, Arr>(ca: &ChunkedArray<T>, op: F) -> ChunkedArray<V>
219where
220 T: PolarsDataType,
221 V: PolarsDataType<Array = Arr>,
222 Arr: Array + StaticArray,
223 F: FnMut(&T::Array) -> Arr,
224{
225 ChunkedArray::from_chunk_iter(ca.name().clone(), ca.downcast_iter().map(op))
226}
227
228#[inline]
229pub fn try_unary_mut_with_options<T, V, F, Arr, E>(
230 ca: &ChunkedArray<T>,
231 op: F,
232) -> Result<ChunkedArray<V>, E>
233where
234 T: PolarsDataType,
235 V: PolarsDataType<Array = Arr>,
236 Arr: Array + StaticArray,
237 F: FnMut(&T::Array) -> Result<Arr, E>,
238 E: Error,
239{
240 ChunkedArray::try_from_chunk_iter(ca.name().clone(), ca.downcast_iter().map(op))
241}
242
243#[inline]
244pub fn binary_elementwise<T, U, V, F>(
245 lhs: &ChunkedArray<T>,
246 rhs: &ChunkedArray<U>,
247 mut op: F,
248) -> ChunkedArray<V>
249where
250 T: PolarsDataType,
251 U: PolarsDataType,
252 V: PolarsDataType,
253 F: for<'a> BinaryFnMut<Option<T::Physical<'a>>, Option<U::Physical<'a>>>,
254 V::Array: for<'a> ArrayFromIter<
255 <F as BinaryFnMut<Option<T::Physical<'a>>, Option<U::Physical<'a>>>>::Ret,
256 >,
257{
258 let (lhs, rhs) = align_chunks_binary(lhs, rhs);
259 let iter = lhs
260 .downcast_iter()
261 .zip(rhs.downcast_iter())
262 .map(|(lhs_arr, rhs_arr)| {
263 let element_iter = lhs_arr
264 .iter()
265 .zip(rhs_arr.iter())
266 .map(|(lhs_opt_val, rhs_opt_val)| op(lhs_opt_val, rhs_opt_val));
267 element_iter.collect_arr()
268 });
269 ChunkedArray::from_chunk_iter(lhs.name().clone(), iter)
270}
271
272#[inline]
273pub fn binary_elementwise_for_each<'a, 'b, T, U, F>(
274 lhs: &'a ChunkedArray<T>,
275 rhs: &'b ChunkedArray<U>,
276 mut op: F,
277) where
278 T: PolarsDataType,
279 U: PolarsDataType,
280 F: FnMut(Option<T::Physical<'a>>, Option<U::Physical<'b>>),
281{
282 let mut lhs_arr_iter = lhs.downcast_iter();
283 let mut rhs_arr_iter = rhs.downcast_iter();
284
285 let lhs_arr = lhs_arr_iter.next().unwrap();
286 let rhs_arr = rhs_arr_iter.next().unwrap();
287
288 let mut lhs_remaining = lhs_arr.len();
289 let mut rhs_remaining = rhs_arr.len();
290 let mut lhs_iter = lhs_arr.iter();
291 let mut rhs_iter = rhs_arr.iter();
292
293 loop {
294 let range = std::cmp::min(lhs_remaining, rhs_remaining);
295
296 for _ in 0..range {
297 let lhs_opt_val = unsafe { lhs_iter.next().unwrap_unchecked() };
299 let rhs_opt_val = unsafe { rhs_iter.next().unwrap_unchecked() };
300 op(lhs_opt_val, rhs_opt_val)
301 }
302 lhs_remaining -= range;
303 rhs_remaining -= range;
304
305 if lhs_remaining == 0 {
306 let Some(new_arr) = lhs_arr_iter.next() else {
307 return;
308 };
309 lhs_remaining = new_arr.len();
310 lhs_iter = new_arr.iter();
311 }
312 if rhs_remaining == 0 {
313 let Some(new_arr) = rhs_arr_iter.next() else {
314 return;
315 };
316 rhs_remaining = new_arr.len();
317 rhs_iter = new_arr.iter();
318 }
319 }
320}
321
322#[inline]
323pub fn try_binary_elementwise<T, U, V, F, K, E>(
324 lhs: &ChunkedArray<T>,
325 rhs: &ChunkedArray<U>,
326 mut op: F,
327) -> Result<ChunkedArray<V>, E>
328where
329 T: PolarsDataType,
330 U: PolarsDataType,
331 V: PolarsDataType,
332 F: for<'a> FnMut(Option<T::Physical<'a>>, Option<U::Physical<'a>>) -> Result<Option<K>, E>,
333 V::Array: ArrayFromIter<Option<K>>,
334{
335 let (lhs, rhs) = align_chunks_binary(lhs, rhs);
336 let iter = lhs
337 .downcast_iter()
338 .zip(rhs.downcast_iter())
339 .map(|(lhs_arr, rhs_arr)| {
340 let element_iter = lhs_arr
341 .iter()
342 .zip(rhs_arr.iter())
343 .map(|(lhs_opt_val, rhs_opt_val)| op(lhs_opt_val, rhs_opt_val));
344 element_iter.try_collect_arr()
345 });
346 ChunkedArray::try_from_chunk_iter(lhs.name().clone(), iter)
347}
348
349#[inline]
350pub fn binary_elementwise_values<T, U, V, F, K>(
351 lhs: &ChunkedArray<T>,
352 rhs: &ChunkedArray<U>,
353 mut op: F,
354) -> ChunkedArray<V>
355where
356 T: PolarsDataType,
357 U: PolarsDataType,
358 V: PolarsDataType,
359 F: for<'a> FnMut(T::Physical<'a>, U::Physical<'a>) -> K,
360 V::Array: ArrayFromIter<K>,
361{
362 if lhs.null_count() == lhs.len() || rhs.null_count() == rhs.len() {
363 let len = lhs.len().min(rhs.len());
364 let arr = V::Array::full_null(len, V::get_static_dtype().to_arrow(CompatLevel::newest()));
365
366 return ChunkedArray::with_chunk(lhs.name().clone(), arr);
367 }
368
369 let (lhs, rhs) = align_chunks_binary(lhs, rhs);
370
371 let iter = lhs
372 .downcast_iter()
373 .zip(rhs.downcast_iter())
374 .map(|(lhs_arr, rhs_arr)| {
375 let validity = combine_validities_and(lhs_arr.validity(), rhs_arr.validity());
376
377 let element_iter = lhs_arr
378 .values_iter()
379 .zip(rhs_arr.values_iter())
380 .map(|(lhs_val, rhs_val)| op(lhs_val, rhs_val));
381
382 let array: V::Array = element_iter.collect_arr();
383 array.with_validity_typed(validity)
384 });
385 ChunkedArray::from_chunk_iter(lhs.name().clone(), iter)
386}
387
388#[inline]
392pub fn binary_elementwise_into_string_amortized<T, U, F>(
393 lhs: &ChunkedArray<T>,
394 rhs: &ChunkedArray<U>,
395 mut op: F,
396) -> StringChunked
397where
398 T: PolarsDataType,
399 U: PolarsDataType,
400 F: for<'a> FnMut(T::Physical<'a>, U::Physical<'a>, &mut String),
401{
402 let (lhs, rhs) = align_chunks_binary(lhs, rhs);
403 let mut buf = String::new();
404 let iter = lhs
405 .downcast_iter()
406 .zip(rhs.downcast_iter())
407 .map(|(lhs_arr, rhs_arr)| {
408 let mut mutarr = MutablePlString::with_capacity(lhs_arr.len());
409 lhs_arr
410 .iter()
411 .zip(rhs_arr.iter())
412 .for_each(|(lhs_opt, rhs_opt)| match (lhs_opt, rhs_opt) {
413 (None, _) | (_, None) => mutarr.push_null(),
414 (Some(lhs_val), Some(rhs_val)) => {
415 buf.clear();
416 op(lhs_val, rhs_val, &mut buf);
417 mutarr.push_value(&buf)
418 },
419 });
420 mutarr.freeze()
421 });
422 ChunkedArray::from_chunk_iter(lhs.name().clone(), iter)
423}
424
425#[inline]
430pub fn binary_mut_values<T, U, V, F, Arr>(
431 lhs: &ChunkedArray<T>,
432 rhs: &ChunkedArray<U>,
433 mut op: F,
434 name: PlSmallStr,
435) -> ChunkedArray<V>
436where
437 T: PolarsDataType,
438 U: PolarsDataType,
439 V: PolarsDataType<Array = Arr>,
440 Arr: Array + StaticArray,
441 F: FnMut(&T::Array, &U::Array) -> Arr,
442{
443 let (lhs, rhs) = align_chunks_binary(lhs, rhs);
444 let iter = lhs
445 .downcast_iter()
446 .zip(rhs.downcast_iter())
447 .map(|(lhs_arr, rhs_arr)| {
448 let ret = op(lhs_arr, rhs_arr);
449 let inp_val = combine_validities_and(lhs_arr.validity(), rhs_arr.validity());
450 let val = combine_validities_and(inp_val.as_ref(), ret.validity());
451 ret.with_validity_typed(val)
452 });
453 ChunkedArray::from_chunk_iter(name, iter)
454}
455
456#[inline]
458pub fn binary_mut_with_options<T, U, V, F, Arr>(
459 lhs: &ChunkedArray<T>,
460 rhs: &ChunkedArray<U>,
461 mut op: F,
462 name: PlSmallStr,
463) -> ChunkedArray<V>
464where
465 T: PolarsDataType,
466 U: PolarsDataType,
467 V: PolarsDataType<Array = Arr>,
468 Arr: Array,
469 F: FnMut(&T::Array, &U::Array) -> Arr,
470{
471 let (lhs, rhs) = align_chunks_binary(lhs, rhs);
472 let iter = lhs
473 .downcast_iter()
474 .zip(rhs.downcast_iter())
475 .map(|(lhs_arr, rhs_arr)| op(lhs_arr, rhs_arr));
476 ChunkedArray::from_chunk_iter(name, iter)
477}
478
479#[inline]
480pub fn try_binary_mut_with_options<T, U, V, F, Arr, E>(
481 lhs: &ChunkedArray<T>,
482 rhs: &ChunkedArray<U>,
483 mut op: F,
484 name: PlSmallStr,
485) -> Result<ChunkedArray<V>, E>
486where
487 T: PolarsDataType,
488 U: PolarsDataType,
489 V: PolarsDataType<Array = Arr>,
490 Arr: Array,
491 F: FnMut(&T::Array, &U::Array) -> Result<Arr, E>,
492 E: Error,
493{
494 let (lhs, rhs) = align_chunks_binary(lhs, rhs);
495 let iter = lhs
496 .downcast_iter()
497 .zip(rhs.downcast_iter())
498 .map(|(lhs_arr, rhs_arr)| op(lhs_arr, rhs_arr));
499 ChunkedArray::try_from_chunk_iter(name, iter)
500}
501
502pub fn binary<T, U, V, F, Arr>(
504 lhs: &ChunkedArray<T>,
505 rhs: &ChunkedArray<U>,
506 op: F,
507) -> ChunkedArray<V>
508where
509 T: PolarsDataType,
510 U: PolarsDataType,
511 V: PolarsDataType<Array = Arr>,
512 Arr: Array,
513 F: FnMut(&T::Array, &U::Array) -> Arr,
514{
515 binary_mut_with_options(lhs, rhs, op, lhs.name().clone())
516}
517
518pub fn binary_owned<L, R, V, F, Arr>(
520 lhs: ChunkedArray<L>,
521 rhs: ChunkedArray<R>,
522 mut op: F,
523) -> ChunkedArray<V>
524where
525 L: PolarsDataType,
526 R: PolarsDataType,
527 V: PolarsDataType<Array = Arr>,
528 Arr: Array,
529 F: FnMut(L::Array, R::Array) -> Arr,
530{
531 let name = lhs.name().clone();
532 let (lhs, rhs) = align_chunks_binary_owned(lhs, rhs);
533 let iter = lhs
534 .downcast_into_iter()
535 .zip(rhs.downcast_into_iter())
536 .map(|(lhs_arr, rhs_arr)| op(lhs_arr, rhs_arr));
537 ChunkedArray::from_chunk_iter(name, iter)
538}
539
540pub fn try_binary<T, U, V, F, Arr, E>(
542 lhs: &ChunkedArray<T>,
543 rhs: &ChunkedArray<U>,
544 mut op: F,
545) -> Result<ChunkedArray<V>, E>
546where
547 T: PolarsDataType,
548 U: PolarsDataType,
549 V: PolarsDataType<Array = Arr>,
550 Arr: Array,
551 F: FnMut(&T::Array, &U::Array) -> Result<Arr, E>,
552 E: Error,
553{
554 let (lhs, rhs) = align_chunks_binary(lhs, rhs);
555 let iter = lhs
556 .downcast_iter()
557 .zip(rhs.downcast_iter())
558 .map(|(lhs_arr, rhs_arr)| op(lhs_arr, rhs_arr));
559 ChunkedArray::try_from_chunk_iter(lhs.name().clone(), iter)
560}
561
562#[inline]
567pub unsafe fn binary_unchecked_same_type<T, U, F>(
568 lhs: &ChunkedArray<T>,
569 rhs: &ChunkedArray<U>,
570 mut op: F,
571 keep_sorted: bool,
572 keep_fast_explode: bool,
573) -> ChunkedArray<T>
574where
575 T: PolarsDataType,
576 U: PolarsDataType,
577 F: FnMut(&T::Array, &U::Array) -> Box<dyn Array>,
578{
579 let (lhs, rhs) = align_chunks_binary(lhs, rhs);
580 let chunks = lhs
581 .downcast_iter()
582 .zip(rhs.downcast_iter())
583 .map(|(lhs_arr, rhs_arr)| op(lhs_arr, rhs_arr))
584 .collect();
585
586 let mut ca = lhs.copy_with_chunks(chunks);
587
588 let mut retain_flags = StatisticsFlags::empty();
589 use StatisticsFlags as F;
590 retain_flags.set(F::IS_SORTED_ANY, keep_sorted);
591 retain_flags.set(F::CAN_FAST_EXPLODE_LIST, keep_fast_explode);
592 ca.retain_flags_from(lhs.as_ref(), retain_flags);
593
594 ca
595}
596
597pub fn try_unary_to_series<T, F>(ca: &ChunkedArray<T>, op: F) -> PolarsResult<Series>
598where
599 T: PolarsDataType,
600 F: FnMut(&T::Array) -> PolarsResult<Box<dyn Array>>,
601{
602 let chunks = ca
603 .downcast_iter()
604 .map(op)
605 .collect::<PolarsResult<Vec<_>>>()?;
606 Series::try_from((ca.name().clone(), chunks))
607}
608
609pub fn binary_to_series<T, U, F>(
610 lhs: &ChunkedArray<T>,
611 rhs: &ChunkedArray<U>,
612 mut op: F,
613) -> PolarsResult<Series>
614where
615 T: PolarsDataType,
616 U: PolarsDataType,
617 F: FnMut(&T::Array, &U::Array) -> Box<dyn Array>,
618{
619 let (lhs, rhs) = align_chunks_binary(lhs, rhs);
620 let chunks = lhs
621 .downcast_iter()
622 .zip(rhs.downcast_iter())
623 .map(|(lhs_arr, rhs_arr)| op(lhs_arr, rhs_arr))
624 .collect::<Vec<_>>();
625 Series::try_from((lhs.name().clone(), chunks))
626}
627
628pub fn try_binary_to_series<T, U, F>(
629 lhs: &ChunkedArray<T>,
630 rhs: &ChunkedArray<U>,
631 mut op: F,
632) -> PolarsResult<Series>
633where
634 T: PolarsDataType,
635 U: PolarsDataType,
636 F: FnMut(&T::Array, &U::Array) -> PolarsResult<Box<dyn Array>>,
637{
638 let (lhs, rhs) = align_chunks_binary(lhs, rhs);
639 let chunks = lhs
640 .downcast_iter()
641 .zip(rhs.downcast_iter())
642 .map(|(lhs_arr, rhs_arr)| op(lhs_arr, rhs_arr))
643 .collect::<PolarsResult<Vec<_>>>()?;
644 Series::try_from((lhs.name().clone(), chunks))
645}
646
647#[inline]
652pub unsafe fn try_binary_unchecked_same_type<T, U, F, E>(
653 lhs: &ChunkedArray<T>,
654 rhs: &ChunkedArray<U>,
655 mut op: F,
656 keep_sorted: bool,
657 keep_fast_explode: bool,
658) -> Result<ChunkedArray<T>, E>
659where
660 T: PolarsDataType,
661 U: PolarsDataType,
662 F: FnMut(&T::Array, &U::Array) -> Result<Box<dyn Array>, E>,
663 E: Error,
664{
665 let (lhs, rhs) = align_chunks_binary(lhs, rhs);
666 let chunks = lhs
667 .downcast_iter()
668 .zip(rhs.downcast_iter())
669 .map(|(lhs_arr, rhs_arr)| op(lhs_arr, rhs_arr))
670 .collect::<Result<Vec<_>, E>>()?;
671 let mut ca = lhs.copy_with_chunks(chunks);
672
673 let mut retain_flags = StatisticsFlags::empty();
674 use StatisticsFlags as F;
675 retain_flags.set(F::IS_SORTED_ANY, keep_sorted);
676 retain_flags.set(F::CAN_FAST_EXPLODE_LIST, keep_fast_explode);
677 ca.retain_flags_from(lhs.as_ref(), retain_flags);
678
679 Ok(ca)
680}
681
682#[inline]
683pub fn try_ternary_elementwise<T, U, V, G, F, K, E>(
684 ca1: &ChunkedArray<T>,
685 ca2: &ChunkedArray<U>,
686 ca3: &ChunkedArray<G>,
687 mut op: F,
688) -> Result<ChunkedArray<V>, E>
689where
690 T: PolarsDataType,
691 U: PolarsDataType,
692 V: PolarsDataType,
693 G: PolarsDataType,
694 F: for<'a> FnMut(
695 Option<T::Physical<'a>>,
696 Option<U::Physical<'a>>,
697 Option<G::Physical<'a>>,
698 ) -> Result<Option<K>, E>,
699 V::Array: ArrayFromIter<Option<K>>,
700{
701 let (ca1, ca2, ca3) = align_chunks_ternary(ca1, ca2, ca3);
702 let iter = ca1
703 .downcast_iter()
704 .zip(ca2.downcast_iter())
705 .zip(ca3.downcast_iter())
706 .map(|((ca1_arr, ca2_arr), ca3_arr)| {
707 let element_iter = ca1_arr.iter().zip(ca2_arr.iter()).zip(ca3_arr.iter()).map(
708 |((ca1_opt_val, ca2_opt_val), ca3_opt_val)| {
709 op(ca1_opt_val, ca2_opt_val, ca3_opt_val)
710 },
711 );
712 element_iter.try_collect_arr()
713 });
714 ChunkedArray::try_from_chunk_iter(ca1.name().clone(), iter)
715}
716
717#[inline]
718pub fn ternary_elementwise<T, U, V, G, F>(
719 ca1: &ChunkedArray<T>,
720 ca2: &ChunkedArray<U>,
721 ca3: &ChunkedArray<G>,
722 mut op: F,
723) -> ChunkedArray<V>
724where
725 T: PolarsDataType,
726 U: PolarsDataType,
727 G: PolarsDataType,
728 V: PolarsDataType,
729 F: for<'a> TernaryFnMut<
730 Option<T::Physical<'a>>,
731 Option<U::Physical<'a>>,
732 Option<G::Physical<'a>>,
733 >,
734 V::Array: for<'a> ArrayFromIter<
735 <F as TernaryFnMut<
736 Option<T::Physical<'a>>,
737 Option<U::Physical<'a>>,
738 Option<G::Physical<'a>>,
739 >>::Ret,
740 >,
741{
742 let (ca1, ca2, ca3) = align_chunks_ternary(ca1, ca2, ca3);
743 let iter = ca1
744 .downcast_iter()
745 .zip(ca2.downcast_iter())
746 .zip(ca3.downcast_iter())
747 .map(|((ca1_arr, ca2_arr), ca3_arr)| {
748 let element_iter = ca1_arr.iter().zip(ca2_arr.iter()).zip(ca3_arr.iter()).map(
749 |((ca1_opt_val, ca2_opt_val), ca3_opt_val)| {
750 op(ca1_opt_val, ca2_opt_val, ca3_opt_val)
751 },
752 );
753 element_iter.collect_arr()
754 });
755 ChunkedArray::from_chunk_iter(ca1.name().clone(), iter)
756}
757
758pub fn broadcast_binary_elementwise<T, U, V, F>(
759 lhs: &ChunkedArray<T>,
760 rhs: &ChunkedArray<U>,
761 mut op: F,
762) -> ChunkedArray<V>
763where
764 T: PolarsDataType,
765 U: PolarsDataType,
766 V: PolarsDataType,
767 F: for<'a> BinaryFnMut<Option<T::Physical<'a>>, Option<U::Physical<'a>>>,
768 V::Array: for<'a> ArrayFromIter<
769 <F as BinaryFnMut<Option<T::Physical<'a>>, Option<U::Physical<'a>>>>::Ret,
770 >,
771{
772 match (lhs.len(), rhs.len()) {
773 (1, _) => {
774 let a = unsafe { lhs.get_unchecked(0) };
775 unary_elementwise(rhs, |b| op(a.clone(), b)).with_name(lhs.name().clone())
776 },
777 (_, 1) => {
778 let b = unsafe { rhs.get_unchecked(0) };
779 unary_elementwise(lhs, |a| op(a, b.clone()))
780 },
781 _ => binary_elementwise(lhs, rhs, op),
782 }
783}
784
785pub fn broadcast_try_binary_elementwise<T, U, V, F, K, E>(
786 lhs: &ChunkedArray<T>,
787 rhs: &ChunkedArray<U>,
788 mut op: F,
789) -> Result<ChunkedArray<V>, E>
790where
791 T: PolarsDataType,
792 U: PolarsDataType,
793 V: PolarsDataType,
794 F: for<'a> FnMut(Option<T::Physical<'a>>, Option<U::Physical<'a>>) -> Result<Option<K>, E>,
795 V::Array: ArrayFromIter<Option<K>>,
796{
797 match (lhs.len(), rhs.len()) {
798 (1, _) => {
799 let a = unsafe { lhs.get_unchecked(0) };
800 Ok(try_unary_elementwise(rhs, |b| op(a.clone(), b))?.with_name(lhs.name().clone()))
801 },
802 (_, 1) => {
803 let b = unsafe { rhs.get_unchecked(0) };
804 try_unary_elementwise(lhs, |a| op(a, b.clone()))
805 },
806 _ => try_binary_elementwise(lhs, rhs, op),
807 }
808}
809
810pub fn broadcast_binary_elementwise_values<T, U, V, F, K>(
811 lhs: &ChunkedArray<T>,
812 rhs: &ChunkedArray<U>,
813 mut op: F,
814) -> ChunkedArray<V>
815where
816 T: PolarsDataType,
817 U: PolarsDataType,
818 V: PolarsDataType,
819 F: for<'a> FnMut(T::Physical<'a>, U::Physical<'a>) -> K,
820 V::Array: ArrayFromIter<K>,
821{
822 if lhs.null_count() == lhs.len() || rhs.null_count() == rhs.len() {
823 let min = lhs.len().min(rhs.len());
824 let max = lhs.len().max(rhs.len());
825 let len = if min == 1 { max } else { min };
826 let arr = V::Array::full_null(len, V::get_static_dtype().to_arrow(CompatLevel::newest()));
827
828 return ChunkedArray::with_chunk(lhs.name().clone(), arr);
829 }
830
831 match (lhs.len(), rhs.len()) {
832 (1, _) => {
833 let a = unsafe { lhs.value_unchecked(0) };
834 unary_elementwise_values(rhs, |b| op(a.clone(), b)).with_name(lhs.name().clone())
835 },
836 (_, 1) => {
837 let b = unsafe { rhs.value_unchecked(0) };
838 unary_elementwise_values(lhs, |a| op(a, b.clone()))
839 },
840 _ => binary_elementwise_values(lhs, rhs, op),
841 }
842}
843
844pub fn apply_binary_kernel_broadcast<'l, 'r, L, R, O, K, LK, RK>(
845 lhs: &'l ChunkedArray<L>,
846 rhs: &'r ChunkedArray<R>,
847 kernel: K,
848 lhs_broadcast_kernel: LK,
849 rhs_broadcast_kernel: RK,
850) -> ChunkedArray<O>
851where
852 L: PolarsDataType,
853 R: PolarsDataType,
854 O: PolarsDataType,
855 K: Fn(&L::Array, &R::Array) -> O::Array,
856 LK: Fn(L::Physical<'l>, &R::Array) -> O::Array,
857 RK: Fn(&L::Array, R::Physical<'r>) -> O::Array,
858{
859 let name = lhs.name();
860 let out = match (lhs.len(), rhs.len()) {
861 (a, b) if a == b => binary(lhs, rhs, |lhs, rhs| kernel(lhs, rhs)),
862 (_, 1) => {
864 let opt_rhs = rhs.get(0);
865 match opt_rhs {
866 None => {
867 let arr = O::Array::full_null(
868 lhs.len(),
869 O::get_static_dtype().to_arrow(CompatLevel::newest()),
870 );
871 ChunkedArray::<O>::with_chunk(lhs.name().clone(), arr)
872 },
873 Some(rhs) => unary_kernel(lhs, |arr| rhs_broadcast_kernel(arr, rhs.clone())),
874 }
875 },
876 (1, _) => {
877 let opt_lhs = lhs.get(0);
878 match opt_lhs {
879 None => {
880 let arr = O::Array::full_null(
881 rhs.len(),
882 O::get_static_dtype().to_arrow(CompatLevel::newest()),
883 );
884 ChunkedArray::<O>::with_chunk(lhs.name().clone(), arr)
885 },
886 Some(lhs) => unary_kernel(rhs, |arr| lhs_broadcast_kernel(lhs.clone(), arr)),
887 }
888 },
889 _ => panic!("Cannot apply operation on arrays of different lengths"),
890 };
891 out.with_name(name.clone())
892}
893
894pub fn apply_binary_kernel_broadcast_owned<L, R, O, K, LK, RK>(
895 lhs: ChunkedArray<L>,
896 rhs: ChunkedArray<R>,
897 kernel: K,
898 lhs_broadcast_kernel: LK,
899 rhs_broadcast_kernel: RK,
900) -> ChunkedArray<O>
901where
902 L: PolarsDataType,
903 R: PolarsDataType,
904 O: PolarsDataType,
905 K: Fn(L::Array, R::Array) -> O::Array,
906 for<'a> LK: Fn(L::Physical<'a>, R::Array) -> O::Array,
907 for<'a> RK: Fn(L::Array, R::Physical<'a>) -> O::Array,
908{
909 let name = lhs.name().to_owned();
910 let out = match (lhs.len(), rhs.len()) {
911 (a, b) if a == b => binary_owned(lhs, rhs, kernel),
912 (_, 1) => {
914 let opt_rhs = rhs.get(0);
915 match opt_rhs {
916 None => {
917 let arr = O::Array::full_null(
918 lhs.len(),
919 O::get_static_dtype().to_arrow(CompatLevel::newest()),
920 );
921 ChunkedArray::<O>::with_chunk(lhs.name().clone(), arr)
922 },
923 Some(rhs) => unary_kernel_owned(lhs, |arr| rhs_broadcast_kernel(arr, rhs.clone())),
924 }
925 },
926 (1, _) => {
927 let opt_lhs = lhs.get(0);
928 match opt_lhs {
929 None => {
930 let arr = O::Array::full_null(
931 rhs.len(),
932 O::get_static_dtype().to_arrow(CompatLevel::newest()),
933 );
934 ChunkedArray::<O>::with_chunk(lhs.name().clone(), arr)
935 },
936 Some(lhs) => unary_kernel_owned(rhs, |arr| lhs_broadcast_kernel(lhs.clone(), arr)),
937 }
938 },
939 _ => panic!("Cannot apply operation on arrays of different lengths"),
940 };
941 out.with_name(name)
942}