use std::iter::Map;
use std::sync::Arc;
use arrow::array::*;
use arrow::bitmap::Bitmap;
use polars_compute::filter::filter_with_bitmap;
use crate::prelude::*;
pub mod ops;
#[macro_use]
pub mod arithmetic;
pub mod builder;
pub mod cast;
pub mod collect;
pub mod comparison;
pub mod float;
pub mod iterator;
pub mod metadata;
#[cfg(feature = "ndarray")]
pub(crate) mod ndarray;
#[cfg(feature = "dtype-array")]
pub(crate) mod array;
mod binary;
mod bitwise;
#[cfg(feature = "object")]
mod drop;
mod from;
mod from_iterator;
pub mod from_iterator_par;
pub(crate) mod list;
pub(crate) mod logical;
#[cfg(feature = "object")]
pub mod object;
#[cfg(feature = "random")]
mod random;
#[cfg(any(
    feature = "temporal",
    feature = "dtype-datetime",
    feature = "dtype-date"
))]
pub mod temporal;
mod to_vec;
mod trusted_len;
use std::mem;
use std::slice::Iter;
use arrow::legacy::kernels::concatenate::concatenate_owned_unchecked;
use arrow::legacy::prelude::*;
use self::metadata::{Metadata, MetadataFlags, MetadataMerge, MetadataProperties};
use crate::series::IsSorted;
use crate::utils::{first_non_null, last_non_null};
#[cfg(not(feature = "dtype-categorical"))]
pub struct RevMapping {}
pub type ChunkLenIter<'a> = std::iter::Map<std::slice::Iter<'a, ArrayRef>, fn(&ArrayRef) -> usize>;
pub struct ChunkedArray<T: PolarsDataType> {
    pub(crate) field: Arc<Field>,
    pub(crate) chunks: Vec<ArrayRef>,
    pub(crate) md: Option<Arc<Metadata<T>>>,
    length: IdxSize,
    null_count: IdxSize,
}
impl<T: PolarsDataType> ChunkedArray<T> {
    pub fn new_with_compute_len(field: Arc<Field>, chunks: Vec<ArrayRef>) -> Self {
        unsafe {
            let mut chunked_arr = Self::new_with_dims(field, chunks, 0, 0);
            chunked_arr.compute_len();
            chunked_arr
        }
    }
    pub unsafe fn new_with_dims(
        field: Arc<Field>,
        chunks: Vec<ArrayRef>,
        length: IdxSize,
        null_count: IdxSize,
    ) -> Self {
        Self {
            field,
            chunks,
            md: None,
            length,
            null_count,
        }
    }
    #[inline(always)]
    pub fn effective_metadata(&self) -> &Metadata<T> {
        self.md.as_ref().map_or(&Metadata::DEFAULT, AsRef::as_ref)
    }
    pub fn metadata(&self) -> Option<&Metadata<T>> {
        self.md.as_ref().map(AsRef::as_ref)
    }
    pub fn metadata_arc(&self) -> Option<&Arc<Metadata<T>>> {
        self.md.as_ref()
    }
    pub fn metadata_owned_arc(&self) -> Arc<Metadata<T>> {
        self.md
            .as_ref()
            .map_or_else(|| Arc::new(Metadata::DEFAULT), Clone::clone)
    }
    pub fn metadata_mut(&mut self) -> &mut Arc<Metadata<T>> {
        self.md.get_or_insert_with(Default::default)
    }
    pub(crate) fn is_sorted_ascending_flag(&self) -> bool {
        self.effective_metadata().is_sorted_ascending()
    }
    pub(crate) fn is_sorted_descending_flag(&self) -> bool {
        self.effective_metadata().is_sorted_descending()
    }
    pub(crate) fn is_sorted_any(&self) -> bool {
        self.is_sorted_ascending_flag() || self.is_sorted_descending_flag()
    }
    pub fn unset_fast_explode_list(&mut self) {
        self.set_fast_explode_list(false)
    }
    pub fn set_fast_explode_list(&mut self, value: bool) {
        Arc::make_mut(self.metadata_mut()).set_fast_explode_list(value)
    }
    pub fn get_fast_explode_list(&self) -> bool {
        self.get_flags().get_fast_explode_list()
    }
    pub fn get_flags(&self) -> MetadataFlags {
        self.effective_metadata().get_flags()
    }
    pub(crate) fn set_flags(&mut self, flags: MetadataFlags) {
        let md = Arc::make_mut(self.metadata_mut());
        md.set_flags(flags);
    }
    pub fn is_sorted_flag(&self) -> IsSorted {
        self.md.as_ref().map_or(IsSorted::Not, |md| md.is_sorted())
    }
    pub fn set_sorted_flag(&mut self, sorted: IsSorted) {
        Arc::make_mut(self.metadata_mut()).set_sorted_flag(sorted)
    }
    pub fn with_sorted_flag(&self, sorted: IsSorted) -> Self {
        let mut out = self.clone();
        out.set_sorted_flag(sorted);
        out
    }
    pub fn get_min_value(&self) -> Option<&T::OwnedPhysical> {
        self.md.as_ref()?.get_min_value()
    }
    pub fn get_max_value(&self) -> Option<&T::OwnedPhysical> {
        self.md.as_ref()?.get_max_value()
    }
    pub fn get_distinct_count(&self) -> Option<IdxSize> {
        self.md.as_ref()?.get_distinct_count()
    }
    pub fn merge_metadata(&mut self, md: Metadata<T>) {
        let Some(self_md) = self.metadata() else {
            self.md = Some(Arc::new(md));
            return;
        };
        match self_md.merge(md) {
            MetadataMerge::Keep => {},
            MetadataMerge::New(md) => self.md = Some(Arc::new(md)),
            MetadataMerge::Conflict => {
                panic!("Trying to merge metadata, but got conflicting information")
            },
        }
    }
    #[inline(always)]
    pub fn copy_metadata_cast<O: PolarsDataType>(
        &mut self,
        other: &ChunkedArray<O>,
        props: MetadataProperties,
    ) {
        use MetadataProperties as P;
        debug_assert!(
            {
                props
                    - (P::SORTED
                        | P::FAST_EXPLODE_LIST
                        | P::MIN_VALUE
                        | P::MAX_VALUE
                        | P::DISTINCT_COUNT)
            }
            .is_empty(),
            "A MetadataProperty was not added to the copy_metadata_cast check"
        );
        debug_assert!(!props.contains(P::MIN_VALUE));
        debug_assert!(!props.contains(P::MAX_VALUE));
        if props.is_empty() {
            return;
        }
        let Some(other_md) = other.metadata() else {
            return;
        };
        if other.is_empty() {
            return;
        }
        let other_md = other_md.filter_props_cast(props);
        self.merge_metadata(other_md);
    }
    #[inline(always)]
    pub fn copy_metadata(&mut self, other: &Self, props: MetadataProperties) {
        use MetadataProperties as P;
        debug_assert!(
            {
                props
                    - (P::SORTED
                        | P::FAST_EXPLODE_LIST
                        | P::MIN_VALUE
                        | P::MAX_VALUE
                        | P::DISTINCT_COUNT)
            }
            .is_empty(),
            "A MetadataProperty was not added to the copy_metadata check"
        );
        if props.is_empty() {
            return;
        }
        let Some(other_md) = other.metadata() else {
            return;
        };
        if other.is_empty() {
            return;
        }
        let other_md = other_md.filter_props(props);
        self.merge_metadata(other_md);
    }
    pub fn first_non_null(&self) -> Option<usize> {
        if self.null_count() == self.len() {
            None
        }
        else if self.null_count() == 0 {
            Some(0)
        } else if self.is_sorted_any() {
            let out = if unsafe { self.downcast_get_unchecked(0).is_null_unchecked(0) } {
                self.null_count()
            } else {
                0
            };
            debug_assert!(
                unsafe { self.get_unchecked(out) }.is_some(),
                "incorrect sorted flag"
            );
            Some(out)
        } else {
            first_non_null(self.iter_validities())
        }
    }
    pub fn last_non_null(&self) -> Option<usize> {
        if self.null_count() == self.len() {
            None
        }
        else if self.null_count() == 0 {
            Some(self.len() - 1)
        } else if self.is_sorted_any() {
            let out = if unsafe { self.downcast_get_unchecked(0).is_null_unchecked(0) } {
                self.len() - 1
            } else {
                self.len() - self.null_count() - 1
            };
            debug_assert!(
                unsafe { self.get_unchecked(out) }.is_some(),
                "incorrect sorted flag"
            );
            Some(out)
        } else {
            last_non_null(self.iter_validities(), self.len())
        }
    }
    pub fn drop_nulls(&self) -> Self {
        if self.null_count() == 0 {
            self.clone()
        } else {
            let chunks = self
                .downcast_iter()
                .map(|arr| {
                    if arr.null_count() == 0 {
                        arr.to_boxed()
                    } else {
                        filter_with_bitmap(arr, arr.validity().unwrap())
                    }
                })
                .collect();
            unsafe {
                Self::new_with_dims(
                    self.field.clone(),
                    chunks,
                    (self.len() - self.null_count()) as IdxSize,
                    0,
                )
            }
        }
    }
    #[inline]
    #[allow(clippy::type_complexity)]
    pub fn iter_validities(&self) -> Map<Iter<'_, ArrayRef>, fn(&ArrayRef) -> Option<&Bitmap>> {
        fn to_validity(arr: &ArrayRef) -> Option<&Bitmap> {
            arr.validity()
        }
        self.chunks.iter().map(to_validity)
    }
    #[inline]
    pub fn has_validity(&self) -> bool {
        self.iter_validities().any(|valid| valid.is_some())
    }
    pub fn shrink_to_fit(&mut self) {
        self.chunks = vec![concatenate_owned_unchecked(self.chunks.as_slice()).unwrap()];
    }
    pub fn clear(&self) -> Self {
        let mut ca = unsafe {
            self.copy_with_chunks(vec![new_empty_array(
                self.chunks.first().unwrap().data_type().clone(),
            )])
        };
        use MetadataProperties as P;
        ca.copy_metadata(self, P::SORTED | P::FAST_EXPLODE_LIST);
        ca
    }
    pub(crate) unsafe fn unpack_series_matching_physical_type(
        &self,
        series: &Series,
    ) -> &ChunkedArray<T> {
        let series_trait = &**series;
        if self.dtype() == series.dtype() {
            &*(series_trait as *const dyn SeriesTrait as *const ChunkedArray<T>)
        } else {
            use DataType::*;
            match (self.dtype(), series.dtype()) {
                (Int64, Datetime(_, _)) | (Int64, Duration(_)) | (Int32, Date) => {
                    &*(series_trait as *const dyn SeriesTrait as *const ChunkedArray<T>)
                },
                _ => panic!(
                    "cannot unpack series {:?} into matching type {:?}",
                    series,
                    self.dtype()
                ),
            }
        }
    }
    pub fn unpack_series_matching_type(&self, series: &Series) -> PolarsResult<&ChunkedArray<T>> {
        polars_ensure!(
            self.dtype() == series.dtype(),
            SchemaMismatch: "cannot unpack series of type `{}` into `{}`",
            series.dtype(),
            self.dtype(),
        );
        Ok(unsafe { self.unpack_series_matching_physical_type(series) })
    }
    pub fn chunk_lengths(&self) -> ChunkLenIter {
        self.chunks.iter().map(|chunk| chunk.len())
    }
    #[inline]
    pub fn chunks(&self) -> &Vec<ArrayRef> {
        &self.chunks
    }
    #[inline]
    pub unsafe fn chunks_mut(&mut self) -> &mut Vec<ArrayRef> {
        &mut self.chunks
    }
    pub fn is_optimal_aligned(&self) -> bool {
        self.chunks.len() == 1 && self.null_count() == 0
    }
    unsafe fn copy_with_chunks(&self, chunks: Vec<ArrayRef>) -> Self {
        Self::new_with_compute_len(self.field.clone(), chunks)
    }
    pub fn dtype(&self) -> &DataType {
        self.field.data_type()
    }
    pub(crate) unsafe fn set_dtype(&mut self, dtype: DataType) {
        self.field = Arc::new(Field::new(self.name(), dtype))
    }
    pub fn name(&self) -> &str {
        self.field.name()
    }
    pub fn ref_field(&self) -> &Field {
        &self.field
    }
    pub fn rename(&mut self, name: &str) {
        self.field = Arc::new(Field::new(name, self.field.data_type().clone()))
    }
    pub fn with_name(mut self, name: &str) -> Self {
        self.rename(name);
        self
    }
}
impl<T> ChunkedArray<T>
where
    T: PolarsDataType,
{
    #[inline]
    pub fn get(&self, idx: usize) -> Option<T::Physical<'_>> {
        let (chunk_idx, arr_idx) = self.index_to_chunked_index(idx);
        assert!(
            chunk_idx < self.chunks().len(),
            "index: {} out of bounds for len: {}",
            idx,
            self.len()
        );
        unsafe {
            let arr = self.downcast_get_unchecked(chunk_idx);
            assert!(
                arr_idx < arr.len(),
                "index: {} out of bounds for len: {}",
                idx,
                self.len()
            );
            arr.get_unchecked(arr_idx)
        }
    }
    #[inline]
    pub unsafe fn get_unchecked(&self, idx: usize) -> Option<T::Physical<'_>> {
        let (chunk_idx, arr_idx) = self.index_to_chunked_index(idx);
        unsafe {
            self.downcast_get_unchecked(chunk_idx)
                .get_unchecked(arr_idx)
        }
    }
    #[inline]
    pub unsafe fn value_unchecked(&self, idx: usize) -> T::Physical<'_> {
        let (chunk_idx, arr_idx) = self.index_to_chunked_index(idx);
        unsafe {
            self.downcast_get_unchecked(chunk_idx)
                .value_unchecked(arr_idx)
        }
    }
    #[inline]
    pub fn last(&self) -> Option<T::Physical<'_>> {
        unsafe {
            let arr = self.downcast_get_unchecked(self.chunks.len().checked_sub(1)?);
            arr.get_unchecked(arr.len().checked_sub(1)?)
        }
    }
}
impl ListChunked {
    #[inline]
    pub fn get_as_series(&self, idx: usize) -> Option<Series> {
        unsafe {
            Some(Series::from_chunks_and_dtype_unchecked(
                self.name(),
                vec![self.get(idx)?],
                &self.inner_dtype().to_physical(),
            ))
        }
    }
}
#[cfg(feature = "dtype-array")]
impl ArrayChunked {
    #[inline]
    pub fn get_as_series(&self, idx: usize) -> Option<Series> {
        unsafe {
            Some(Series::from_chunks_and_dtype_unchecked(
                self.name(),
                vec![self.get(idx)?],
                &self.inner_dtype().to_physical(),
            ))
        }
    }
}
impl<T> ChunkedArray<T>
where
    T: PolarsDataType,
{
    pub(crate) fn match_chunks<I>(&self, chunk_id: I) -> Self
    where
        I: Iterator<Item = usize>,
    {
        debug_assert!(self.chunks.len() == 1);
        let slice = |ca: &Self| {
            let array = &ca.chunks[0];
            let mut offset = 0;
            let chunks = chunk_id
                .map(|len| {
                    debug_assert!((offset + len) <= array.len());
                    let out = unsafe { array.sliced_unchecked(offset, len) };
                    offset += len;
                    out
                })
                .collect();
            unsafe { Self::from_chunks_and_dtype(self.name(), chunks, self.dtype().clone()) }
        };
        if self.chunks.len() != 1 {
            let out = self.rechunk();
            slice(&out)
        } else {
            slice(self)
        }
    }
}
impl<T: PolarsDataType> AsRefDataType for ChunkedArray<T> {
    fn as_ref_dtype(&self) -> &DataType {
        self.dtype()
    }
}
pub(crate) trait AsSinglePtr: AsRefDataType {
    fn as_single_ptr(&mut self) -> PolarsResult<usize> {
        polars_bail!(opq = as_single_ptr, self.as_ref_dtype());
    }
}
impl<T> AsSinglePtr for ChunkedArray<T>
where
    T: PolarsNumericType,
{
    fn as_single_ptr(&mut self) -> PolarsResult<usize> {
        let mut ca = self.rechunk();
        mem::swap(&mut ca, self);
        let a = self.data_views().next().unwrap();
        let ptr = a.as_ptr();
        Ok(ptr as usize)
    }
}
impl AsSinglePtr for BooleanChunked {}
impl AsSinglePtr for ListChunked {}
#[cfg(feature = "dtype-array")]
impl AsSinglePtr for ArrayChunked {}
impl AsSinglePtr for StringChunked {}
impl AsSinglePtr for BinaryChunked {}
#[cfg(feature = "object")]
impl<T: PolarsObject> AsSinglePtr for ObjectChunked<T> {}
pub enum ChunkedArrayLayout<'a, T: PolarsDataType> {
    SingleNoNull(&'a T::Array),
    Single(&'a T::Array),
    MultiNoNull(&'a ChunkedArray<T>),
    Multi(&'a ChunkedArray<T>),
}
impl<T> ChunkedArray<T>
where
    T: PolarsDataType,
{
    pub fn layout(&self) -> ChunkedArrayLayout<'_, T> {
        if self.chunks.len() == 1 {
            let arr = self.downcast_iter().next().unwrap();
            return if arr.null_count() == 0 {
                ChunkedArrayLayout::SingleNoNull(arr)
            } else {
                ChunkedArrayLayout::Single(arr)
            };
        }
        if self.downcast_iter().all(|a| a.null_count() == 0) {
            ChunkedArrayLayout::MultiNoNull(self)
        } else {
            ChunkedArrayLayout::Multi(self)
        }
    }
}
impl<T> ChunkedArray<T>
where
    T: PolarsNumericType,
{
    pub fn cont_slice(&self) -> PolarsResult<&[T::Native]> {
        polars_ensure!(
            self.chunks.len() == 1 && self.chunks[0].null_count() == 0,
            ComputeError: "chunked array is not contiguous"
        );
        Ok(self.downcast_iter().next().map(|arr| arr.values()).unwrap())
    }
    pub(crate) fn cont_slice_mut(&mut self) -> Option<&mut [T::Native]> {
        if self.chunks.len() == 1 && self.chunks[0].null_count() == 0 {
            let arr = unsafe { self.downcast_iter_mut().next().unwrap() };
            arr.get_mut_values()
        } else {
            None
        }
    }
    pub fn data_views(&self) -> impl DoubleEndedIterator<Item = &[T::Native]> {
        self.downcast_iter().map(|arr| arr.values().as_slice())
    }
    #[allow(clippy::wrong_self_convention)]
    pub fn into_no_null_iter(
        &self,
    ) -> impl '_ + Send + Sync + ExactSizeIterator<Item = T::Native> + DoubleEndedIterator + TrustedLen
    {
        #[allow(clippy::map_clone)]
        unsafe {
            self.data_views()
                .flatten()
                .map(|v| *v)
                .trust_my_length(self.len())
        }
    }
}
impl<T: PolarsDataType> Clone for ChunkedArray<T> {
    fn clone(&self) -> Self {
        ChunkedArray {
            field: self.field.clone(),
            chunks: self.chunks.clone(),
            md: self.md.clone(),
            length: self.length,
            null_count: self.null_count,
        }
    }
}
impl<T: PolarsDataType> AsRef<ChunkedArray<T>> for ChunkedArray<T> {
    fn as_ref(&self) -> &ChunkedArray<T> {
        self
    }
}
impl ValueSize for ListChunked {
    fn get_values_size(&self) -> usize {
        self.chunks
            .iter()
            .fold(0usize, |acc, arr| acc + arr.get_values_size())
    }
}
#[cfg(feature = "dtype-array")]
impl ValueSize for ArrayChunked {
    fn get_values_size(&self) -> usize {
        self.chunks
            .iter()
            .fold(0usize, |acc, arr| acc + arr.get_values_size())
    }
}
impl ValueSize for StringChunked {
    fn get_values_size(&self) -> usize {
        self.chunks
            .iter()
            .fold(0usize, |acc, arr| acc + arr.get_values_size())
    }
}
impl ValueSize for BinaryOffsetChunked {
    fn get_values_size(&self) -> usize {
        self.chunks
            .iter()
            .fold(0usize, |acc, arr| acc + arr.get_values_size())
    }
}
pub(crate) fn to_primitive<T: PolarsNumericType>(
    values: Vec<T::Native>,
    validity: Option<Bitmap>,
) -> PrimitiveArray<T::Native> {
    PrimitiveArray::new(T::get_dtype().to_arrow(true), values.into(), validity)
}
pub(crate) fn to_array<T: PolarsNumericType>(
    values: Vec<T::Native>,
    validity: Option<Bitmap>,
) -> ArrayRef {
    Box::new(to_primitive::<T>(values, validity))
}
impl<T: PolarsDataType> Default for ChunkedArray<T> {
    fn default() -> Self {
        ChunkedArray {
            field: Arc::new(Field::new("default", DataType::Null)),
            chunks: Default::default(),
            md: None,
            length: 0,
            null_count: 0,
        }
    }
}
#[cfg(test)]
pub(crate) mod test {
    use crate::prelude::*;
    pub(crate) fn get_chunked_array() -> Int32Chunked {
        ChunkedArray::new("a", &[1, 2, 3])
    }
    #[test]
    fn test_sort() {
        let a = Int32Chunked::new("a", &[1, 9, 3, 2]);
        let b = a
            .sort(false)
            .into_iter()
            .map(|opt| opt.unwrap())
            .collect::<Vec<_>>();
        assert_eq!(b, [1, 2, 3, 9]);
        let a = StringChunked::new("a", &["b", "a", "c"]);
        let a = a.sort(false);
        let b = a.into_iter().collect::<Vec<_>>();
        assert_eq!(b, [Some("a"), Some("b"), Some("c")]);
        assert!(a.is_sorted_ascending_flag());
    }
    #[test]
    fn arithmetic() {
        let a = &Int32Chunked::new("a", &[1, 100, 6, 40]);
        let b = &Int32Chunked::new("b", &[-1, 2, 3, 4]);
        println!("{:?}", a + b);
        println!("{:?}", a - b);
        println!("{:?}", a * b);
        println!("{:?}", a / b);
    }
    #[test]
    fn iter() {
        let s1 = get_chunked_array();
        assert_eq!(s1.into_iter().fold(0, |acc, val| { acc + val.unwrap() }), 6)
    }
    #[test]
    fn limit() {
        let a = get_chunked_array();
        let b = a.limit(2);
        println!("{:?}", b);
        assert_eq!(b.len(), 2)
    }
    #[test]
    fn filter() {
        let a = get_chunked_array();
        let b = a
            .filter(&BooleanChunked::new("filter", &[true, false, false]))
            .unwrap();
        assert_eq!(b.len(), 1);
        assert_eq!(b.into_iter().next(), Some(Some(1)));
    }
    #[test]
    fn aggregates() {
        let a = &Int32Chunked::new("a", &[1, 100, 10, 9]);
        assert_eq!(a.max(), Some(100));
        assert_eq!(a.min(), Some(1));
        assert_eq!(a.sum(), Some(120))
    }
    #[test]
    fn take() {
        let a = get_chunked_array();
        let new = a.take(&[0 as IdxSize, 1]).unwrap();
        assert_eq!(new.len(), 2)
    }
    #[test]
    fn cast() {
        let a = get_chunked_array();
        let b = a.cast(&DataType::Int64).unwrap();
        assert_eq!(b.dtype(), &ArrowDataType::Int64)
    }
    fn assert_slice_equal<T>(ca: &ChunkedArray<T>, eq: &[T::Native])
    where
        T: PolarsNumericType,
    {
        assert_eq!(ca.iter().map(|opt| opt.unwrap()).collect::<Vec<_>>(), eq)
    }
    #[test]
    fn slice() {
        let mut first = UInt32Chunked::new("first", &[0, 1, 2]);
        let second = UInt32Chunked::new("second", &[3, 4, 5]);
        first.append(&second);
        assert_slice_equal(&first.slice(0, 3), &[0, 1, 2]);
        assert_slice_equal(&first.slice(0, 4), &[0, 1, 2, 3]);
        assert_slice_equal(&first.slice(1, 4), &[1, 2, 3, 4]);
        assert_slice_equal(&first.slice(3, 2), &[3, 4]);
        assert_slice_equal(&first.slice(3, 3), &[3, 4, 5]);
        assert_slice_equal(&first.slice(-3, 3), &[3, 4, 5]);
        assert_slice_equal(&first.slice(-6, 6), &[0, 1, 2, 3, 4, 5]);
        assert_eq!(first.slice(-7, 2).len(), 1);
        assert_eq!(first.slice(-3, 4).len(), 3);
        assert_eq!(first.slice(3, 4).len(), 3);
        assert_eq!(first.slice(10, 4).len(), 0);
    }
    #[test]
    fn sorting() {
        let s = UInt32Chunked::new("", &[9, 2, 4]);
        let sorted = s.sort(false);
        assert_slice_equal(&sorted, &[2, 4, 9]);
        let sorted = s.sort(true);
        assert_slice_equal(&sorted, &[9, 4, 2]);
        let s: StringChunked = ["b", "a", "z"].iter().collect();
        let sorted = s.sort(false);
        assert_eq!(
            sorted.into_iter().collect::<Vec<_>>(),
            &[Some("a"), Some("b"), Some("z")]
        );
        let sorted = s.sort(true);
        assert_eq!(
            sorted.into_iter().collect::<Vec<_>>(),
            &[Some("z"), Some("b"), Some("a")]
        );
        let s: StringChunked = [Some("b"), None, Some("z")].iter().copied().collect();
        let sorted = s.sort(false);
        assert_eq!(
            sorted.into_iter().collect::<Vec<_>>(),
            &[None, Some("b"), Some("z")]
        );
    }
    #[test]
    fn reverse() {
        let s = UInt32Chunked::new("", &[1, 2, 3]);
        assert_slice_equal(&s.reverse(), &[3, 2, 1]);
        let s = UInt32Chunked::new("", &[Some(1), None, Some(3)]);
        assert_eq!(Vec::from(&s.reverse()), &[Some(3), None, Some(1)]);
        let s = BooleanChunked::new("", &[true, false]);
        assert_eq!(Vec::from(&s.reverse()), &[Some(false), Some(true)]);
        let s = StringChunked::new("", &["a", "b", "c"]);
        assert_eq!(Vec::from(&s.reverse()), &[Some("c"), Some("b"), Some("a")]);
        let s = StringChunked::new("", &[Some("a"), None, Some("c")]);
        assert_eq!(Vec::from(&s.reverse()), &[Some("c"), None, Some("a")]);
    }
    #[test]
    #[cfg(feature = "dtype-categorical")]
    fn test_iter_categorical() {
        use crate::{disable_string_cache, SINGLE_LOCK};
        let _lock = SINGLE_LOCK.lock();
        disable_string_cache();
        let ca = StringChunked::new("", &[Some("foo"), None, Some("bar"), Some("ham")]);
        let ca = ca
            .cast(&DataType::Categorical(None, Default::default()))
            .unwrap();
        let ca = ca.categorical().unwrap();
        let v: Vec<_> = ca.physical().into_iter().collect();
        assert_eq!(v, &[Some(0), None, Some(1), Some(2)]);
    }
    #[test]
    #[ignore]
    fn test_shrink_to_fit() {
        let mut builder = StringChunkedBuilder::new("foo", 2048);
        builder.append_value("foo");
        let mut arr = builder.finish();
        let before = arr
            .chunks()
            .iter()
            .map(|arr| arrow::compute::aggregate::estimated_bytes_size(arr.as_ref()))
            .sum::<usize>();
        arr.shrink_to_fit();
        let after = arr
            .chunks()
            .iter()
            .map(|arr| arrow::compute::aggregate::estimated_bytes_size(arr.as_ref()))
            .sum::<usize>();
        assert!(before > after);
    }
}