1use std::io::{Read, Seek};
2
3use arrow::io::avro::{self, read};
4use arrow::record_batch::RecordBatch;
5use polars_core::error::to_compute_err;
6use polars_core::prelude::*;
7
8use crate::prelude::*;
9use crate::shared::{ArrowReader, finish_reader};
10
/// Reads an Avro source into a `DataFrame`.
///
/// Construct it with `SerReader::new`, optionally configure it with the
/// `with_*` builder methods, and consume it with `SerReader::finish`.
#[must_use]
pub struct AvroReader<R> {
    /// Underlying byte source the Avro metadata and data are read from.
    reader: R,
    /// Rechunk flag forwarded to `finish_reader`; initialised to `true` by `new`.
    rechunk: bool,
    /// Optional row limit forwarded to `finish_reader`; `None` means no limit is passed.
    n_rows: Option<usize>,
    /// Optional column names to select. When set, `finish` resolves them to
    /// indices and overwrites `projection` with the result.
    columns: Option<Vec<String>>,
    /// Optional indices into the file schema's fields selecting which columns to read.
    projection: Option<Vec<usize>>,
}
37
38impl<R: Read + Seek> AvroReader<R> {
39 pub fn schema(&mut self) -> PolarsResult<Schema> {
41 let schema = self.arrow_schema()?;
42 Ok(Schema::from_arrow_schema(&schema))
43 }
44
45 pub fn arrow_schema(&mut self) -> PolarsResult<ArrowSchema> {
47 let metadata =
48 avro::avro_schema::read::read_metadata(&mut self.reader).map_err(to_compute_err)?;
49 let schema = read::infer_schema(&metadata.record)?;
50 Ok(schema)
51 }
52
53 pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
55 self.n_rows = num_rows;
56 self
57 }
58
59 pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
62 self.projection = projection;
63 self
64 }
65
66 pub fn with_columns(mut self, columns: Option<Vec<String>>) -> Self {
68 self.columns = columns;
69 self
70 }
71}
72
73impl<R> ArrowReader for read::Reader<R>
74where
75 R: Read + Seek,
76{
77 fn next_record_batch(&mut self) -> PolarsResult<Option<RecordBatch>> {
78 self.next().map_or(Ok(None), |v| v.map(Some))
79 }
80}
81
82impl<R> SerReader<R> for AvroReader<R>
83where
84 R: Read + Seek,
85{
86 fn new(reader: R) -> Self {
87 AvroReader {
88 reader,
89 rechunk: true,
90 n_rows: None,
91 columns: None,
92 projection: None,
93 }
94 }
95
96 fn set_rechunk(mut self, rechunk: bool) -> Self {
97 self.rechunk = rechunk;
98 self
99 }
100
101 fn finish(mut self) -> PolarsResult<DataFrame> {
102 let rechunk = self.rechunk;
103 let metadata =
104 avro::avro_schema::read::read_metadata(&mut self.reader).map_err(to_compute_err)?;
105 let schema = read::infer_schema(&metadata.record)?;
106
107 if let Some(columns) = &self.columns {
108 self.projection = Some(columns_to_projection(columns, &schema)?);
109 }
110
111 let (projection, projected_schema) = if let Some(projection) = self.projection {
112 let mut prj = vec![false; schema.len()];
113 for &index in projection.iter() {
114 prj[index] = true;
115 }
116 (Some(prj), apply_projection(&schema, &projection))
117 } else {
118 (None, schema.clone())
119 };
120
121 let avro_reader = avro::read::Reader::new(&mut self.reader, metadata, schema, projection);
122
123 finish_reader(
124 avro_reader,
125 rechunk,
126 self.n_rows,
127 None,
128 &projected_schema,
129 None,
130 )
131 }
132}