polars_io/parquet/read/
async_impl.rs

1//! Read parquet files in parallel from the Object Store without a third party crate.
2
3use arrow::datatypes::ArrowSchemaRef;
4use object_store::path::Path as ObjectPath;
5use polars_core::prelude::*;
6use polars_parquet::write::FileMetadata;
7use polars_utils::plpath::PlPathRef;
8
9use crate::cloud::{
10    CloudLocation, CloudOptions, PolarsObjectStore, build_object_store, object_path_from_str,
11};
12use crate::parquet::metadata::FileMetadataRef;
13
/// A parquet object resident in an object store, with lazily fetched and
/// memoized byte length, file metadata, and Arrow schema.
pub struct ParquetObjectStore {
    store: PolarsObjectStore,
    path: ObjectPath,
    /// Total byte length of the object; fetched on first use (see `length()`).
    length: Option<usize>,
    /// Parquet file metadata; may be supplied up front or fetched lazily.
    metadata: Option<FileMetadataRef>,
    /// Arrow schema inferred from `metadata`; populated on first `schema()` call.
    schema: Option<ArrowSchemaRef>,
}
21
22impl ParquetObjectStore {
23    pub async fn from_uri(
24        uri: PlPathRef<'_>,
25        options: Option<&CloudOptions>,
26        metadata: Option<FileMetadataRef>,
27    ) -> PolarsResult<Self> {
28        let (CloudLocation { prefix, .. }, store) = build_object_store(uri, options, false).await?;
29        let path = object_path_from_str(&prefix)?;
30
31        Ok(ParquetObjectStore {
32            store,
33            path,
34            length: None,
35            metadata,
36            schema: None,
37        })
38    }
39
40    /// Initialize the length property of the object, unless it has already been fetched.
41    async fn length(&mut self) -> PolarsResult<usize> {
42        if self.length.is_none() {
43            self.length = Some(self.store.head(&self.path).await?.size as usize);
44        }
45        Ok(self.length.unwrap())
46    }
47
48    /// Number of rows in the parquet file.
49    pub async fn num_rows(&mut self) -> PolarsResult<usize> {
50        let metadata = self.get_metadata().await?;
51        Ok(metadata.num_rows)
52    }
53
54    /// Fetch the metadata of the parquet file, do not memoize it.
55    async fn fetch_metadata(&mut self) -> PolarsResult<FileMetadata> {
56        let length = self.length().await?;
57        fetch_metadata(&self.store, &self.path, length).await
58    }
59
60    /// Fetch and memoize the metadata of the parquet file.
61    pub async fn get_metadata(&mut self) -> PolarsResult<&FileMetadataRef> {
62        if self.metadata.is_none() {
63            self.metadata = Some(Arc::new(self.fetch_metadata().await?));
64        }
65        Ok(self.metadata.as_ref().unwrap())
66    }
67
68    pub async fn schema(&mut self) -> PolarsResult<ArrowSchemaRef> {
69        self.schema = Some(match self.schema.as_ref() {
70            Some(schema) => Arc::clone(schema),
71            None => {
72                let metadata = self.get_metadata().await?;
73                let arrow_schema = polars_parquet::arrow::read::infer_schema(metadata)?;
74                Arc::new(arrow_schema)
75            },
76        });
77
78        Ok(self.schema.clone().unwrap())
79    }
80}
81
/// Pop the first `N` bytes off the front of `reader` as a fixed-size array,
/// advancing the slice. Returns `None` (leaving `reader` untouched) when
/// fewer than `N` bytes remain.
fn read_n<const N: usize>(reader: &mut &[u8]) -> Option<[u8; N]> {
    if reader.len() < N {
        return None;
    }
    let (front, rest) = reader.split_at(N);
    *reader = rest;
    // `front` has exactly `N` bytes, so the conversion cannot fail.
    front.try_into().ok()
}
91
92fn read_i32le(reader: &mut &[u8]) -> Option<i32> {
93    read_n(reader).map(i32::from_le_bytes)
94}
95
/// Asynchronously reads a parquet file's metadata from the object store.
///
/// Issues two ranged GETs: one for the fixed-size footer trailer (which
/// encodes the metadata length), then one for the metadata itself.
pub async fn fetch_metadata(
    store: &PolarsObjectStore,
    path: &ObjectPath,
    file_byte_length: usize,
) -> PolarsResult<FileMetadata> {
    // Fetch the trailing FOOTER_SIZE bytes of the file: a little-endian i32
    // metadata byte length followed by the parquet magic (presumably "PAR1"
    // — value not visible here). `checked_sub` rejects files too small to
    // hold a footer at all.
    let footer_header_bytes = store
        .get_range(
            path,
            file_byte_length
                .checked_sub(polars_parquet::parquet::FOOTER_SIZE as usize)
                .ok_or_else(|| {
                    polars_parquet::parquet::error::ParquetError::OutOfSpec(
                        "not enough bytes to contain parquet footer".to_string(),
                    )
                })?..file_byte_length,
        )
        .await?;

    // Parse the trailer: metadata length, then magic. The unwraps are safe
    // only if `get_range` returned exactly the FOOTER_SIZE bytes requested;
    // the debug_assert checks the trailer was fully consumed (i.e. its two
    // fields account for all FOOTER_SIZE bytes).
    let footer_byte_length: usize = {
        let reader = &mut footer_header_bytes.as_ref();
        let footer_byte_size = read_i32le(reader).unwrap();
        let magic = read_n(reader).unwrap();
        debug_assert!(reader.is_empty());
        if magic != polars_parquet::parquet::PARQUET_MAGIC {
            return Err(polars_parquet::parquet::error::ParquetError::OutOfSpec(
                "incorrect magic in parquet footer".to_string(),
            )
            .into());
        }
        // i32 -> usize; fails only for a negative length.
        footer_byte_size.try_into().map_err(|_| {
            polars_parquet::parquet::error::ParquetError::OutOfSpec(
                "negative footer byte length".to_string(),
            )
        })?
    };

    // Fetch the metadata. Note the range runs to the end of the file, so the
    // FOOTER_SIZE trailer bytes are downloaded a second time along with the
    // `footer_byte_length` metadata bytes preceding them.
    let footer_bytes = store
        .get_range(
            path,
            file_byte_length
                .checked_sub(polars_parquet::parquet::FOOTER_SIZE as usize + footer_byte_length)
                .ok_or_else(|| {
                    polars_parquet::parquet::error::ParquetError::OutOfSpec(
                        "not enough bytes to contain parquet footer".to_string(),
                    )
                })?..file_byte_length,
        )
        .await?;

    // The second argument bounds thrift decoding allocations; see TODO below.
    Ok(polars_parquet::parquet::read::deserialize_metadata(
        std::io::Cursor::new(footer_bytes.as_ref()),
        // TODO: Describe why this makes sense. Taken from the previous
        // implementation which said "a highly nested but sparse struct could
        // result in many allocations".
        footer_bytes.as_ref().len() * 2 + 1024,
    )?)
}
153}