1use std::ops::Range;
3
4use arrow::datatypes::ArrowSchemaRef;
5use bytes::Bytes;
6use object_store::path::Path as ObjectPath;
7use polars_core::config::{get_rg_prefetch_size, verbose};
8use polars_core::prelude::*;
9use polars_parquet::read::RowGroupMetadata;
10use polars_parquet::write::FileMetadata;
11use polars_utils::pl_str::PlSmallStr;
12use tokio::sync::Mutex;
13use tokio::sync::mpsc::{Receiver, Sender, channel};
14
15use super::mmap::ColumnStore;
16use super::predicates::read_this_row_group;
17use crate::cloud::{
18 CloudLocation, CloudOptions, PolarsObjectStore, build_object_store, object_path_from_str,
19};
20use crate::parquet::metadata::FileMetadataRef;
21use crate::pl_async::get_runtime;
22use crate::predicates::ScanIOPredicate;
23
// Downloaded bytes of a single row group, keyed by a u64 offset
// (presumably each chunk's starting byte in the file, as produced by
// `get_ranges_sort` — confirm against `PolarsObjectStore`).
type DownloadedRowGroup = PlHashMap<u64, Bytes>;
// Payload flowing through the prefetch channel: (row-group index, its bytes).
type QueuePayload = (usize, DownloadedRowGroup);
// Shared sender half of the prefetch queue; results are `Err` on download failure.
type QueueSend = Arc<Sender<PolarsResult<QueuePayload>>>;
27
/// A parquet file on an object store, with lazily fetched and cached
/// file length and footer metadata.
pub struct ParquetObjectStore {
    store: PolarsObjectStore,
    path: ObjectPath,
    // Cached object size in bytes; populated on first `length()` call.
    length: Option<usize>,
    // Cached file metadata; may be supplied up front or fetched lazily.
    metadata: Option<FileMetadataRef>,
}
34
35impl ParquetObjectStore {
36 pub async fn from_uri(
37 uri: &str,
38 options: Option<&CloudOptions>,
39 metadata: Option<FileMetadataRef>,
40 ) -> PolarsResult<Self> {
41 let (CloudLocation { prefix, .. }, store) = build_object_store(uri, options, false).await?;
42 let path = object_path_from_str(&prefix)?;
43
44 Ok(ParquetObjectStore {
45 store,
46 path,
47 length: None,
48 metadata,
49 })
50 }
51
52 async fn get_ranges(&self, ranges: &mut [Range<usize>]) -> PolarsResult<PlHashMap<u64, Bytes>> {
53 self.store.get_ranges_sort(&self.path, ranges).await
54 }
55
56 async fn length(&mut self) -> PolarsResult<usize> {
58 if self.length.is_none() {
59 self.length = Some(self.store.head(&self.path).await?.size);
60 }
61 Ok(self.length.unwrap())
62 }
63
64 pub async fn num_rows(&mut self) -> PolarsResult<usize> {
66 let metadata = self.get_metadata().await?;
67 Ok(metadata.num_rows)
68 }
69
70 async fn fetch_metadata(&mut self) -> PolarsResult<FileMetadata> {
72 let length = self.length().await?;
73 fetch_metadata(&self.store, &self.path, length).await
74 }
75
76 pub async fn get_metadata(&mut self) -> PolarsResult<&FileMetadataRef> {
78 if self.metadata.is_none() {
79 self.metadata = Some(Arc::new(self.fetch_metadata().await?));
80 }
81 Ok(self.metadata.as_ref().unwrap())
82 }
83}
84
/// Pop the first `N` bytes off the front of `reader` as a fixed-size array,
/// advancing the slice; returns `None` (leaving `reader` untouched) if fewer
/// than `N` bytes remain.
fn read_n<const N: usize>(reader: &mut &[u8]) -> Option<[u8; N]> {
    if reader.len() < N {
        return None;
    }
    let (head, rest) = reader.split_at(N);
    *reader = rest;
    // `head` is exactly `N` bytes long, so this conversion cannot fail.
    head.try_into().ok()
}
94
95fn read_i32le(reader: &mut &[u8]) -> Option<i32> {
96 read_n(reader).map(i32::from_le_bytes)
97}
98
99pub async fn fetch_metadata(
101 store: &PolarsObjectStore,
102 path: &ObjectPath,
103 file_byte_length: usize,
104) -> PolarsResult<FileMetadata> {
105 let footer_header_bytes = store
106 .get_range(
107 path,
108 file_byte_length
109 .checked_sub(polars_parquet::parquet::FOOTER_SIZE as usize)
110 .ok_or_else(|| {
111 polars_parquet::parquet::error::ParquetError::OutOfSpec(
112 "not enough bytes to contain parquet footer".to_string(),
113 )
114 })?..file_byte_length,
115 )
116 .await?;
117
118 let footer_byte_length: usize = {
119 let reader = &mut footer_header_bytes.as_ref();
120 let footer_byte_size = read_i32le(reader).unwrap();
121 let magic = read_n(reader).unwrap();
122 debug_assert!(reader.is_empty());
123 if magic != polars_parquet::parquet::PARQUET_MAGIC {
124 return Err(polars_parquet::parquet::error::ParquetError::OutOfSpec(
125 "incorrect magic in parquet footer".to_string(),
126 )
127 .into());
128 }
129 footer_byte_size.try_into().map_err(|_| {
130 polars_parquet::parquet::error::ParquetError::OutOfSpec(
131 "negative footer byte length".to_string(),
132 )
133 })?
134 };
135
136 let footer_bytes = store
137 .get_range(
138 path,
139 file_byte_length
140 .checked_sub(polars_parquet::parquet::FOOTER_SIZE as usize + footer_byte_length)
141 .ok_or_else(|| {
142 polars_parquet::parquet::error::ParquetError::OutOfSpec(
143 "not enough bytes to contain parquet footer".to_string(),
144 )
145 })?..file_byte_length,
146 )
147 .await?;
148
149 Ok(polars_parquet::parquet::read::deserialize_metadata(
150 std::io::Cursor::new(footer_bytes.as_ref()),
151 footer_bytes.as_ref().len() * 2 + 1024,
155 )?)
156}
157
158async fn download_projection(
161 fields: Arc<[PlSmallStr]>,
162 row_group: RowGroupMetadata,
163 async_reader: Arc<ParquetObjectStore>,
164 sender: QueueSend,
165 rg_index: usize,
166) -> bool {
167 let async_reader = &async_reader;
168 let row_group = &row_group;
169 let fields = fields.as_ref();
170
171 let mut ranges = Vec::with_capacity(fields.len());
172 let mut offsets = Vec::with_capacity(fields.len());
173 fields.iter().for_each(|name| {
174 let iter = row_group
176 .columns_under_root_iter(name)
177 .unwrap()
178 .map(|meta| {
179 let byte_range = meta.byte_range();
180 let offset = byte_range.start;
181 let byte_range = byte_range.start as usize..byte_range.end as usize;
182 (offset, byte_range)
183 });
184
185 for (offset, range) in iter {
186 offsets.push(offset);
187 ranges.push(range);
188 }
189 });
190
191 let result = async_reader
192 .get_ranges(&mut ranges)
193 .await
194 .map(|bytes_map| (rg_index, bytes_map));
195 sender.send(result).await.is_ok()
196}
197
198async fn download_row_group(
199 rg: RowGroupMetadata,
200 async_reader: Arc<ParquetObjectStore>,
201 sender: QueueSend,
202 rg_index: usize,
203) -> bool {
204 if rg.n_columns() == 0 {
205 return true;
206 }
207
208 let mut ranges = rg
209 .byte_ranges_iter()
210 .map(|x| x.start as usize..x.end as usize)
211 .collect::<Vec<_>>();
212
213 sender
214 .send(
215 async_reader
216 .get_ranges(&mut ranges)
217 .await
218 .map(|bytes_map| (rg_index, bytes_map)),
219 )
220 .await
221 .is_ok()
222}
223
/// Consumer side of the row-group prefetcher: receives downloaded row groups
/// from the background download tasks and hands them out on demand.
pub struct FetchRowGroupsFromObjectStore {
    // Receiver of (row-group index, bytes) results from the download tasks.
    rg_q: Arc<Mutex<Receiver<PolarsResult<QueuePayload>>>>,
    // Row groups received (or skipped by the predicate) but not yet consumed.
    prefetched_rg: PlHashMap<usize, DownloadedRowGroup>,
}
228
impl FetchRowGroupsFromObjectStore {
    /// Start prefetching `row_groups` within `row_group_range` in the
    /// background, optionally restricted to `projection` columns and
    /// filtered by `predicate` statistics.
    pub fn new(
        reader: ParquetObjectStore,
        schema: ArrowSchemaRef,
        projection: Option<&[usize]>,
        predicate: Option<ScanIOPredicate>,
        row_group_range: Range<usize>,
        row_groups: &[RowGroupMetadata],
    ) -> PolarsResult<Self> {
        // Resolve projected column indices to field names up front.
        // NOTE(review): `get_at_index(*i).unwrap()` panics on an
        // out-of-bounds projection index — assumed validated by the caller.
        let projected_fields: Option<Arc<[PlSmallStr]>> = projection.map(|projection| {
            projection
                .iter()
                .map(|i| (schema.get_at_index(*i).as_ref().unwrap().0.clone()))
                .collect()
        });

        let mut prefetched: PlHashMap<usize, DownloadedRowGroup> = PlHashMap::new();

        let mut row_groups = if let Some(pred) = predicate.as_ref() {
            row_group_range
                .filter_map(|i| {
                    let rg = &row_groups[i];

                    // Statistics-based skip; errors from the check count as
                    // "must read" (conservative).
                    let should_be_read =
                        matches!(read_this_row_group(Some(pred), rg, &schema), Ok(true));

                    // Skipped groups get an empty entry so `fetch_row_groups`
                    // still finds index `i` and doesn't wait on the channel.
                    if !should_be_read {
                        prefetched.insert(i, Default::default());
                    }

                    should_be_read.then(|| (i, rg.clone()))
                })
                .collect::<Vec<_>>()
        } else {
            row_groups.iter().cloned().enumerate().collect()
        };
        let reader = Arc::new(reader);
        // Prefetch window size also bounds the channel capacity, providing
        // backpressure on the download tasks.
        let msg_limit = get_rg_prefetch_size();

        if verbose() {
            eprintln!("POLARS ROW_GROUP PREFETCH_SIZE: {}", msg_limit)
        }

        let (snd, rcv) = channel(msg_limit);
        let snd = Arc::new(snd);

        // Background producer: spawn one download task per row group, in
        // chunks of `msg_limit`, keeping a few tasks in flight across chunk
        // boundaries to pipeline downloads.
        get_runtime().spawn(async move {
            let chunk_len = msg_limit;
            let mut handles = Vec::with_capacity(chunk_len.clamp(0, row_groups.len()));
            for chunk in row_groups.chunks_mut(chunk_len) {
                for (i, rg) in chunk {
                    // Move the metadata out of the chunk slot (leaves a
                    // default value behind) so the spawned task owns it.
                    let rg = std::mem::take(rg);

                    match &projected_fields {
                        Some(projected_fields) => {
                            let handle = tokio::spawn(download_projection(
                                projected_fields.clone(),
                                rg,
                                reader.clone(),
                                snd.clone(),
                                *i,
                            ));
                            handles.push(handle)
                        },
                        None => {
                            let handle = tokio::spawn(download_row_group(
                                rg,
                                reader.clone(),
                                snd.clone(),
                                *i,
                            ));
                            handles.push(handle)
                        },
                    }
                }

                // Await all but the last 3 tasks before starting the next
                // chunk, so up to 3 downloads overlap the chunk boundary.
                // A task returning `false` means the receiver was dropped:
                // the consumer is gone, so stop producing.
                for task in handles.drain(..handles.len().saturating_sub(3)) {
                    let succeeded = task.await.unwrap();
                    if !succeeded {
                        return;
                    }
                }
            }

            // Drain the remaining in-flight tasks.
            for task in handles.drain(..) {
                let succeeded = task.await.unwrap();
                if !succeeded {
                    return;
                }
            }
        });

        Ok(FetchRowGroupsFromObjectStore {
            rg_q: Arc::new(Mutex::new(rcv)),
            prefetched_rg: Default::default(),
        })
    }

    /// Hand out the bytes of the requested row-group index range, waiting on
    /// the prefetch channel until every index in the range has arrived.
    /// Consumed row groups are removed from the prefetch cache.
    pub(crate) async fn fetch_row_groups(
        &mut self,
        row_groups: Range<usize>,
    ) -> PolarsResult<ColumnStore> {
        let mut guard = self.rg_q.lock().await;

        // Receive until all requested indices are present, or the channel
        // closes (producer finished or aborted).
        while !row_groups
            .clone()
            .all(|i| self.prefetched_rg.contains_key(&i))
        {
            let Some(fetched) = guard.recv().await else {
                break;
            };
            // Propagate any download error to the caller.
            let (rg_i, payload) = fetched?;

            self.prefetched_rg.insert(rg_i, payload);
        }

        // Merge the per-row-group byte maps into one store; indices that
        // never arrived (channel closed early) are silently absent.
        let received = row_groups
            .flat_map(|i| self.prefetched_rg.remove(&i))
            .flat_map(|rg| rg.into_iter())
            .collect::<PlHashMap<_, _>>();

        Ok(ColumnStore::Fetched(received))
    }
}