#[cfg(any(feature = "ipc_streaming", feature = "parquet"))]
use std::borrow::Cow;
use std::io::Read;
use std::path::{Path, PathBuf};
use once_cell::sync::Lazy;
use polars_core::prelude::*;
#[cfg(any(feature = "ipc_streaming", feature = "parquet"))]
use polars_core::utils::{accumulate_dataframes_vertical_unchecked, split_df_as_ref};
use regex::{Regex, RegexBuilder};
use crate::mmap::{MmapBytesReader, ReaderBytes};
/// Base path of the polars temporary directory, resolved once on first access.
///
/// The path is the value of the `POLARS_TEMP_DIR` environment variable when
/// `std::env::var` can read it; otherwise it is `polars/` joined onto the
/// (lossily UTF-8 converted) system temporary directory.
///
/// # Panics
///
/// Panics on first access when the directory cannot be created and the path
/// does not already refer to a directory.
pub static POLARS_TEMP_DIR_BASE_PATH: Lazy<Box<Path>> = Lazy::new(|| {
    let base: PathBuf = match std::env::var("POLARS_TEMP_DIR") {
        Ok(configured) => PathBuf::from(configured),
        Err(_) => {
            let system_tmp = std::env::temp_dir();
            PathBuf::from(system_tmp.to_string_lossy().as_ref()).join("polars/")
        },
    };
    let base = base.into_boxed_path();

    // A creation error is tolerated as long as the directory is there afterwards.
    match std::fs::create_dir_all(base.as_ref()) {
        Err(err) if !base.is_dir() => {
            panic!("failed to create temporary directory: {}", err);
        },
        _ => {},
    }
    base
});
/// Obtains the contents of `reader` as a [`ReaderBytes`].
///
/// Three strategies are tried in order:
/// 1. if `reader.to_file()` yields a file, the file is memory-mapped and
///    returned as `ReaderBytes::Mapped`;
/// 2. otherwise, if `reader.to_bytes()` yields a slice, it is returned as
///    `ReaderBytes::Borrowed`;
/// 3. otherwise the reader is read to its end into a freshly allocated buffer
///    and returned as `ReaderBytes::Owned`.
///
/// # Errors
///
/// Propagates the error of the memory-map call or of `read_to_end`.
pub fn get_reader_bytes<'a, R: Read + MmapBytesReader + ?Sized>(
    reader: &'a mut R,
) -> PolarsResult<ReaderBytes<'a>> {
    if let Some(file) = reader.to_file() {
        // NOTE(review): presumably sound only while the underlying file is not
        // truncated or modified for the lifetime of the mapping — confirm.
        let mmap = unsafe { memmap::Mmap::map(file)? };
        use std::fs::File;
        // Stretches the `&File` borrow to `'a`, the lifetime of the `reader`
        // borrow it was obtained from.
        // NOTE(review): assumes `to_file` hands out a reference that stays valid
        // for as long as `*reader` is borrowed — confirm against the trait impls.
        let file = unsafe { std::mem::transmute::<&File, &'a File>(file) };
        Ok(ReaderBytes::Mapped(mmap, file))
    } else {
        // `to_bytes` is called twice: once to test, once to take the borrow.
        // NOTE(review): presumably this keeps the `'a` borrow confined to the
        // branch that returns it, so `reader` is still usable mutably for
        // `read_to_end` in the other branch — confirm before collapsing.
        if reader.to_bytes().is_some() {
            Ok(ReaderBytes::Borrowed((*reader).to_bytes().unwrap()))
        } else {
            // Start with a 128 KiB buffer; `read_to_end` grows it as needed.
            let mut bytes = Vec::with_capacity(1024 * 128);
            reader.read_to_end(&mut bytes)?;
            Ok(ReaderBytes::Owned(bytes))
        }
    }
}
/// Expands a leading `~` path component to the current user's home directory.
///
/// Only a path whose first component is exactly `~` is rewritten. The path is
/// returned unchanged when it has no such prefix, when `home::home_dir()`
/// yields nothing, or when compiled for the `wasm` target family.
pub fn resolve_homedir(path: &Path) -> PathBuf {
    #[cfg(not(target_family = "wasm"))]
    if let Ok(below_home) = path.strip_prefix("~") {
        if let Some(home_root) = home::home_dir() {
            return home_root.join(below_home);
        }
    }
    path.to_path_buf()
}
/// Builds a new arrow schema containing the fields of `schema` selected by
/// `projection`, in projection order.
///
/// # Panics
///
/// Panics when an index in `projection` is out of bounds for `schema.fields`.
#[cfg(any(
    feature = "ipc",
    feature = "ipc_streaming",
    feature = "parquet",
    feature = "avro"
))]
pub(crate) fn apply_projection(schema: &ArrowSchema, projection: &[usize]) -> ArrowSchema {
    let mut selected = Vec::with_capacity(projection.len());
    for &idx in projection {
        selected.push(schema.fields[idx].clone());
    }
    ArrowSchema::from(selected)
}
/// Translates column names into their positional indices within `schema`.
///
/// Up to 100 requested columns are resolved one by one through
/// `schema.try_index_of`. For larger requests a name-to-index map over the
/// schema fields is built once and used for every lookup.
///
/// # Errors
///
/// Returns an error when a requested column is not present in `schema`.
#[cfg(any(
    feature = "ipc",
    feature = "ipc_streaming",
    feature = "avro",
    feature = "parquet"
))]
pub(crate) fn columns_to_projection(
    columns: &[String],
    schema: &ArrowSchema,
) -> PolarsResult<Vec<usize>> {
    let mut projection = Vec::with_capacity(columns.len());

    // Small requests: a direct lookup per column, no map needed.
    if columns.len() <= 100 {
        for name in columns {
            projection.push(schema.try_index_of(name)?);
        }
        return Ok(projection);
    }

    // Large requests: index the schema once, then look up by name.
    let mut index_by_name = PlHashMap::with_capacity(schema.fields.len());
    for (idx, field) in schema.fields.iter().enumerate() {
        index_by_name.insert(field.name.as_str(), idx);
    }
    for name in columns {
        match index_by_name.get(name.as_str()) {
            Some(&idx) => projection.push(idx),
            None => polars_bail!(
                ColumnNotFound:
                "unable to find column {:?}; valid columns: {:?}", name, schema.get_names(),
            ),
        }
    }
    Ok(projection)
}
/// Shifts the first column of every frame after the first by the number of
/// rows that precede it.
///
/// The running total starts at the first entry's row count plus `offset` and
/// grows by each later entry's row count after that entry has been updated.
/// The first frame itself is left untouched, as is any frame without columns.
// NOTE(review): the first column is presumably the row-index column — confirm with callers.
#[cfg(any(feature = "csv", feature = "json"))]
pub(crate) fn update_row_counts(dfs: &mut [(DataFrame, IdxSize)], offset: IdxSize) {
    let Some((head, tail)) = dfs.split_first_mut() else {
        return;
    };

    let mut rows_before = head.1 + offset;
    for (frame, rows_read) in tail {
        // NOTE(review): adding a scalar presumably keeps the column length and
        // name intact, which is what `get_columns_mut` requires — confirm.
        if let Some(first_col) = unsafe { frame.get_columns_mut() }.get_mut(0) {
            *first_col = &*first_col + rows_before;
        }
        rows_before += *rows_read;
    }
}
/// Shifts the first column of every frame after the first by the number of
/// rows that precede it, using each frame's own height as its row count.
///
/// The running total starts at the first frame's height plus `offset`. The
/// first frame itself is left untouched, as is any frame without columns.
// NOTE(review): the first column is presumably the row-index column — confirm with callers.
#[cfg(any(feature = "csv", feature = "json"))]
pub(crate) fn update_row_counts2(dfs: &mut [DataFrame], offset: IdxSize) {
    let Some((head, tail)) = dfs.split_first_mut() else {
        return;
    };

    let mut rows_before = head.height() as IdxSize + offset;
    for frame in tail {
        let frame_rows = frame.height() as IdxSize;
        // NOTE(review): adding a scalar presumably keeps the column length and
        // name intact, which is what `get_columns_mut` requires — confirm.
        if let Some(first_col) = unsafe { frame.get_columns_mut() }.get_mut(0) {
            *first_col = &*first_col + rows_before;
        }
        rows_before += frame_rows;
    }
}
/// Computes per-file read statistics for files that are read one after another.
///
/// `iter` yields the number of rows in each file. For every file the result
/// holds a pair `(remaining, preceding)`:
/// - `remaining`: how many of `total_rows_to_read` are still outstanding before
///   this file is read (saturating at zero);
/// - `preceding`: the total number of rows in all files before this one.
pub fn get_sequential_row_statistics<I>(
    iter: I,
    mut total_rows_to_read: usize,
) -> Vec<(usize, usize)>
where
    I: Iterator<Item = usize>,
{
    let mut stats = Vec::with_capacity(iter.size_hint().0);
    let mut rows_seen = 0;
    for file_rows in iter {
        // Record the state *before* this file is accounted for.
        stats.push((total_rows_to_read, rows_seen));
        total_rows_to_read = total_rows_to_read.saturating_sub(file_rows);
        rows_seen += file_rows;
    }
    stats
}
/// Replaces the dtypes in `schema` with those given in `overwriting_schema`.
///
/// Entries are applied in iteration order; entries applied before a failure
/// stay in place.
///
/// # Errors
///
/// Returns the error from `schema.try_get_mut` for the first name it fails on.
#[cfg(feature = "json")]
pub(crate) fn overwrite_schema(
    schema: &mut Schema,
    overwriting_schema: &Schema,
) -> PolarsResult<()> {
    overwriting_schema.iter().try_for_each(|(name, dtype)| {
        *schema.try_get_mut(name)? = dtype.clone();
        Ok(())
    })
}
/// Matches a whole string that looks like a float using `.` as the decimal
/// separator: an optional sign followed by one of
/// - digits with a fractional part (leading digits optional), optionally with an exponent,
/// - `inf`,
/// - `NaN`,
/// - an integer with an exponent,
/// - digits followed by a trailing `.`.
pub static FLOAT_RE: Lazy<Regex> = Lazy::new(|| {
    Regex::new(r"^[-+]?((\d*\.\d+)([eE][-+]?\d+)?|inf|NaN|(\d+)[eE][-+]?\d+|\d+\.)$").unwrap()
});
/// Same grammar as [`FLOAT_RE`], but with `,` as the decimal separator.
pub static FLOAT_RE_DECIMAL: Lazy<Regex> = Lazy::new(|| {
    Regex::new(r"^[-+]?((\d*,\d+)([eE][-+]?\d+)?|inf|NaN|(\d+)[eE][-+]?\d+|\d+,)$").unwrap()
});
/// Matches a whole string made of an optional leading `-` followed by one or
/// more digits. A leading `+` is not accepted.
pub static INTEGER_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"^-?(\d+)$").unwrap());
/// Matches a whole string equal to `true` or `false`, ignoring case.
pub static BOOLEAN_RE: Lazy<Regex> = Lazy::new(|| {
    RegexBuilder::new(r"^(true|false)$")
        .case_insensitive(true)
        .build()
        .unwrap()
});
/// Converts the requested column names into positional indices into `schema`.
///
/// Returns `None` when `with_columns` is `None`. Otherwise every requested
/// name is looked up in `schema`, except names that match one of the
/// `hive_partitions` series, which are skipped. When `has_row_index` is set,
/// each index is reduced by one.
///
/// # Panics
///
/// Panics when a requested, non-partition column is not part of `schema`.
pub fn materialize_projection(
    with_columns: Option<&[String]>,
    schema: &Schema,
    hive_partitions: Option<&[Series]>,
    has_row_index: bool,
) -> Option<Vec<usize>> {
    let requested = with_columns?;
    let index_shift = has_row_index as usize;

    // Without hive partitions nothing is ever filtered out.
    let is_hive_column = |name: &str| {
        hive_partitions.map_or(false, |part_cols| {
            part_cols.iter().any(|s| s.name() == name)
        })
    };

    let projection = requested
        .iter()
        .filter(|name| !is_hive_column(name.as_str()))
        .map(|name| schema.index_of(name).unwrap() - index_shift)
        .collect();
    Some(projection)
}
/// Verifies that schemas `a` and `b` agree.
///
/// With `projected_names`, only the listed columns are compared, each via
/// `get` on both schemas. Without it, the schemas must be equal as a whole.
///
/// # Errors
///
/// Returns a `ComputeError` starting with `msg` and showing `a` as the
/// expected and `b` as the received schema when the comparison fails.
pub fn check_projected_schema_impl(
    a: &Schema,
    b: &Schema,
    projected_names: Option<&[String]>,
    msg: &str,
) -> PolarsResult<()> {
    let schemas_agree = match projected_names {
        Some(names) => names.iter().all(|name| a.get(name) == b.get(name)),
        None => a == b,
    };

    if schemas_agree {
        Ok(())
    } else {
        polars_bail!(ComputeError: "{msg}\n\n\
                    Expected: {:?}\n\n\
                    Got: {:?}", a, b)
    }
}
/// Verifies that arrow schemas `a` and `b` agree, optionally only on
/// `projected_names`.
///
/// Equal arrow schemas pass immediately. Otherwise both are converted to
/// [`Schema`] and compared through [`check_projected_schema_impl`].
///
/// # Errors
///
/// Returns the error produced by [`check_projected_schema_impl`].
pub fn check_projected_arrow_schema(
    a: &ArrowSchema,
    b: &ArrowSchema,
    projected_names: Option<&[String]>,
    msg: &str,
) -> PolarsResult<()> {
    // Identical arrow schemas need neither conversion nor per-column checks.
    if a == b {
        return Ok(());
    }

    let expected = Schema::from(a);
    let received = Schema::from(b);
    check_projected_schema_impl(&expected, &received, projected_names, msg)
}
/// Verifies that schemas `a` and `b` agree, optionally only on
/// `projected_names`.
///
/// Delegates directly to [`check_projected_schema_impl`] with the same
/// arguments.
///
/// # Errors
///
/// Returns the error produced by [`check_projected_schema_impl`].
pub fn check_projected_schema(
    a: &Schema,
    b: &Schema,
    projected_names: Option<&[String]>,
    msg: &str,
) -> PolarsResult<()> {
    check_projected_schema_impl(a, b, projected_names, msg)
}
/// Regroups the chunks of `df` around `row_group_size` rows before writing.
///
/// The frame's chunks are aligned first. Then one of two paths is taken:
/// - If the frame has columns and each of the first (at most) five chunks of
///   its first column is shorter than `row_group_size`, consecutive chunks are
///   collected until their combined height reaches `row_group_size`; each such
///   group is merged into a single chunk and the groups are stacked into a new
///   frame.
/// - Otherwise the frame is split into `df.height() / row_group_size` parts,
///   and parts made of several small chunks are rechunked. When that quotient
///   is zero the input frame is returned borrowed.
///
/// The function always returns `Ok`.
// NOTE(review): `row_group_size == 0` can reach the integer division on the
// second path — confirm callers never pass zero.
#[cfg(any(feature = "ipc_streaming", feature = "parquet"))]
pub(crate) fn chunk_df_for_writing(
    df: &mut DataFrame,
    row_group_size: usize,
) -> PolarsResult<Cow<DataFrame>> {
    df.align_chunks();
    // Only the first five chunk lengths are sampled to decide whether the
    // chunks are smaller than a row group.
    if !df.get_columns().is_empty()
        && df.get_columns()[0]
            .chunk_lengths()
            .take(5)
            .all(|len| len < row_group_size)
    {
        // Merges everything gathered in `scratch` into one single-chunk frame,
        // appends it to `new_chunks`, and leaves `scratch` empty.
        fn finish(scratch: &mut Vec<DataFrame>, new_chunks: &mut Vec<DataFrame>) {
            let mut new = accumulate_dataframes_vertical_unchecked(scratch.drain(..));
            new.as_single_chunk_par();
            new_chunks.push(new);
        }
        // `new_chunks` collects the merged groups; `scratch` holds the chunks of
        // the group currently being filled.
        let mut new_chunks = Vec::with_capacity(df.n_chunks()); let mut scratch = vec![];
        // Rows still missing before the current group reaches `row_group_size`.
        let mut remaining = row_group_size;
        for df in df.split_chunks() {
            remaining = remaining.saturating_sub(df.height());
            scratch.push(df);
            if remaining == 0 {
                remaining = row_group_size;
                finish(&mut scratch, &mut new_chunks);
            }
        }
        // Flush the trailing, partially filled group.
        if !scratch.is_empty() {
            finish(&mut scratch, &mut new_chunks);
        }
        return Ok(Cow::Owned(accumulate_dataframes_vertical_unchecked(
            new_chunks,
        )));
    }
    let n_splits = df.height() / row_group_size;
    let result = if n_splits > 0 {
        let mut splits = split_df_as_ref(df, n_splits, false);
        for df in splits.iter_mut() {
            let n_chunks = df.n_chunks();
            // Rechunk a split whose chunks average less than 128 KiB each.
            if n_chunks > 1 && (df.estimated_size() / n_chunks < 128 * 1024) {
                df.as_single_chunk_par();
            }
        }
        Cow::Owned(accumulate_dataframes_vertical_unchecked(splits))
    } else {
        // Fewer rows than one row group: hand the input back unchanged.
        Cow::Borrowed(df)
    };
    Ok(result)
}
/// Matches strings that start with one of the recognised URL schemes followed
/// by `://`: `s3`, `s3a`, `gs`, `gcs`, `file`, `abfs`, `abfss`, `azure`, `az`,
/// `adl`, `http`, `https`.
static CLOUD_URL: Lazy<Regex> =
    Lazy::new(|| Regex::new(r"^(s3a?|gs|gcs|file|abfss?|azure|az|adl|https?)://").unwrap());
