use std::io::{Read, Seek};
use std::path::PathBuf;
use arrow::datatypes::ArrowSchemaRef;
use arrow::io::ipc::read;
use arrow::record_batch::RecordBatch;
use polars_core::prelude::*;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use crate::mmap::MmapBytesReader;
use crate::predicates::PhysicalIoExpr;
use crate::prelude::*;
use crate::shared::{finish_reader, ArrowReader};
use crate::RowIndex;
/// Options attached to an IPC scan.
///
/// Serializable when the `serde` feature is enabled.
#[derive(Clone, Debug, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct IpcScanOptions {
    /// Memory-map flag. NOTE(review): the consumer of this flag is not in this
    /// section; presumably it decides whether `IpcReader::memory_mapped` is
    /// given a path — confirm at the call site.
    pub memory_map: bool,
}
/// Builder-style reader that turns an Arrow IPC file into a `DataFrame`.
///
/// Configure it with the `with_*` / `memory_mapped` methods and then call
/// `finish` (or `finish_with_scan_ops` under the `lazy` feature).
#[must_use]
pub struct IpcReader<R: MmapBytesReader> {
    /// Underlying byte source; handed to `read::FileReader::new` when finishing.
    pub(super) reader: R,
    /// Forwarded to `finish_reader`; `new` initialises it to `true`.
    rechunk: bool,
    /// Passed as the last argument of `read::FileReader::new`
    /// (presumably a row limit — confirm against the arrow reader).
    pub(super) n_rows: Option<usize>,
    /// Column indices to read. Overwritten when `columns` is set, because the
    /// names are then resolved to indices against the file schema.
    pub(super) projection: Option<Vec<usize>>,
    /// Column names to read; resolved through `columns_to_projection`.
    pub(crate) columns: Option<Vec<String>>,
    /// Forwarded unchanged to `finish_reader`.
    pub(super) row_index: Option<RowIndex>,
    /// When `Some` and `reader.to_file()` is also `Some`, the finish methods
    /// try `finish_memmapped` first. The path value itself is not read in this
    /// section.
    pub(super) memory_map: Option<PathBuf>,
    /// File metadata cached by `get_metadata` after the first read.
    metadata: Option<read::FileMetadata>,
    /// Clone of `metadata.schema`, cached together with `metadata`.
    schema: Option<ArrowSchemaRef>,
}
/// Decides whether a failed memory-map attempt may fall back to a normal read.
///
/// A `ComputeError` carrying the exact "uncompressed IPC files" message is
/// downgraded to a warning on stderr and reported as `Ok(())`; every other
/// error is handed back to the caller unchanged.
fn check_mmap_err(err: PolarsError) -> PolarsResult<()> {
    // The error is recognised purely by its message text.
    let compressed_file = matches!(
        &err,
        PolarsError::ComputeError(msg)
            if msg.as_ref() == "memory_map can only be done on uncompressed IPC files"
    );
    if !compressed_file {
        return Err(err);
    }
    eprintln!(
        "Could not memory_map compressed IPC file, defaulting to normal read. \
        Toggle off 'memory_map' to silence this warning."
    );
    Ok(())
}
impl<R: MmapBytesReader> IpcReader<R> {
    /// Returns the file metadata, reading it from the reader on first use.
    ///
    /// The parsed metadata and a clone of its schema are cached on `self`, so
    /// subsequent calls do not touch the reader.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `read::read_file_metadata`.
    fn get_metadata(&mut self) -> PolarsResult<&read::FileMetadata> {
        if self.metadata.is_none() {
            let metadata = read::read_file_metadata(&mut self.reader)?;
            self.schema = Some(metadata.schema.clone());
            self.metadata = Some(metadata);
        }
        // Either cached by an earlier call or filled in just above.
        Ok(self.metadata.as_ref().unwrap())
    }

    /// Returns the Arrow schema of the file, loading the metadata if needed.
    ///
    /// # Errors
    ///
    /// Fails when the file metadata cannot be read.
    pub fn schema(&mut self) -> PolarsResult<ArrowSchemaRef> {
        self.get_metadata()?;
        // `get_metadata` always stores the schema together with the metadata.
        Ok(self.schema.as_ref().unwrap().clone())
    }

