use std::collections::VecDeque;
use std::ops::Deref;

use polars_core::datatypes::Field;
use polars_core::frame::DataFrame;
use polars_core::schema::SchemaRef;
use polars_core::POOL;
use polars_error::PolarsResult;
use polars_utils::IdxSize;
use rayon::iter::{IntoParallelIterator, ParallelIterator};

use super::{cast_columns, read_chunk, CoreReader};
use crate::csv::read::options::{CommentPrefix, CsvEncoding, NullValuesCompiled};
use crate::csv::read::parser::next_line_position;
use crate::csv::read::CsvReader;
use crate::mmap::{MmapBytesReader, ReaderBytes};
use crate::prelude::update_row_counts2;
use crate::RowIndex;
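
/// Fill `offsets` with up to `n_chunks` `(start, stop)` byte ranges, starting
/// the search `chunk_size` bytes past `last_pos` and snapping each boundary to
/// the next full line, so every chunk can be parsed independently.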
#[allow(clippy::too_many_arguments)]
pub(crate) fn get_file_chunks_iterator(
    offsets: &mut VecDeque<(usize, usize)>,
    last_pos: &mut usize,
    n_chunks: usize,
    chunk_size: usize,
    bytes: &[u8],
    expected_fields: usize,
    separator: u8,
    quote_char: Option<u8>,
    eol_char: u8,
) {
    for _ in 0..n_chunks {
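        // Jump ahead by the target chunk size, then advance to the next line
        // boundary so that no row is split across two chunks.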
        let search_pos = *last_pos + chunk_size;
        if search_pos >= bytes.len() {
            break;
        }
        let end_pos = match next_line_position(
            &bytes[search_pos..],
            Some(expected_fields),
            separator,
            quote_char,
            eol_char,
        ) {
            Some(pos) => search_pos + pos,
            None => {
                break;
            },
        };
        offsets.push_back((*last_pos, end_pos));
        *last_pos = end_pos;
    }
}
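
/// Iterator over `(start, stop)` byte offsets of file chunks. Offsets are
/// computed lazily, `n_chunks` at a time, so the file is never scanned up front.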
struct ChunkOffsetIter<'a> {
    bytes: &'a [u8],
    offsets: VecDeque<(usize, usize)>,
    last_offset: usize,
    n_chunks: usize,
    rows_per_batch: usize,
    expected_fields: usize,
    separator: u8,
    quote_char: Option<u8>,
    eol_char: u8,
}

impl<'a> Iterator for ChunkOffsetIter<'a> {
    type Item = (usize, usize);
    fn next(&mut self) -> Option<Self::Item> {
        match self.offsets.pop_front() {
            Some(offsets) => Some(offsets),
            None => {
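                // The current batch of offsets is exhausted; compute the next batch.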
                if self.last_offset == self.bytes.len() {
                    return None;
                }
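                // Estimate the byte size of a single row from the first row ahead
                // of the current offset, so `rows_per_batch` rows can be turned
                // into a byte-based chunk size.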
                let bytes_first_row = if self.rows_per_batch > 1 {
                    let bytes_first_row = next_line_position(
                        &self.bytes[self.last_offset + 2..],
                        Some(self.expected_fields),
                        self.separator,
                        self.quote_char,
                        self.eol_char,
                    )
                    .unwrap_or(1);
                    bytes_first_row + 2
                } else {
                    1
                };
                get_file_chunks_iterator(
                    &mut self.offsets,
                    &mut self.last_offset,
                    self.n_chunks,
                    self.rows_per_batch * bytes_first_row,
                    self.bytes,
                    self.expected_fields,
                    self.separator,
                    self.quote_char,
                    self.eol_char,
                );
                match self.offsets.pop_front() {
                    Some(offsets) => Some(offsets),
                    None => {
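                        // No further line boundary was found: emit the remainder
                        // of the file as the final chunk and mark it consumed.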
                        let out = Some((self.last_offset, self.bytes.len()));
                        self.last_offset = self.bytes.len();
                        out
                    },
                }
            },
        }
    }
}

impl<'a> CoreReader<'a> {
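    /// Convert this reader into a [`BatchedCsvReader`] that parses the input
    /// in chunks via [`BatchedCsvReader::next_batches`].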
    pub fn batched(mut self, _has_cat: bool) -> PolarsResult<BatchedCsvReader<'a>> {
        let reader_bytes = self.reader_bytes.take().unwrap();
        let bytes = reader_bytes.as_ref();
        let (bytes, starting_point_offset) =
            self.find_starting_point(bytes, self.quote_char, self.eol_char)?;
        let offset_batch_size = 16;
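        // SAFETY: `bytes` borrows from `reader_bytes`, which is moved into the
        // returned `BatchedCsvReader`, so the transmuted slice cannot outlive
        // the buffer it points into.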
        let bytes = unsafe { std::mem::transmute::<&[u8], &'static [u8]>(bytes) };
        let file_chunks = ChunkOffsetIter {
            bytes,
            offsets: VecDeque::with_capacity(offset_batch_size),
            last_offset: 0,
            n_chunks: offset_batch_size,
            rows_per_batch: self.chunk_size,
            expected_fields: self.schema.len(),
            separator: self.separator,
            quote_char: self.quote_char,
            eol_char: self.eol_char,
        };
        let projection = self.get_projection()?;
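        // Hold the global string cache while categorical data is parsed so the
        // categorical encodings stay consistent across all chunks.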
        #[cfg(feature = "dtype-categorical")]
        let _cat_lock = if _has_cat {
            Some(polars_core::StringCacheHolder::hold())
        } else {
            None
        };
        #[cfg(not(feature = "dtype-categorical"))]
        let _cat_lock = None;
        Ok(BatchedCsvReader {
            reader_bytes,
            chunk_size: self.chunk_size,
            file_chunks_iter: file_chunks,
            file_chunks: vec![],
            projection,
            starting_point_offset,
            row_index: self.row_index,
            comment_prefix: self.comment_prefix,
            quote_char: self.quote_char,
            eol_char: self.eol_char,
            null_values: self.null_values,
            missing_is_null: self.missing_is_null,
            to_cast: self.to_cast,
            ignore_errors: self.ignore_errors,
            truncate_ragged_lines: self.truncate_ragged_lines,
            remaining: self.n_rows.unwrap_or(usize::MAX),
            encoding: self.encoding,
            separator: self.separator,
            schema: self.schema,
            rows_read: 0,
            _cat_lock,
            decimal_comma: self.decimal_comma,
        })
    }
}
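
/// A CSV reader that parses the input incrementally, yielding one `DataFrame`
/// per chunk through [`BatchedCsvReader::next_batches`].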
pub struct BatchedCsvReader<'a> {
    reader_bytes: ReaderBytes<'a>,
    chunk_size: usize,
    file_chunks_iter: ChunkOffsetIter<'a>,
    file_chunks: Vec<(usize, usize)>,
    projection: Vec<usize>,
    starting_point_offset: Option<usize>,
    row_index: Option<RowIndex>,
    comment_prefix: Option<CommentPrefix>,
    quote_char: Option<u8>,
    eol_char: u8,
    null_values: Option<NullValuesCompiled>,
    missing_is_null: bool,
    truncate_ragged_lines: bool,
    to_cast: Vec<Field>,
    ignore_errors: bool,
    remaining: usize,
    encoding: CsvEncoding,
    separator: u8,
    schema: SchemaRef,
    rows_read: IdxSize,
    #[cfg(feature = "dtype-categorical")]
    _cat_lock: Option<polars_core::StringCacheHolder>,
    #[cfg(not(feature = "dtype-categorical"))]
    _cat_lock: Option<u8>,
    decimal_comma: bool,
}

impl<'a> BatchedCsvReader<'a> {
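    /// Read and parse up to `n` chunks in parallel, returning one `DataFrame`
    /// per chunk, or `Ok(None)` once the input or the row limit is exhausted.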
    pub fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<DataFrame>>> {
        if n == 0 || self.remaining == 0 {
            return Ok(None);
        }
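        // Pull up to `n` chunk offsets from the lazy offset iterator.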
        let file_chunks_iter = (&mut self.file_chunks_iter).take(n);
        self.file_chunks.extend(file_chunks_iter);
        if self.file_chunks.is_empty() {
            return Ok(None);
        }
        let chunks = &self.file_chunks;
        let mut bytes = self.reader_bytes.deref();
        if let Some(pos) = self.starting_point_offset {
            bytes = &bytes[pos..];
        }
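        // Parse the chunks into DataFrames in parallel on the global thread pool.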
        let mut chunks = POOL.install(|| {
            chunks
                .into_par_iter()
                .copied()
                .map(|(bytes_offset_thread, stop_at_nbytes)| {
                    let mut df = read_chunk(
                        bytes,
                        self.separator,
                        self.schema.as_ref(),
                        self.ignore_errors,
                        &self.projection,
                        bytes_offset_thread,
                        self.quote_char,
                        self.eol_char,
                        self.comment_prefix.as_ref(),
                        self.chunk_size,
                        self.encoding,
                        self.null_values.as_ref(),
                        self.missing_is_null,
                        self.truncate_ragged_lines,
                        self.chunk_size,
                        stop_at_nbytes,
                        self.starting_point_offset,
                        self.decimal_comma,
                    )?;
                    cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;
                    if let Some(rc) = &self.row_index {
                        df.with_row_index_mut(&rc.name, Some(rc.offset));
                    }
                    Ok(df)
                })
                .collect::<PolarsResult<Vec<_>>>()
        })?;
        self.file_chunks.clear();
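        // Shift the per-chunk row indices by the number of rows read so far, so
        // they stay monotonically increasing across batches.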
        if self.row_index.is_some() {
            update_row_counts2(&mut chunks, self.rows_read)
        }
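        // Enforce the requested row limit, truncating the batch that crosses it.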
        for df in &mut chunks {
            let h = df.height();
            if self.remaining < h {
                *df = df.slice(0, self.remaining);
            }
            self.remaining = self.remaining.saturating_sub(h);
            self.rows_read += h as IdxSize;
        }
        Ok(Some(chunks))
    }
}
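
/// A batched CSV reader that owns the underlying reader, so it can be used
/// where a `'static` lifetime is required. The wrapped `CsvReader` is kept
/// alive to back the transmuted `BatchedCsvReader<'static>`.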
pub struct OwnedBatchedCsvReader {
    #[allow(dead_code)]
    schema: SchemaRef,
    batched_reader: BatchedCsvReader<'static>,
    _reader: CsvReader<Box<dyn MmapBytesReader>>,
}

impl OwnedBatchedCsvReader {
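    /// See [`BatchedCsvReader::next_batches`].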
    pub fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<DataFrame>>> {
        self.batched_reader.next_batches(n)
    }
}
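
/// Build an [`OwnedBatchedCsvReader`] from a `CsvReader`, storing the reader
/// alongside the batched reader so the borrowed bytes stay valid.
///
/// # Panics
///
/// Panics if schema inference fails or the batched reader cannot be created.
///
/// A minimal usage sketch (construction of the `CsvReader` itself is elided,
/// as it depends on the crate's public builder API):
///
/// ```ignore
/// let mut batched = to_batched_owned(reader);
/// // Request up to four chunk-sized DataFrames per call.
/// while let Some(dfs) = batched.next_batches(4)? {
///     for df in dfs {
///         // process `df`
///     }
/// }
/// ```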
pub fn to_batched_owned(mut reader: CsvReader<Box<dyn MmapBytesReader>>) -> OwnedBatchedCsvReader {
    let schema = reader.get_schema().unwrap();
    let batched_reader = reader.batched_borrowed().unwrap();
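    // SAFETY: the reader that `batched_reader` borrows from is stored in
    // `_reader` below, so the `'static` lifetime cannot outlive its data.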
    let batched_reader: BatchedCsvReader<'static> = unsafe { std::mem::transmute(batched_reader) };
    OwnedBatchedCsvReader {
        schema,
        batched_reader,
        _reader: reader,
    }
}