polars_io/parquet/read/
async_impl.rs1use arrow::datatypes::ArrowSchemaRef;
4use object_store::path::Path as ObjectPath;
5use polars_core::prelude::*;
6use polars_parquet::write::FileMetadata;
7
8use crate::cloud::{
9 CloudLocation, CloudOptions, PolarsObjectStore, build_object_store, object_path_from_str,
10};
11use crate::parquet::metadata::FileMetadataRef;
12
13pub struct ParquetObjectStore {
14 store: PolarsObjectStore,
15 path: ObjectPath,
16 length: Option<usize>,
17 metadata: Option<FileMetadataRef>,
18 schema: Option<ArrowSchemaRef>,
19}
20
21impl ParquetObjectStore {
22 pub async fn from_uri(
23 uri: &str,
24 options: Option<&CloudOptions>,
25 metadata: Option<FileMetadataRef>,
26 ) -> PolarsResult<Self> {
27 let (CloudLocation { prefix, .. }, store) = build_object_store(uri, options, false).await?;
28 let path = object_path_from_str(&prefix)?;
29
30 Ok(ParquetObjectStore {
31 store,
32 path,
33 length: None,
34 metadata,
35 schema: None,
36 })
37 }
38
39 async fn length(&mut self) -> PolarsResult<usize> {
41 if self.length.is_none() {
42 self.length = Some(self.store.head(&self.path).await?.size as usize);
43 }
44 Ok(self.length.unwrap())
45 }
46
47 pub async fn num_rows(&mut self) -> PolarsResult<usize> {
49 let metadata = self.get_metadata().await?;
50 Ok(metadata.num_rows)
51 }
52
53 async fn fetch_metadata(&mut self) -> PolarsResult<FileMetadata> {
55 let length = self.length().await?;
56 fetch_metadata(&self.store, &self.path, length).await
57 }
58
59 pub async fn get_metadata(&mut self) -> PolarsResult<&FileMetadataRef> {
61 if self.metadata.is_none() {
62 self.metadata = Some(Arc::new(self.fetch_metadata().await?));
63 }
64 Ok(self.metadata.as_ref().unwrap())
65 }
66
67 pub async fn schema(&mut self) -> PolarsResult<ArrowSchemaRef> {
68 self.schema = Some(match self.schema.as_ref() {
69 Some(schema) => Arc::clone(schema),
70 None => {
71 let metadata = self.get_metadata().await?;
72 let arrow_schema = polars_parquet::arrow::read::infer_schema(metadata)?;
73 Arc::new(arrow_schema)
74 },
75 });
76
77 Ok(self.schema.clone().unwrap())
78 }
79}
80
/// Take the first `N` bytes off the front of `reader` and return them as an array.
///
/// On success `reader` is advanced past the returned bytes. If fewer than `N`
/// bytes remain, `None` is returned and `reader` is left untouched.
fn read_n<const N: usize>(reader: &mut &[u8]) -> Option<[u8; N]> {
    let prefix = reader.get(..N)?;
    let mut bytes = [0u8; N];
    // `prefix` is exactly `N` bytes long, so the lengths always match here.
    bytes.copy_from_slice(prefix);
    *reader = &reader[N..];
    Some(bytes)
}
90
91fn read_i32le(reader: &mut &[u8]) -> Option<i32> {
92 read_n(reader).map(i32::from_le_bytes)
93}
94
95pub async fn fetch_metadata(
97 store: &PolarsObjectStore,
98 path: &ObjectPath,
99 file_byte_length: usize,
100) -> PolarsResult<FileMetadata> {
101 let footer_header_bytes = store
102 .get_range(
103 path,
104 file_byte_length
105 .checked_sub(polars_parquet::parquet::FOOTER_SIZE as usize)
106 .ok_or_else(|| {
107 polars_parquet::parquet::error::ParquetError::OutOfSpec(
108 "not enough bytes to contain parquet footer".to_string(),
109 )
110 })?..file_byte_length,
111 )
112 .await?;
113
114 let footer_byte_length: usize = {
115 let reader = &mut footer_header_bytes.as_ref();
116 let footer_byte_size = read_i32le(reader).unwrap();
117 let magic = read_n(reader).unwrap();
118 debug_assert!(reader.is_empty());
119 if magic != polars_parquet::parquet::PARQUET_MAGIC {
120 return Err(polars_parquet::parquet::error::ParquetError::OutOfSpec(
121 "incorrect magic in parquet footer".to_string(),
122 )
123 .into());
124 }
125 footer_byte_size.try_into().map_err(|_| {
126 polars_parquet::parquet::error::ParquetError::OutOfSpec(
127 "negative footer byte length".to_string(),
128 )
129 })?
130 };
131
132 let footer_bytes = store
133 .get_range(
134 path,
135 file_byte_length
136 .checked_sub(polars_parquet::parquet::FOOTER_SIZE as usize + footer_byte_length)
137 .ok_or_else(|| {
138 polars_parquet::parquet::error::ParquetError::OutOfSpec(
139 "not enough bytes to contain parquet footer".to_string(),
140 )
141 })?..file_byte_length,
142 )
143 .await?;
144
145 Ok(polars_parquet::parquet::read::deserialize_metadata(
146 std::io::Cursor::new(footer_bytes.as_ref()),
147 footer_bytes.as_ref().len() * 2 + 1024,
151 )?)
152}