use arrow::bitmap::MutableBitmap;
use arrow::legacy::kernels::set::set_at_nulls;
use arrow::legacy::trusted_len::FromIteratorReversed;
use arrow::legacy::utils::FromTrustedLenIterator;
use bytemuck::Zeroable;
use num_traits::{Bounded, NumCast, One, Zero};
use crate::prelude::*;
fn err_fill_null() -> PolarsError {
    polars_err!(ComputeError: "could not determine the fill value")
}
impl Series {
    pub fn fill_null(&self, strategy: FillNullStrategy) -> PolarsResult<Series> {
        let logical_type = self.dtype();
        let s = self.to_physical_repr();
        use DataType::*;
        let out = match s.dtype() {
            Boolean => fill_null_bool(s.bool().unwrap(), strategy),
            String => {
                let s = unsafe { s.cast_unchecked(&Binary)? };
                let out = s.fill_null(strategy)?;
                return unsafe { out.cast_unchecked(&String) };
            },
            Binary => {
                let ca = s.binary().unwrap();
                fill_null_binary(ca, strategy).map(|ca| ca.into_series())
            },
            List(_) => {
                let ca = s.list().unwrap();
                fill_null_list(ca, strategy).map(|ca| ca.into_series())
            },
            dt if dt.is_numeric() => {
                with_match_physical_numeric_polars_type!(dt, |$T| {
                    let ca: &ChunkedArray<$T> = s.as_ref().as_ref().as_ref();
                        fill_null_numeric(ca, strategy).map(|ca| ca.into_series())
                })
            },
            _ => todo!(),
        }?;
        unsafe { out.cast_unchecked(logical_type) }
    }
}
trait LocalCopy {
    fn cheap_clone(&self) -> Self;
}
impl<T: Copy> LocalCopy for T {
    #[inline]
    fn cheap_clone(&self) -> Self {
        *self
    }
}
impl LocalCopy for Series {
    #[inline]
    fn cheap_clone(&self) -> Self {
        self.clone()
    }
}
fn fill_forward_limit<'a, T, K, I>(ca: &'a ChunkedArray<T>, limit: IdxSize) -> ChunkedArray<T>
where
    &'a ChunkedArray<T>: IntoIterator<IntoIter = I>,
    K: LocalCopy,
    I: TrustedLen<Item = Option<K>>,
    T: PolarsDataType,
    ChunkedArray<T>: FromTrustedLenIterator<Option<K>>,
{
    let mut cnt = 0;
    let mut previous = None;
    ca.into_iter()
        .map(|opt_v| match opt_v {
            Some(v) => {
                cnt = 0;
                previous = Some(v.cheap_clone());
                Some(v)
            },
            None => {
                if cnt < limit {
                    cnt += 1;
                    previous.as_ref().map(|v| v.cheap_clone())
                } else {
                    None
                }
            },
        })
        .collect_trusted()
}
fn fill_backward_limit<'a, T, K, I>(ca: &'a ChunkedArray<T>, limit: IdxSize) -> ChunkedArray<T>
where
    &'a ChunkedArray<T>: IntoIterator<IntoIter = I>,
    K: LocalCopy,
    I: TrustedLen<Item = Option<K>> + DoubleEndedIterator,
    T: PolarsDataType,
    ChunkedArray<T>: FromIteratorReversed<Option<K>>,
{
    let mut cnt = 0;
    let mut previous = None;
    ca.into_iter()
        .rev()
        .map(|opt_v| match opt_v {
            Some(v) => {
                cnt = 0;
                previous = Some(v.cheap_clone());
                Some(v)
            },
            None => {
                if cnt < limit {
                    cnt += 1;
                    previous.as_ref().map(|v| v.cheap_clone())
                } else {
                    None
                }
            },
        })
        .collect_reversed()
}
fn fill_backward_limit_binary(ca: &BinaryChunked, limit: IdxSize) -> BinaryChunked {
    let mut cnt = 0;
    let mut previous = None;
    let out: BinaryChunked = ca
        .into_iter()
        .rev()
        .map(|opt_v| match opt_v {
            Some(v) => {
                cnt = 0;
                previous = Some(v);
                Some(v)
            },
            None => {
                if cnt < limit {
                    cnt += 1;
                    previous
                } else {
                    None
                }
            },
        })
        .collect_trusted();
    out.into_iter().rev().collect_trusted()
}
fn fill_forward<'a, T, K, I>(ca: &'a ChunkedArray<T>) -> ChunkedArray<T>
where
    &'a ChunkedArray<T>: IntoIterator<IntoIter = I>,
    K: LocalCopy,
    I: TrustedLen<Item = Option<K>>,
    T: PolarsDataType,
    ChunkedArray<T>: FromTrustedLenIterator<Option<K>>,
{
    ca.into_iter()
        .scan(None, |previous, opt_v| match opt_v {
            Some(value) => {
                *previous = Some(value.cheap_clone());
                Some(Some(value))
            },
            None => Some(previous.as_ref().map(|v| v.cheap_clone())),
        })
        .collect_trusted()
}
fn fill_backward<'a, T, K, I>(ca: &'a ChunkedArray<T>) -> ChunkedArray<T>
where
    &'a ChunkedArray<T>: IntoIterator<IntoIter = I>,
    K: Copy,
    I: TrustedLen<Item = Option<K>> + DoubleEndedIterator,
    T: PolarsDataType,
    ChunkedArray<T>: FromIteratorReversed<Option<K>>,
{
    ca.into_iter()
        .rev()
        .scan(None, |previous, opt_v| match opt_v {
            Some(value) => {
                *previous = Some(value);
                Some(Some(value))
            },
            None => Some(*previous),
        })
        .collect_reversed()
}
macro_rules! impl_fill_backward {
    ($ca:ident, $ChunkedArray:ty) => {{
        let ca: $ChunkedArray = $ca
            .into_iter()
            .rev()
            .scan(None, |previous, opt_v| match opt_v {
                Some(value) => {
                    *previous = Some(value.cheap_clone());
                    Some(Some(value))
                },
                None => Some(previous.as_ref().map(|s| s.cheap_clone())),
            })
            .collect_trusted();
        ca.into_iter().rev().collect_trusted()
    }};
}
macro_rules! impl_fill_backward_limit {
    ($ca:ident, $ChunkedArray:ty, $limit:expr) => {{
        let mut cnt = 0;
        let mut previous = None;
        let out: $ChunkedArray = $ca
            .into_iter()
            .rev()
            .map(|opt_v| match opt_v {
                Some(v) => {
                    cnt = 0;
                    previous = Some(v.cheap_clone());
                    Some(v)
                },
                None => {
                    if cnt < $limit {
                        cnt += 1;
                        previous.as_ref().map(|s| s.cheap_clone())
                    } else {
                        None
                    }
                },
            })
            .collect_trusted();
        out.into_iter().rev().collect_trusted()
    }};
}
fn fill_forward_numeric<'a, T, I>(ca: &'a ChunkedArray<T>) -> ChunkedArray<T>
where
    T: PolarsDataType,
    &'a ChunkedArray<T>: IntoIterator<IntoIter = I>,
    I: TrustedLen + Iterator<Item = Option<T::Physical<'a>>>,
    T::ZeroablePhysical<'a>: LocalCopy,
{
    let values: Vec<T::ZeroablePhysical<'a>> = ca
        .into_iter()
        .scan(T::ZeroablePhysical::zeroed(), |prev, v| {
            *prev = v.map(|v| v.into()).unwrap_or(prev.cheap_clone());
            Some(prev.cheap_clone())
        })
        .collect_trusted();
    let num_start_nulls = ca.first_non_null().unwrap_or(ca.len());
    let mut bm = MutableBitmap::with_capacity(ca.len());
    bm.extend_constant(num_start_nulls, false);
    bm.extend_constant(ca.len() - num_start_nulls, true);
    ChunkedArray::from_chunk_iter_like(
        ca,
        [
            T::Array::from_zeroable_vec(values, ca.dtype().to_arrow(true))
                .with_validity_typed(Some(bm.into())),
        ],
    )
}
fn fill_backward_numeric<'a, T, I>(ca: &'a ChunkedArray<T>) -> ChunkedArray<T>
where
    T: PolarsDataType,
    &'a ChunkedArray<T>: IntoIterator<IntoIter = I>,
    I: TrustedLen + Iterator<Item = Option<T::Physical<'a>>> + DoubleEndedIterator,
    T::ZeroablePhysical<'a>: LocalCopy,
{
    let values: Vec<T::ZeroablePhysical<'a>> = ca
        .into_iter()
        .rev()
        .scan(T::ZeroablePhysical::zeroed(), |prev, v| {
            *prev = v.map(|v| v.into()).unwrap_or(prev.cheap_clone());
            Some(prev.cheap_clone())
        })
        .collect_reversed();
    let num_end_nulls = ca
        .last_non_null()
        .map(|i| ca.len() - 1 - i)
        .unwrap_or(ca.len());
    let mut bm = MutableBitmap::with_capacity(ca.len());
    bm.extend_constant(ca.len() - num_end_nulls, true);
    bm.extend_constant(num_end_nulls, false);
    ChunkedArray::from_chunk_iter_like(
        ca,
        [
            T::Array::from_zeroable_vec(values, ca.dtype().to_arrow(true))
                .with_validity_typed(Some(bm.into())),
        ],
    )
}
fn fill_null_numeric<T>(
    ca: &ChunkedArray<T>,
    strategy: FillNullStrategy,
) -> PolarsResult<ChunkedArray<T>>
where
    T: PolarsNumericType,
    ChunkedArray<T>: ChunkAgg<T::Native>,
{
    if ca.null_count() == 0 {
        return Ok(ca.clone());
    }
    let mut out = match strategy {
        FillNullStrategy::Forward(None) => fill_forward_numeric(ca),
        FillNullStrategy::Forward(Some(limit)) => fill_forward_limit(ca, limit),
        FillNullStrategy::Backward(None) => fill_backward_numeric(ca),
        FillNullStrategy::Backward(Some(limit)) => fill_backward_limit(ca, limit),
        FillNullStrategy::Min => {
            ca.fill_null_with_values(ChunkAgg::min(ca).ok_or_else(err_fill_null)?)?
        },
        FillNullStrategy::Max => {
            ca.fill_null_with_values(ChunkAgg::max(ca).ok_or_else(err_fill_null)?)?
        },
        FillNullStrategy::Mean => ca.fill_null_with_values(
            ca.mean()
                .map(|v| NumCast::from(v).unwrap())
                .ok_or_else(err_fill_null)?,
        )?,
        FillNullStrategy::One => return ca.fill_null_with_values(One::one()),
        FillNullStrategy::Zero => return ca.fill_null_with_values(Zero::zero()),
        FillNullStrategy::MinBound => return ca.fill_null_with_values(Bounded::min_value()),
        FillNullStrategy::MaxBound => return ca.fill_null_with_values(Bounded::max_value()),
    };
    out.rename(ca.name());
    Ok(out)
}
fn fill_null_bool(ca: &BooleanChunked, strategy: FillNullStrategy) -> PolarsResult<Series> {
    if ca.null_count() == 0 {
        return Ok(ca.clone().into_series());
    }
    match strategy {
        FillNullStrategy::Forward(limit) => {
            let mut out: BooleanChunked = match limit {
                Some(limit) => fill_forward_limit(ca, limit),
                None => fill_forward(ca),
            };
            out.rename(ca.name());
            Ok(out.into_series())
        },
        FillNullStrategy::Backward(limit) => {
            let mut out: BooleanChunked = match limit {
                None => fill_backward(ca),
                Some(limit) => fill_backward_limit(ca, limit),
            };
            out.rename(ca.name());
            Ok(out.into_series())
        },
        FillNullStrategy::Min => ca
            .fill_null_with_values(ca.min().ok_or_else(err_fill_null)?)
            .map(|ca| ca.into_series()),
        FillNullStrategy::Max => ca
            .fill_null_with_values(ca.max().ok_or_else(err_fill_null)?)
            .map(|ca| ca.into_series()),
        FillNullStrategy::Mean => polars_bail!(opq = mean, "Boolean"),
        FillNullStrategy::One | FillNullStrategy::MaxBound => {
            ca.fill_null_with_values(true).map(|ca| ca.into_series())
        },
        FillNullStrategy::Zero | FillNullStrategy::MinBound => {
            ca.fill_null_with_values(false).map(|ca| ca.into_series())
        },
    }
}
fn fill_null_binary(ca: &BinaryChunked, strategy: FillNullStrategy) -> PolarsResult<BinaryChunked> {
    if ca.null_count() == 0 {
        return Ok(ca.clone());
    }
    match strategy {
        FillNullStrategy::Forward(limit) => {
            let mut out: BinaryChunked = match limit {
                Some(limit) => fill_forward_limit(ca, limit),
                None => fill_forward(ca),
            };
            out.rename(ca.name());
            Ok(out)
        },
        FillNullStrategy::Backward(limit) => {
            let mut out = match limit {
                None => impl_fill_backward!(ca, BinaryChunked),
                Some(limit) => fill_backward_limit_binary(ca, limit),
            };
            out.rename(ca.name());
            Ok(out)
        },
        FillNullStrategy::Min => {
            ca.fill_null_with_values(ca.min_binary().ok_or_else(err_fill_null)?)
        },
        FillNullStrategy::Max => {
            ca.fill_null_with_values(ca.max_binary().ok_or_else(err_fill_null)?)
        },
        FillNullStrategy::Zero => ca.fill_null_with_values(&[]),
        strat => polars_bail!(InvalidOperation: "fill-null strategy {:?} is not supported", strat),
    }
}
fn fill_null_list(ca: &ListChunked, strategy: FillNullStrategy) -> PolarsResult<ListChunked> {
    if ca.null_count() == 0 {
        return Ok(ca.clone());
    }
    match strategy {
        FillNullStrategy::Forward(limit) => {
            let mut out: ListChunked = match limit {
                Some(limit) => fill_forward_limit(ca, limit),
                None => fill_forward(ca),
            };
            out.rename(ca.name());
            Ok(out)
        },
        FillNullStrategy::Backward(limit) => {
            let mut out: ListChunked = match limit {
                None => impl_fill_backward!(ca, ListChunked),
                Some(limit) => impl_fill_backward_limit!(ca, ListChunked, limit),
            };
            out.rename(ca.name());
            Ok(out)
        },
        strat => polars_bail!(InvalidOperation: "fill-null strategy {:?} is not supported", strat),
    }
}
impl<T> ChunkFillNullValue<T::Native> for ChunkedArray<T>
where
    T: PolarsNumericType,
{
    fn fill_null_with_values(&self, value: T::Native) -> PolarsResult<Self> {
        Ok(self.apply_kernel(&|arr| Box::new(set_at_nulls(arr, value))))
    }
}
impl ChunkFillNullValue<bool> for BooleanChunked {
    fn fill_null_with_values(&self, value: bool) -> PolarsResult<Self> {
        self.set(&self.is_null(), Some(value))
    }
}
impl ChunkFillNullValue<&[u8]> for BinaryChunked {
    fn fill_null_with_values(&self, value: &[u8]) -> PolarsResult<Self> {
        self.set(&self.is_null(), Some(value))
    }
}