/// Reports whether `p` starts with one of the URL schemes accepted by
/// `CLOUD_URL`.
///
/// Paths that are not valid UTF-8 are never treated as cloud URLs.
pub fn is_cloud_url<P: AsRef<Path>>(p: P) -> bool {
    p.as_ref()
        .to_str()
        .map_or(false, |url| CLOUD_URL.is_match(url))
}
#[cfg(test)]
mod tests {
    use std::path::PathBuf;
    use super::{resolve_homedir, FLOAT_RE};

    /// `FLOAT_RE` accepts the supported float spellings and rejects strings
    /// that are not floats (plain integers, empty input, malformed numbers).
    #[test]
    fn test_float_parse() {
        let accepted = [
            "0.1",
            "3.0",
            "3.00001",
            "-9.9990e-003",
            "9.9990e+003",
            "9.9990E+003",
            ".5",
            "2.5E-10",
            "2.5e10",
            "NaN",
            "-NaN",
            "-inf",
            "inf",
            "-7e-05",
            "7e-05",
            "+7e+05",
        ];
        for input in accepted {
            // Name the offending input so a failure is diagnosable.
            assert!(
                FLOAT_RE.is_match(input),
                "expected {input:?} to match FLOAT_RE"
            );
        }

        let rejected = ["", "1", "abc", "1.2.3", "e5"];
        for input in rejected {
            assert!(
                !FLOAT_RE.is_match(input),
                "expected {input:?} not to match FLOAT_RE"
            );
        }
    }

