use std::io::{Read, Seek};
use arrow::io::avro::{self, read};
use arrow::record_batch::RecordBatch;
use polars_core::error::to_compute_err;
use polars_core::prelude::*;
use crate::prelude::*;
use crate::shared::{finish_reader, ArrowReader};
/// Builder-style reader that loads an Avro source into a `DataFrame`.
///
/// Configure it with the `with_*` methods and `set_rechunk`, then call
/// `finish` to perform the read.
#[must_use]
pub struct AvroReader<R> {
/// Underlying source the Avro metadata and data are read from.
reader: R,
/// Rechunk flag forwarded to `finish_reader`; initialised to `true` by `new`.
rechunk: bool,
/// Optional row count forwarded to `finish_reader`
/// (presumably an upper bound on the rows read; confirm in `finish_reader`).
n_rows: Option<usize>,
/// Optional column names to read; when set, `finish` resolves them to
/// indices and overwrites `projection` with the result.
columns: Option<Vec<String>>,
/// Optional indices of the schema fields to read.
projection: Option<Vec<usize>>,
}
impl<R: Read + Seek> AvroReader<R> {
    /// Returns the schema of the Avro source as a polars `Schema`.
    ///
    /// This is the result of [`Self::arrow_schema`] converted field by field.
    ///
    /// # Errors
    /// Propagates any error returned by [`Self::arrow_schema`].
    pub fn schema(&mut self) -> PolarsResult<Schema> {
        self.arrow_schema()
            .map(|arrow_schema| Schema::from_iter(&arrow_schema.fields))
    }

    /// Returns the schema of the Avro source as an arrow schema.
    ///
    /// Reads the Avro metadata from the underlying reader and infers the
    /// arrow schema from the record it describes.
    ///
    /// NOTE(review): the metadata is read through `&mut self.reader` and no
    /// rewind is visible here; confirm whether calling this before `finish`
    /// on the same reader is supported.
    ///
    /// # Errors
    /// Returns a compute error when the metadata cannot be read, or the
    /// inference error when the record cannot be mapped to an arrow schema.
    pub fn arrow_schema(&mut self) -> PolarsResult<ArrowSchema> {
        let file_metadata = avro::avro_schema::read::read_metadata(&mut self.reader)
            .map_err(to_compute_err)?;
        Ok(read::infer_schema(&file_metadata.record)?)
    }

    /// Sets the optional row count that `finish` forwards to `finish_reader`
    /// (presumably a cap on the number of rows read; confirm in `finish_reader`).
    pub fn with_n_rows(self, num_rows: Option<usize>) -> Self {
        Self {
            n_rows: num_rows,
            ..self
        }
    }

    /// Sets the indices of the schema fields to read.
    ///
    /// If column names are also set via [`Self::with_columns`], `finish`
    /// replaces this projection with the indices resolved from those names.
    pub fn with_projection(self, projection: Option<Vec<usize>>) -> Self {
        Self { projection, ..self }
    }

    /// Sets the names of the columns to read.
    ///
    /// `finish` resolves the names against the file schema and uses the
    /// resulting indices in place of any projection set earlier.
    pub fn with_columns(self, columns: Option<Vec<String>>) -> Self {
        Self { columns, ..self }
    }
}
impl<R> ArrowReader for read::Reader<R>
where
    R: Read + Seek,
{
    /// Pulls the next batch from the underlying Avro iterator.
    ///
    /// Returns `Ok(None)` once the iterator is exhausted, `Ok(Some(batch))`
    /// for a successfully decoded batch, and the iterator's error otherwise.
    fn next_record_batch(&mut self) -> PolarsResult<Option<RecordBatch>> {
        // `Option<Result<T, E>>` -> `Result<Option<T>, E>`.
        self.next().transpose()
    }
}
impl<R> SerReader<R> for AvroReader<R>
where
R: Read + Seek,
{
fn new(reader: R) -> Self {
AvroReader {
reader,
rechunk: true,
n_rows: None,
columns: None,
projection: None,
}
}
fn set_rechunk(mut self, rechunk: bool) -> Self {
self.rechunk = rechunk;
self
}
fn finish(mut self) -> PolarsResult<DataFrame> {
let rechunk = self.rechunk;
let metadata =
avro::avro_schema::read::read_metadata(&mut self.reader).map_err(to_compute_err)?;
let schema = read::infer_schema(&metadata.record)?;
if let Some(columns) = &self.columns {
self.projection = Some(columns_to_projection(columns, &schema)?);
}
let (projection, projected_schema) = if let Some(projection) = self.projection {
let mut prj = vec![false; schema.fields.len()];
for &index in projection.iter() {
prj[index] = true;
}
(Some(prj), apply_projection(&schema, &projection))
} else {
(None, schema.clone())
};
let avro_reader =
avro::read::Reader::new(&mut self.reader, metadata, schema.fields, projection);
finish_reader(
avro_reader,
rechunk,
self.n_rows,
None,
&projected_schema,
None,
)
}
}