    /// Sets the value passed as the row limit argument of the IPC file reader.
    pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
        self.n_rows = num_rows;
        self
    }

    /// Selects columns by name.
    ///
    /// When set, the names are resolved against the file schema while
    /// finishing and replace any projection given via `with_projection`.
    pub fn with_columns(mut self, columns: Option<Vec<String>>) -> Self {
        self.columns = columns;
        self
    }

    /// Sets the row index option that is forwarded to `finish_reader`.
    pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
        self.row_index = row_index;
        self
    }

    /// Selects columns by index. Ignored when column names are also set.
    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
        self.projection = projection;
        self
    }

    /// Enables the memory-mapped read attempt when given `Some(path)`.
    ///
    /// The attempt is only made if the reader can also be viewed as a file
    /// (`reader.to_file()` is `Some`).
    pub fn memory_mapped(mut self, path_buf: Option<PathBuf>) -> Self {
        self.memory_map = path_buf;
        self
    }

    /// Reads the file into a `DataFrame`, applying an optional predicate.
    ///
    /// If memory mapping was requested and the reader is file-backed, the
    /// memory-mapped path is tried first (announced on stderr when `verbose`).
    /// A failure that `check_mmap_err` accepts falls through to the regular
    /// read below; any other failure is returned.
    ///
    /// # Errors
    ///
    /// Returns an error when the metadata cannot be read, a requested column
    /// cannot be resolved, or reading the record batches fails.
    #[cfg(feature = "lazy")]
    pub fn finish_with_scan_ops(
        mut self,
        predicate: Option<Arc<dyn PhysicalIoExpr>>,
        verbose: bool,
    ) -> PolarsResult<DataFrame> {
        if self.memory_map.is_some() && self.reader.to_file().is_some() {
            if verbose {
                eprintln!("memory map ipc file")
            }
            // The predicate is cloned because it is needed again on fallback.
            match self.finish_memmapped(predicate.clone()) {
                Ok(df) => return Ok(df),
                Err(err) => check_mmap_err(err)?,
            }
        }
        let rechunk = self.rechunk;
        // Reuse metadata cached by an earlier `schema()` call instead of
        // parsing the file footer a second time; `self` is consumed here, so
        // the cache can simply be moved out.
        // NOTE(review): assumes `read::FileReader` does not depend on the
        // reader's current position — confirm against the arrow IPC reader.
        let metadata = match self.metadata.take() {
            Some(cached) => cached,
            None => read::read_file_metadata(&mut self.reader)?,
        };
        // Column names take precedence over an index projection.
        if let Some(columns) = &self.columns {
            self.projection = Some(columns_to_projection(columns, &metadata.schema)?);
        }
        let schema = if let Some(projection) = &self.projection {
            Arc::new(apply_projection(&metadata.schema, projection))
        } else {
            metadata.schema.clone()
        };
        let reader = read::FileReader::new(self.reader, metadata, self.projection, self.n_rows);
        finish_reader(reader, rechunk, None, predicate, &schema, self.row_index)
    }
}
/// Lets the IPC `FileReader` iterator be driven through the `ArrowReader` trait.
impl<R: MmapBytesReader> ArrowReader for read::FileReader<R>
where
    R: Read + Seek,
{
    /// Pulls the next record batch from the iterator.
    ///
    /// Iterator exhaustion becomes `Ok(None)`, a yielded batch becomes
    /// `Ok(Some(batch))`, and a yielded error is returned as `Err`.
    fn next_record_batch(&mut self) -> PolarsResult<Option<RecordBatch>> {
        let step = self.next();
        // Option<Result<T, E>> -> Result<Option<T>, E>
        step.transpose()
    }
}
impl<R: MmapBytesReader> SerReader<R> for IpcReader<R> {
    /// Creates a reader with default settings: rechunking enabled, no row
    /// limit, no column selection, no row index and no memory mapping.
    fn new(reader: R) -> Self {
        IpcReader {
            reader,
            rechunk: true,
            n_rows: None,
            columns: None,
            projection: None,
            row_index: None,
            memory_map: None,
            metadata: None,
            schema: None,
        }
    }

    /// Sets the rechunk flag that is forwarded to `finish_reader`.
    fn set_rechunk(mut self, rechunk: bool) -> Self {
        self.rechunk = rechunk;
        self
    }

    /// Reads the file into a `DataFrame`.
    ///
    /// If memory mapping was requested and the reader is file-backed, the
    /// memory-mapped path is tried first. A failure that `check_mmap_err`
    /// accepts falls through to the regular read below; any other failure is
    /// returned.
    ///
    /// # Errors
    ///
    /// Returns an error when the metadata cannot be read, a requested column
    /// cannot be resolved, or reading the record batches fails.
    fn finish(mut self) -> PolarsResult<DataFrame> {
        if self.memory_map.is_some() && self.reader.to_file().is_some() {
            match self.finish_memmapped(None) {
                Ok(df) => return Ok(df),
                Err(err) => check_mmap_err(err)?,
            }
        }
        let rechunk = self.rechunk;
        // Reuse metadata cached by an earlier `schema()` call instead of
        // parsing the file footer a second time; `self` is consumed here, so
        // the cache can simply be moved out.
        // NOTE(review): assumes `read::FileReader` does not depend on the
        // reader's current position — confirm against the arrow IPC reader.
        let metadata = match self.metadata.take() {
            Some(cached) => cached,
            None => read::read_file_metadata(&mut self.reader)?,
        };
        // Column names take precedence over an index projection.
        if let Some(columns) = &self.columns {
            self.projection = Some(columns_to_projection(columns, &metadata.schema)?);
        }
        let schema = if let Some(projection) = &self.projection {
            Arc::new(apply_projection(&metadata.schema, projection))
        } else {
            metadata.schema.clone()
        };
        // `metadata` is not needed afterwards, so it is moved rather than cloned.
        let ipc_reader =
            read::FileReader::new(self.reader, metadata, self.projection, self.n_rows);
        finish_reader(ipc_reader, rechunk, None, None, &schema, self.row_index)
    }
}