//! Writing of [`DataFrame`]s to the Apache Avro file format.
use std::io::Write;

pub use arrow::io::avro::avro_schema::file::{Compression, Compression as AvroCompression};
use arrow::io::avro::avro_schema::{self};
use arrow::io::avro::write;
use polars_core::error::to_compute_err;
use polars_core::prelude::*;

use crate::shared::SerWriter;
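/// Writes a [`DataFrame`] to an Apache Avro file.
///
/// A minimal usage sketch; the import paths and file name are illustrative and
/// assume this writer is exposed as `polars_io::avro::AvroWriter` with the
/// `avro` feature enabled:
///
/// ```ignore
/// use polars_core::prelude::*;
/// use polars_io::avro::AvroWriter;
/// use polars_io::SerWriter;
///
/// fn example(df: &mut DataFrame) -> PolarsResult<()> {
///     let file = std::fs::File::create("example.avro")?;
///     AvroWriter::new(file)
///         // Pass `Some(AvroCompression::Deflate)` or `Some(AvroCompression::Snappy)`
///         // to compress blocks when the matching feature is enabled.
///         .with_compression(None)
///         .finish(df)
/// }
/// ```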
#[must_use]
pub struct AvroWriter<W> {
    writer: W,
    compression: Option<AvroCompression>,
    name: String,
}

impl<W> AvroWriter<W>
where
    W: Write,
{
    /// Set the compression codec used for the data blocks. Defaults to `None`
    /// (no compression).
    pub fn with_compression(mut self, compression: Option<AvroCompression>) -> Self {
        self.compression = compression;
        self
    }

    /// Set the name of the Avro record in the written schema. Defaults to an
    /// empty string.
    pub fn with_name(mut self, name: String) -> Self {
        self.name = name;
        self
    }
}

impl<W> SerWriter<W> for AvroWriter<W>
where
    W: Write,
{
    fn new(writer: W) -> Self {
        Self {
            writer,
            compression: None,
            name: "".to_string(),
        }
    }
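
    /// Write the contents of the [`DataFrame`] as a single Avro file: the
    /// schema and metadata are written once, followed by one data block per
    /// chunk of the frame.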
    fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> {
        let schema = df.schema().to_arrow(false);
        let record = write::to_record(&schema, self.name.clone())?;

        // Buffers reused across chunks to avoid reallocating per block.
        let mut data = vec![];
        let mut compressed_block = avro_schema::file::CompressedBlock::default();

        // The Avro file header (magic, schema and codec metadata) must be
        // written exactly once, before any data blocks.
        avro_schema::write::write_metadata(&mut self.writer, record.clone(), self.compression)
            .map_err(to_compute_err)?;

        for chunk in df.iter_chunks(false) {
            // One serializer per column, matching the Avro record fields.
            let mut serializers = chunk
                .iter()
                .zip(record.fields.iter())
                .map(|(array, field)| write::new_serializer(array.as_ref(), &field.schema))
                .collect::<Vec<_>>();

            let mut block = avro_schema::file::Block::new(
                chunk.arrays()[0].len(),
                std::mem::take(&mut data),
            );

            write::serialize(&mut serializers, &mut block);

            let _was_compressed =
                avro_schema::write::compress(&mut block, &mut compressed_block, self.compression)
                    .map_err(to_compute_err)?;

            avro_schema::write::write_block(&mut self.writer, &compressed_block)
                .map_err(to_compute_err)?;

            // Reclaim the uncompressed buffer and reset both blocks for the
            // next chunk.
            data = block.data;
            data.clear();
            compressed_block.data.clear();
            compressed_block.number_of_rows = 0;
        }

        Ok(())
    }
}
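
// Illustrative smoke test, added as a sketch rather than taken from the
// original module: it writes a small DataFrame into an in-memory buffer and
// only checks that an Avro object container file was started. The column
// names, values and test name are invented for the example.
#[cfg(test)]
mod tests {
    use polars_core::df;
    use polars_core::prelude::*;

    use super::*;
    use crate::shared::SerWriter;

    #[test]
    fn write_avro_smoke() -> PolarsResult<()> {
        let mut df = df!(
            "a" => &[1i64, 2, 3],
            "b" => &["x", "y", "z"]
        )?;

        // Write into an in-memory buffer; `&mut Vec<u8>` implements `Write`.
        let mut buf = Vec::<u8>::new();
        AvroWriter::new(&mut buf)
            .with_name("test".to_string())
            .with_compression(None)
            .finish(&mut df)?;

        // Avro object container files start with the magic bytes `Obj\x01`.
        assert_eq!(&buf[..4], b"Obj\x01");
        Ok(())
    }
}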