polars_io/parquet/read/
async_impl.rs1use arrow::datatypes::ArrowSchemaRef;
4use object_store::path::Path as ObjectPath;
5use polars_core::prelude::*;
6use polars_parquet::write::FileMetadata;
7use polars_utils::plpath::PlPathRef;
8
9use crate::cloud::{
10 CloudLocation, CloudOptions, PolarsObjectStore, build_object_store, object_path_from_str,
11};
12use crate::parquet::metadata::FileMetadataRef;
13
14pub struct ParquetObjectStore {
15 store: PolarsObjectStore,
16 path: ObjectPath,
17 length: Option<usize>,
18 metadata: Option<FileMetadataRef>,
19 schema: Option<ArrowSchemaRef>,
20}
21
22impl ParquetObjectStore {
23 pub async fn from_uri(
24 uri: PlPathRef<'_>,
25 options: Option<&CloudOptions>,
26 metadata: Option<FileMetadataRef>,
27 ) -> PolarsResult<Self> {
28 let (CloudLocation { prefix, .. }, store) = build_object_store(uri, options, false).await?;
29 let path = object_path_from_str(&prefix)?;
30
31 Ok(ParquetObjectStore {
32 store,
33 path,
34 length: None,
35 metadata,
36 schema: None,
37 })
38 }
39
40 async fn length(&mut self) -> PolarsResult<usize> {
42 if self.length.is_none() {
43 self.length = Some(self.store.head(&self.path).await?.size as usize);
44 }
45 Ok(self.length.unwrap())
46 }
47
48 pub async fn num_rows(&mut self) -> PolarsResult<usize> {
50 let metadata = self.get_metadata().await?;
51 Ok(metadata.num_rows)
52 }
53
54 async fn fetch_metadata(&mut self) -> PolarsResult<FileMetadata> {
56 let length = self.length().await?;
57 fetch_metadata(&self.store, &self.path, length).await
58 }
59
60 pub async fn get_metadata(&mut self) -> PolarsResult<&FileMetadataRef> {
62 if self.metadata.is_none() {
63 self.metadata = Some(Arc::new(self.fetch_metadata().await?));
64 }
65 Ok(self.metadata.as_ref().unwrap())
66 }
67
68 pub async fn schema(&mut self) -> PolarsResult<ArrowSchemaRef> {
69 self.schema = Some(match self.schema.as_ref() {
70 Some(schema) => Arc::clone(schema),
71 None => {
72 let metadata = self.get_metadata().await?;
73 let arrow_schema = polars_parquet::arrow::read::infer_schema(metadata)?;
74 Arc::new(arrow_schema)
75 },
76 });
77
78 Ok(self.schema.clone().unwrap())
79 }
80}
81
/// Splits the first `N` bytes off the front of `reader` and returns them as an array.
///
/// On success `reader` is advanced past the returned bytes. If fewer than `N` bytes remain,
/// `None` is returned and `reader` is left untouched.
fn read_n<const N: usize>(reader: &mut &[u8]) -> Option<[u8; N]> {
    // Copy the inner shared slice reference out so the cursor can be reassigned below.
    let remaining: &[u8] = *reader;
    let prefix = remaining.get(..N)?;

    let mut chunk = [0u8; N];
    chunk.copy_from_slice(prefix);

    *reader = &remaining[N..];
    Some(chunk)
}
91
92fn read_i32le(reader: &mut &[u8]) -> Option<i32> {
93 read_n(reader).map(i32::from_le_bytes)
94}
95
96pub async fn fetch_metadata(
98 store: &PolarsObjectStore,
99 path: &ObjectPath,
100 file_byte_length: usize,
101) -> PolarsResult<FileMetadata> {
102 let footer_header_bytes = store
103 .get_range(
104 path,
105 file_byte_length
106 .checked_sub(polars_parquet::parquet::FOOTER_SIZE as usize)
107 .ok_or_else(|| {
108 polars_parquet::parquet::error::ParquetError::OutOfSpec(
109 "not enough bytes to contain parquet footer".to_string(),
110 )
111 })?..file_byte_length,
112 )
113 .await?;
114
115 let footer_byte_length: usize = {
116 let reader = &mut footer_header_bytes.as_ref();
117 let footer_byte_size = read_i32le(reader).unwrap();
118 let magic = read_n(reader).unwrap();
119 debug_assert!(reader.is_empty());
120 if magic != polars_parquet::parquet::PARQUET_MAGIC {
121 return Err(polars_parquet::parquet::error::ParquetError::OutOfSpec(
122 "incorrect magic in parquet footer".to_string(),
123 )
124 .into());
125 }
126 footer_byte_size.try_into().map_err(|_| {
127 polars_parquet::parquet::error::ParquetError::OutOfSpec(
128 "negative footer byte length".to_string(),
129 )
130 })?
131 };
132
133 let footer_bytes = store
134 .get_range(
135 path,
136 file_byte_length
137 .checked_sub(polars_parquet::parquet::FOOTER_SIZE as usize + footer_byte_length)
138 .ok_or_else(|| {
139 polars_parquet::parquet::error::ParquetError::OutOfSpec(
140 "not enough bytes to contain parquet footer".to_string(),
141 )
142 })?..file_byte_length,
143 )
144 .await?;
145
146 Ok(polars_parquet::parquet::read::deserialize_metadata(
147 std::io::Cursor::new(footer_bytes.as_ref()),
148 footer_bytes.as_ref().len() * 2 + 1024,
152 )?)
153}