polars_io/avro/
read.rs

1use std::io::{Read, Seek};
2
3use arrow::io::avro::{self, read};
4use arrow::record_batch::RecordBatch;
5use polars_core::error::to_compute_err;
6use polars_core::prelude::*;
7
8use crate::prelude::*;
9use crate::shared::{ArrowReader, finish_reader};
10
/// Read [Apache Avro] format into a [`DataFrame`]
///
/// [Apache Avro]: https://avro.apache.org
///
/// The reader is configured builder-style (`with_n_rows`, `with_projection`,
/// `with_columns`, `set_rechunk`) and consumed by `finish`, which produces the
/// [`DataFrame`].
///
/// # Example
/// ```
/// use std::fs::File;
/// use polars_core::prelude::*;
/// use polars_io::avro::AvroReader;
/// use polars_io::SerReader;
///
/// fn example() -> PolarsResult<DataFrame> {
///     let file = File::open("file.avro").expect("file not found");
///
///     AvroReader::new(file)
///             .finish()
/// }
/// ```
#[must_use]
pub struct AvroReader<R> {
    // Underlying byte source the Avro header and data are read from.
    reader: R,
    // Forwarded to `finish_reader` as its `rechunk` argument; defaults to `true`.
    rechunk: bool,
    // Optional row limit set via `with_n_rows`; defaults to `None`.
    n_rows: Option<usize>,
    // Optional column names to select; when set, `finish` resolves them to
    // indices and overwrites `projection`.
    columns: Option<Vec<String>>,
    // Optional 0-based column indices to select; defaults to `None`.
    projection: Option<Vec<usize>>,
}
37
38impl<R: Read + Seek> AvroReader<R> {
39    /// Get schema of the Avro File
40    pub fn schema(&mut self) -> PolarsResult<Schema> {
41        let schema = self.arrow_schema()?;
42        Ok(Schema::from_arrow_schema(&schema))
43    }
44
45    /// Get arrow schema of the avro File, this is faster than a polars schema.
46    pub fn arrow_schema(&mut self) -> PolarsResult<ArrowSchema> {
47        let metadata =
48            avro::avro_schema::read::read_metadata(&mut self.reader).map_err(to_compute_err)?;
49        let schema = read::infer_schema(&metadata.record)?;
50        Ok(schema)
51    }
52
53    /// Stop reading when `n` rows are read.
54    pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
55        self.n_rows = num_rows;
56        self
57    }
58
59    /// Set the reader's column projection. This counts from 0, meaning that
60    /// `vec![0, 4]` would select the 1st and 5th column.
61    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
62        self.projection = projection;
63        self
64    }
65
66    /// Columns to select/ project
67    pub fn with_columns(mut self, columns: Option<Vec<String>>) -> Self {
68        self.columns = columns;
69        self
70    }
71}
72
73impl<R> ArrowReader for read::Reader<R>
74where
75    R: Read + Seek,
76{
77    fn next_record_batch(&mut self) -> PolarsResult<Option<RecordBatch>> {
78        self.next().map_or(Ok(None), |v| v.map(Some))
79    }
80}
81
82impl<R> SerReader<R> for AvroReader<R>
83where
84    R: Read + Seek,
85{
86    fn new(reader: R) -> Self {
87        AvroReader {
88            reader,
89            rechunk: true,
90            n_rows: None,
91            columns: None,
92            projection: None,
93        }
94    }
95
96    fn set_rechunk(mut self, rechunk: bool) -> Self {
97        self.rechunk = rechunk;
98        self
99    }
100
101    fn finish(mut self) -> PolarsResult<DataFrame> {
102        let rechunk = self.rechunk;
103        let metadata =
104            avro::avro_schema::read::read_metadata(&mut self.reader).map_err(to_compute_err)?;
105        let schema = read::infer_schema(&metadata.record)?;
106
107        if let Some(columns) = &self.columns {
108            self.projection = Some(columns_to_projection(columns, &schema)?);
109        }
110
111        let (projection, projected_schema) = if let Some(projection) = self.projection {
112            let mut prj = vec![false; schema.len()];
113            for &index in projection.iter() {
114                prj[index] = true;
115            }
116            (Some(prj), apply_projection(&schema, &projection))
117        } else {
118            (None, schema.clone())
119        };
120
121        let avro_reader = avro::read::Reader::new(&mut self.reader, metadata, schema, projection);
122
123        finish_reader(
124            avro_reader,
125            rechunk,
126            self.n_rows,
127            None,
128            &projected_schema,
129            None,
130        )
131    }
132}