use std::io::{Read, Seek};
use arrow::io::avro::{self, read};
use arrow::record_batch::RecordBatch;
use polars_core::error::to_compute_err;
use polars_core::prelude::*;
use crate::prelude::*;
use crate::shared::{finish_reader, ArrowReader};
/// Reader that loads an Avro source into a `DataFrame`.
///
/// Options are set through the `with_*` methods (each consumes and returns the reader) and
/// the data is read by `SerReader::finish`.
#[must_use]
pub struct AvroReader<R> {
    /// Underlying source the Avro bytes are read from.
    reader: R,
    /// Rechunk flag forwarded to `finish_reader`; `SerReader::new` initializes it to `true`.
    rechunk: bool,
    /// Optional row count forwarded to `finish_reader`.
    /// NOTE(review): presumably an upper bound on the rows read — confirm in `finish_reader`.
    n_rows: Option<usize>,
    /// Optional column names; when set, `finish` resolves them to indices with
    /// `columns_to_projection` and overwrites `projection` with the result.
    columns: Option<Vec<String>>,
    /// Optional indices into the file's schema fields selecting the columns to read.
    projection: Option<Vec<usize>>,
}
impl<R: Read + Seek> AvroReader<R> {
    /// Returns the Polars schema of the Avro source.
    ///
    /// Built from the fields of [`Self::arrow_schema`], so the same errors apply and the
    /// reader position is likewise left untouched.
    pub fn schema(&mut self) -> PolarsResult<Schema> {
        let schema = self.arrow_schema()?;
        Ok(Schema::from_iter(&schema.fields))
    }

    /// Returns the Arrow schema inferred from the Avro header.
    ///
    /// The header is parsed starting at the reader's current position, and the reader is
    /// then moved back to that position. Without the rewind the header bytes would stay
    /// consumed and a later `finish` call, which parses the header again, would fail.
    ///
    /// # Errors
    ///
    /// Returns an error if querying or restoring the stream position fails, if the Avro
    /// metadata cannot be read, or if no Arrow schema can be inferred from the record.
    pub fn arrow_schema(&mut self) -> PolarsResult<ArrowSchema> {
        let start = self.reader.stream_position().map_err(to_compute_err)?;
        let metadata =
            avro::avro_schema::read::read_metadata(&mut self.reader).map_err(to_compute_err);
        // Rewind before propagating any metadata error so the reader is reusable either way.
        self.reader
            .seek(std::io::SeekFrom::Start(start))
            .map_err(to_compute_err)?;
        let schema = read::infer_schema(&metadata?.record)?;
        Ok(schema)
    }

    /// Sets the optional row count that `finish` forwards to `finish_reader`.
    pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
        self.n_rows = num_rows;
        self
    }

    /// Sets the columns to read as indices into the file's schema fields.
    ///
    /// Overwritten during `finish` when column names were set via [`Self::with_columns`].
    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
        self.projection = projection;
        self
    }

    /// Sets the columns to read by name.
    ///
    /// When set, `finish` resolves the names to indices and uses them instead of any
    /// projection given through [`Self::with_projection`].
    pub fn with_columns(mut self, columns: Option<Vec<String>>) -> Self {
        self.columns = columns;
        self
    }
}
/// Lets the Avro block reader be driven through the shared `ArrowReader` interface.
impl<R> ArrowReader for read::Reader<R>
where
    R: Read + Seek,
{
    /// Pulls the next batch from the underlying iterator.
    ///
    /// An exhausted iterator becomes `Ok(None)`, a yielded batch becomes `Ok(Some(batch))`,
    /// and a yielded error is returned as `Err`.
    fn next_record_batch(&mut self) -> PolarsResult<Option<RecordBatch>> {
        // `Option<Result<T, E>>` -> `Result<Option<T>, E>`.
        self.next().transpose()
    }
}
impl<R> SerReader<R> for AvroReader<R>
where
    R: Read + Seek,
{
    fn new(reader: R) -> Self {
        AvroReader {
            reader,
            rechunk: true,
            n_rows: None,
            columns: None,
            projection: None,
        }
    }
    fn set_rechunk(mut self, rechunk: bool) -> Self {
        self.rechunk = rechunk;
        self
    }
    fn finish(mut self) -> PolarsResult<DataFrame> {
        let rechunk = self.rechunk;
        let metadata =
            avro::avro_schema::read::read_metadata(&mut self.reader).map_err(to_compute_err)?;
        let schema = read::infer_schema(&metadata.record)?;
        if let Some(columns) = &self.columns {
            self.projection = Some(columns_to_projection(columns, &schema)?);
        }
        let (projection, projected_schema) = if let Some(projection) = self.projection {
            let mut prj = vec![false; schema.fields.len()];
            for &index in projection.iter() {
                prj[index] = true;
            }
            (Some(prj), apply_projection(&schema, &projection))
        } else {
            (None, schema.clone())
        };
        let avro_reader =
            avro::read::Reader::new(&mut self.reader, metadata, schema.fields, projection);
        finish_reader(
            avro_reader,
            rechunk,
            self.n_rows,
            None,
            &projected_schema,
            None,
        )
    }
}