polars_io/parquet/read/async_impl.rs

1//! Read parquet files in parallel from the Object Store without a third party crate.
2
3use arrow::datatypes::ArrowSchemaRef;
4use object_store::path::Path as ObjectPath;
5use polars_core::prelude::*;
6use polars_parquet::write::FileMetadata;
7
8use crate::cloud::{
9    CloudLocation, CloudOptions, PolarsObjectStore, build_object_store, object_path_from_str,
10};
11use crate::parquet::metadata::FileMetadataRef;
12
/// A parquet object in a (cloud) object store, with lazily fetched and
/// memoized file length, parquet metadata, and Arrow schema.
pub struct ParquetObjectStore {
    // Handle to the object store backing the file.
    store: PolarsObjectStore,
    // Path of the parquet object within the store.
    path: ObjectPath,
    // Total byte length of the object; fetched on first use via `length()`.
    length: Option<usize>,
    // Parquet file metadata; fetched on first use via `get_metadata()`.
    metadata: Option<FileMetadataRef>,
    // Arrow schema inferred from the metadata; computed on first use via `schema()`.
    schema: Option<ArrowSchemaRef>,
}
20
21impl ParquetObjectStore {
22    pub async fn from_uri(
23        uri: &str,
24        options: Option<&CloudOptions>,
25        metadata: Option<FileMetadataRef>,
26    ) -> PolarsResult<Self> {
27        let (CloudLocation { prefix, .. }, store) = build_object_store(uri, options, false).await?;
28        let path = object_path_from_str(&prefix)?;
29
30        Ok(ParquetObjectStore {
31            store,
32            path,
33            length: None,
34            metadata,
35            schema: None,
36        })
37    }
38
39    /// Initialize the length property of the object, unless it has already been fetched.
40    async fn length(&mut self) -> PolarsResult<usize> {
41        if self.length.is_none() {
42            self.length = Some(self.store.head(&self.path).await?.size as usize);
43        }
44        Ok(self.length.unwrap())
45    }
46
47    /// Number of rows in the parquet file.
48    pub async fn num_rows(&mut self) -> PolarsResult<usize> {
49        let metadata = self.get_metadata().await?;
50        Ok(metadata.num_rows)
51    }
52
53    /// Fetch the metadata of the parquet file, do not memoize it.
54    async fn fetch_metadata(&mut self) -> PolarsResult<FileMetadata> {
55        let length = self.length().await?;
56        fetch_metadata(&self.store, &self.path, length).await
57    }
58
59    /// Fetch and memoize the metadata of the parquet file.
60    pub async fn get_metadata(&mut self) -> PolarsResult<&FileMetadataRef> {
61        if self.metadata.is_none() {
62            self.metadata = Some(Arc::new(self.fetch_metadata().await?));
63        }
64        Ok(self.metadata.as_ref().unwrap())
65    }
66
67    pub async fn schema(&mut self) -> PolarsResult<ArrowSchemaRef> {
68        self.schema = Some(match self.schema.as_ref() {
69            Some(schema) => Arc::clone(schema),
70            None => {
71                let metadata = self.get_metadata().await?;
72                let arrow_schema = polars_parquet::arrow::read::infer_schema(metadata)?;
73                Arc::new(arrow_schema)
74            },
75        });
76
77        Ok(self.schema.clone().unwrap())
78    }
79}
80
/// Pop the first `N` bytes off the front of `reader` as a fixed-size array,
/// advancing the slice past them. Returns `None` when fewer than `N` bytes
/// remain (leaving `reader` untouched).
fn read_n<const N: usize>(reader: &mut &[u8]) -> Option<[u8; N]> {
    if reader.len() < N {
        return None;
    }
    let (head, rest) = reader.split_at(N);
    *reader = rest;
    // Cannot fail: `head` is exactly `N` bytes long by construction.
    head.try_into().ok()
}
90
91fn read_i32le(reader: &mut &[u8]) -> Option<i32> {
92    read_n(reader).map(i32::from_le_bytes)
93}
94
95/// Asynchronously reads the files' metadata
96pub async fn fetch_metadata(
97    store: &PolarsObjectStore,
98    path: &ObjectPath,
99    file_byte_length: usize,
100) -> PolarsResult<FileMetadata> {
101    let footer_header_bytes = store
102        .get_range(
103            path,
104            file_byte_length
105                .checked_sub(polars_parquet::parquet::FOOTER_SIZE as usize)
106                .ok_or_else(|| {
107                    polars_parquet::parquet::error::ParquetError::OutOfSpec(
108                        "not enough bytes to contain parquet footer".to_string(),
109                    )
110                })?..file_byte_length,
111        )
112        .await?;
113
114    let footer_byte_length: usize = {
115        let reader = &mut footer_header_bytes.as_ref();
116        let footer_byte_size = read_i32le(reader).unwrap();
117        let magic = read_n(reader).unwrap();
118        debug_assert!(reader.is_empty());
119        if magic != polars_parquet::parquet::PARQUET_MAGIC {
120            return Err(polars_parquet::parquet::error::ParquetError::OutOfSpec(
121                "incorrect magic in parquet footer".to_string(),
122            )
123            .into());
124        }
125        footer_byte_size.try_into().map_err(|_| {
126            polars_parquet::parquet::error::ParquetError::OutOfSpec(
127                "negative footer byte length".to_string(),
128            )
129        })?
130    };
131
132    let footer_bytes = store
133        .get_range(
134            path,
135            file_byte_length
136                .checked_sub(polars_parquet::parquet::FOOTER_SIZE as usize + footer_byte_length)
137                .ok_or_else(|| {
138                    polars_parquet::parquet::error::ParquetError::OutOfSpec(
139                        "not enough bytes to contain parquet footer".to_string(),
140                    )
141                })?..file_byte_length,
142        )
143        .await?;
144
145    Ok(polars_parquet::parquet::read::deserialize_metadata(
146        std::io::Cursor::new(footer_bytes.as_ref()),
147        // TODO: Describe why this makes sense. Taken from the previous
148        // implementation which said "a highly nested but sparse struct could
149        // result in many allocations".
150        footer_bytes.as_ref().len() * 2 + 1024,
151    )?)
152}