    /// A leading `~` is expanded to an absolute path; other paths are
    /// returned unchanged.
    #[cfg(not(target_os = "windows"))]
    #[test]
    fn test_resolve_homedir() {
        let paths: Vec<PathBuf> = vec![
            "~/dir1/dir2/test.csv".into(),
            "/abs/path/test.csv".into(),
            "rel/path/test.csv".into(),
            "/".into(),
            "~".into(),
        ];
        let resolved: Vec<PathBuf> = paths.iter().map(|x| resolve_homedir(x)).collect();
        assert_eq!(resolved[0].file_name(), paths[0].file_name());
        assert!(resolved[0].is_absolute());
        assert_eq!(resolved[1], paths[1]);
        assert_eq!(resolved[2], paths[2]);
        assert_eq!(resolved[3], paths[3]);
        assert!(resolved[4].is_absolute());
    }

    /// Windows variant: a leading `~` is expanded to an absolute path; an
    /// already absolute path is returned unchanged.
    #[cfg(target_os = "windows")]
    #[test]
    fn test_resolve_homedir_windows() {
        let paths: Vec<PathBuf> = vec![
            r#"c:\Users\user1\test.csv"#.into(),
            r#"~\user1\test.csv"#.into(),
            "~".into(),
        ];
        let resolved: Vec<PathBuf> = paths.iter().map(|x| resolve_homedir(x)).collect();
        assert_eq!(resolved[0], paths[0]);
        assert_eq!(resolved[1].file_name(), paths[1].file_name());
        assert!(resolved[1].is_absolute());
        assert!(resolved[2].is_absolute());
    }
}