// polars_io/parquet/read/async_impl.rs

1//! Read parquet files in parallel from the Object Store without a third party crate.
2use std::ops::Range;
3
4use arrow::datatypes::ArrowSchemaRef;
5use bytes::Bytes;
6use object_store::path::Path as ObjectPath;
7use polars_core::config::{get_rg_prefetch_size, verbose};
8use polars_core::prelude::*;
9use polars_parquet::read::RowGroupMetadata;
10use polars_parquet::write::FileMetadata;
11use polars_utils::pl_str::PlSmallStr;
12use tokio::sync::Mutex;
13use tokio::sync::mpsc::{Receiver, Sender, channel};
14
15use super::mmap::ColumnStore;
16use super::predicates::read_this_row_group;
17use crate::cloud::{
18    CloudLocation, CloudOptions, PolarsObjectStore, build_object_store, object_path_from_str,
19};
20use crate::parquet::metadata::FileMetadataRef;
21use crate::pl_async::get_runtime;
22use crate::predicates::ScanIOPredicate;
23
/// Downloaded column data for one row group, keyed by `u64` — presumably the
/// column chunk's starting byte offset, as produced by
/// `PolarsObjectStore::get_ranges_sort` — TODO confirm against that helper.
type DownloadedRowGroup = PlHashMap<u64, Bytes>;
/// A row-group index paired with its downloaded data.
type QueuePayload = (usize, DownloadedRowGroup);
/// Shared sender used by download tasks to push results (or errors) to the consumer.
type QueueSend = Arc<Sender<PolarsResult<QueuePayload>>>;
27
/// A parquet object living in a cloud object store, with lazily fetched and
/// memoized byte length and footer metadata.
pub struct ParquetObjectStore {
    store: PolarsObjectStore,
    /// Location of the object within `store`.
    path: ObjectPath,
    /// Total byte length of the object; fetched via HEAD on first use.
    length: Option<usize>,
    /// Parsed parquet footer metadata; fetched and memoized on first use,
    /// or supplied up front by the caller.
    metadata: Option<FileMetadataRef>,
}
34
35impl ParquetObjectStore {
36    pub async fn from_uri(
37        uri: &str,
38        options: Option<&CloudOptions>,
39        metadata: Option<FileMetadataRef>,
40    ) -> PolarsResult<Self> {
41        let (CloudLocation { prefix, .. }, store) = build_object_store(uri, options, false).await?;
42        let path = object_path_from_str(&prefix)?;
43
44        Ok(ParquetObjectStore {
45            store,
46            path,
47            length: None,
48            metadata,
49        })
50    }
51
52    async fn get_ranges(&self, ranges: &mut [Range<usize>]) -> PolarsResult<PlHashMap<u64, Bytes>> {
53        self.store.get_ranges_sort(&self.path, ranges).await
54    }
55
56    /// Initialize the length property of the object, unless it has already been fetched.
57    async fn length(&mut self) -> PolarsResult<usize> {
58        if self.length.is_none() {
59            self.length = Some(self.store.head(&self.path).await?.size);
60        }
61        Ok(self.length.unwrap())
62    }
63
64    /// Number of rows in the parquet file.
65    pub async fn num_rows(&mut self) -> PolarsResult<usize> {
66        let metadata = self.get_metadata().await?;
67        Ok(metadata.num_rows)
68    }
69
70    /// Fetch the metadata of the parquet file, do not memoize it.
71    async fn fetch_metadata(&mut self) -> PolarsResult<FileMetadata> {
72        let length = self.length().await?;
73        fetch_metadata(&self.store, &self.path, length).await
74    }
75
76    /// Fetch and memoize the metadata of the parquet file.
77    pub async fn get_metadata(&mut self) -> PolarsResult<&FileMetadataRef> {
78        if self.metadata.is_none() {
79            self.metadata = Some(Arc::new(self.fetch_metadata().await?));
80        }
81        Ok(self.metadata.as_ref().unwrap())
82    }
83}
84
/// Pop the first `N` bytes off the front of `reader`, advancing the slice.
///
/// Returns `None` (leaving `reader` untouched) when fewer than `N` bytes remain.
fn read_n<const N: usize>(reader: &mut &[u8]) -> Option<[u8; N]> {
    if reader.len() < N {
        return None;
    }
    let (head, tail) = reader.split_at(N);
    *reader = tail;
    // `head` has exactly `N` bytes, so the array conversion cannot fail.
    Some(head.try_into().unwrap())
}
94
95fn read_i32le(reader: &mut &[u8]) -> Option<i32> {
96    read_n(reader).map(i32::from_le_bytes)
97}
98
99/// Asynchronously reads the files' metadata
100pub async fn fetch_metadata(
101    store: &PolarsObjectStore,
102    path: &ObjectPath,
103    file_byte_length: usize,
104) -> PolarsResult<FileMetadata> {
105    let footer_header_bytes = store
106        .get_range(
107            path,
108            file_byte_length
109                .checked_sub(polars_parquet::parquet::FOOTER_SIZE as usize)
110                .ok_or_else(|| {
111                    polars_parquet::parquet::error::ParquetError::OutOfSpec(
112                        "not enough bytes to contain parquet footer".to_string(),
113                    )
114                })?..file_byte_length,
115        )
116        .await?;
117
118    let footer_byte_length: usize = {
119        let reader = &mut footer_header_bytes.as_ref();
120        let footer_byte_size = read_i32le(reader).unwrap();
121        let magic = read_n(reader).unwrap();
122        debug_assert!(reader.is_empty());
123        if magic != polars_parquet::parquet::PARQUET_MAGIC {
124            return Err(polars_parquet::parquet::error::ParquetError::OutOfSpec(
125                "incorrect magic in parquet footer".to_string(),
126            )
127            .into());
128        }
129        footer_byte_size.try_into().map_err(|_| {
130            polars_parquet::parquet::error::ParquetError::OutOfSpec(
131                "negative footer byte length".to_string(),
132            )
133        })?
134    };
135
136    let footer_bytes = store
137        .get_range(
138            path,
139            file_byte_length
140                .checked_sub(polars_parquet::parquet::FOOTER_SIZE as usize + footer_byte_length)
141                .ok_or_else(|| {
142                    polars_parquet::parquet::error::ParquetError::OutOfSpec(
143                        "not enough bytes to contain parquet footer".to_string(),
144                    )
145                })?..file_byte_length,
146        )
147        .await?;
148
149    Ok(polars_parquet::parquet::read::deserialize_metadata(
150        std::io::Cursor::new(footer_bytes.as_ref()),
151        // TODO: Describe why this makes sense. Taken from the previous
152        // implementation which said "a highly nested but sparse struct could
153        // result in many allocations".
154        footer_bytes.as_ref().len() * 2 + 1024,
155    )?)
156}
157
158/// Download rowgroups for the column whose indexes are given in `projection`.
159/// We concurrently download the columns for each field.
160async fn download_projection(
161    fields: Arc<[PlSmallStr]>,
162    row_group: RowGroupMetadata,
163    async_reader: Arc<ParquetObjectStore>,
164    sender: QueueSend,
165    rg_index: usize,
166) -> bool {
167    let async_reader = &async_reader;
168    let row_group = &row_group;
169    let fields = fields.as_ref();
170
171    let mut ranges = Vec::with_capacity(fields.len());
172    let mut offsets = Vec::with_capacity(fields.len());
173    fields.iter().for_each(|name| {
174        // A single column can have multiple matches (structs).
175        let iter = row_group
176            .columns_under_root_iter(name)
177            .unwrap()
178            .map(|meta| {
179                let byte_range = meta.byte_range();
180                let offset = byte_range.start;
181                let byte_range = byte_range.start as usize..byte_range.end as usize;
182                (offset, byte_range)
183            });
184
185        for (offset, range) in iter {
186            offsets.push(offset);
187            ranges.push(range);
188        }
189    });
190
191    let result = async_reader
192        .get_ranges(&mut ranges)
193        .await
194        .map(|bytes_map| (rg_index, bytes_map));
195    sender.send(result).await.is_ok()
196}
197
198async fn download_row_group(
199    rg: RowGroupMetadata,
200    async_reader: Arc<ParquetObjectStore>,
201    sender: QueueSend,
202    rg_index: usize,
203) -> bool {
204    if rg.n_columns() == 0 {
205        return true;
206    }
207
208    let mut ranges = rg
209        .byte_ranges_iter()
210        .map(|x| x.start as usize..x.end as usize)
211        .collect::<Vec<_>>();
212
213    sender
214        .send(
215            async_reader
216                .get_ranges(&mut ranges)
217                .await
218                .map(|bytes_map| (rg_index, bytes_map)),
219        )
220        .await
221        .is_ok()
222}
223
/// Hands out row groups that a background task prefetches from an object store.
pub struct FetchRowGroupsFromObjectStore {
    /// Receiving end of the download queue, shared behind an async mutex.
    rg_q: Arc<Mutex<Receiver<PolarsResult<QueuePayload>>>>,
    /// Row groups already taken off the queue, keyed by row-group index.
    prefetched_rg: PlHashMap<usize, DownloadedRowGroup>,
}
228
impl FetchRowGroupsFromObjectStore {
    /// Spawn a background task on the async runtime that prefetches the row
    /// groups in `row_group_range`, and return a handle for consuming them.
    ///
    /// With a `predicate`, row groups whose statistics prove they cannot match
    /// are not downloaded at all; they are registered up front as empty
    /// payloads so `fetch_row_groups` never waits for them.
    pub fn new(
        reader: ParquetObjectStore,
        schema: ArrowSchemaRef,
        projection: Option<&[usize]>,
        predicate: Option<ScanIOPredicate>,
        row_group_range: Range<usize>,
        row_groups: &[RowGroupMetadata],
    ) -> PolarsResult<Self> {
        // Resolve projected column indices to field names once; the Arc slice
        // is cheap to clone into every download task.
        let projected_fields: Option<Arc<[PlSmallStr]>> = projection.map(|projection| {
            projection
                .iter()
                .map(|i| (schema.get_at_index(*i).as_ref().unwrap().0.clone()))
                .collect()
        });

        let mut prefetched: PlHashMap<usize, DownloadedRowGroup> = PlHashMap::new();

        let mut row_groups = if let Some(pred) = predicate.as_ref() {
            row_group_range
                .filter_map(|i| {
                    let rg = &row_groups[i];

                    // NOTE(review): an `Err` from `read_this_row_group` is
                    // treated the same as `Ok(false)` (row group skipped) —
                    // confirm that swallowing the error here is intentional.
                    let should_be_read =
                        matches!(read_this_row_group(Some(pred), rg, &schema), Ok(true));

                    // Already add the row groups that will be skipped to the prefetched data.
                    if !should_be_read {
                        prefetched.insert(i, Default::default());
                    }

                    should_be_read.then(|| (i, rg.clone()))
                })
                .collect::<Vec<_>>()
        } else {
            row_groups.iter().cloned().enumerate().collect()
        };
        let reader = Arc::new(reader);
        let msg_limit = get_rg_prefetch_size();

        if verbose() {
            eprintln!("POLARS ROW_GROUP PREFETCH_SIZE: {}", msg_limit)
        }

        // Bounded channel: download tasks stall once `msg_limit` results are
        // queued, capping the memory held by not-yet-consumed row groups.
        let (snd, rcv) = channel(msg_limit);
        let snd = Arc::new(snd);

        get_runtime().spawn(async move {
            let chunk_len = msg_limit;
            let mut handles = Vec::with_capacity(chunk_len.clamp(0, row_groups.len()));
            for chunk in row_groups.chunks_mut(chunk_len) {
                // Start downloads concurrently
                for (i, rg) in chunk {
                    // Move the metadata out of its slice slot (leaving a
                    // default value); each slot is visited exactly once.
                    let rg = std::mem::take(rg);

                    match &projected_fields {
                        Some(projected_fields) => {
                            let handle = tokio::spawn(download_projection(
                                projected_fields.clone(),
                                rg,
                                reader.clone(),
                                snd.clone(),
                                *i,
                            ));
                            handles.push(handle)
                        },
                        None => {
                            let handle = tokio::spawn(download_row_group(
                                rg,
                                reader.clone(),
                                snd.clone(),
                                *i,
                            ));
                            handles.push(handle)
                        },
                    }
                }

                // Wait n - 3 tasks, so we already start the next downloads earlier.
                // A task returning `false` means the receiver was dropped;
                // abandon the remaining work.
                for task in handles.drain(..handles.len().saturating_sub(3)) {
                    let succeeded = task.await.unwrap();
                    if !succeeded {
                        return;
                    }
                }
            }

            // Drain remaining tasks.
            for task in handles.drain(..) {
                let succeeded = task.await.unwrap();
                if !succeeded {
                    return;
                }
            }
        });

        Ok(FetchRowGroupsFromObjectStore {
            rg_q: Arc::new(Mutex::new(rcv)),
            prefetched_rg: Default::default(),
        })
    }

    /// Return the downloaded column data for the row groups in `row_groups`,
    /// waiting on the prefetch queue until all of them have arrived (or the
    /// sender side is dropped).
    ///
    /// Consumed row groups are removed from the local cache, so each index
    /// should be requested at most once.
    pub(crate) async fn fetch_row_groups(
        &mut self,
        row_groups: Range<usize>,
    ) -> PolarsResult<ColumnStore> {
        let mut guard = self.rg_q.lock().await;

        // Drain the queue until every requested index is cached locally, or
        // the channel closes (downloader finished or aborted).
        while !row_groups
            .clone()
            .all(|i| self.prefetched_rg.contains_key(&i))
        {
            let Some(fetched) = guard.recv().await else {
                break;
            };
            let (rg_i, payload) = fetched?;

            self.prefetched_rg.insert(rg_i, payload);
        }

        // Move the requested row groups out of the cache and flatten their
        // per-column byte maps into a single store.
        let received = row_groups
            .flat_map(|i| self.prefetched_rg.remove(&i))
            .flat_map(|rg| rg.into_iter())
            .collect::<PlHashMap<_, _>>();

        Ok(ColumnStore::Fetched(received))
    }
}