polars_io/ipc/
ipc_reader_async.rs

1use std::sync::Arc;
2
3use arrow::io::ipc::read::{FileMetadata, OutOfSpecKind, get_row_count};
4use object_store::ObjectMeta;
5use object_store::path::Path;
6use polars_core::datatypes::IDX_DTYPE;
7use polars_core::frame::DataFrame;
8use polars_core::schema::{Schema, SchemaExt};
9use polars_error::{PolarsResult, polars_bail, polars_err, to_compute_err};
10use polars_utils::mmap::MMapSemaphore;
11use polars_utils::pl_str::PlSmallStr;
12use polars_utils::plpath::PlPathRef;
13
14use crate::RowIndex;
15use crate::cloud::{
16    CloudLocation, CloudOptions, PolarsObjectStore, build_object_store, object_path_from_str,
17};
18use crate::file_cache::{FileCacheEntry, init_entries_from_uri_list};
19use crate::predicates::PhysicalIoExpr;
20use crate::prelude::{IpcReader, materialize_projection};
21use crate::shared::SerReader;
22
23/// An Arrow IPC reader implemented on top of PolarsObjectStore.
24pub struct IpcReaderAsync {
25    store: PolarsObjectStore,
26    cache_entry: Arc<FileCacheEntry>,
27    path: Path,
28}
29
30#[derive(Default, Clone)]
31pub struct IpcReadOptions {
32    // Names of the columns to include in the output.
33    projection: Option<Arc<[PlSmallStr]>>,
34
35    // The maximum number of rows to include in the output.
36    row_limit: Option<usize>,
37
38    // Include a column with the row number under the provided name  starting at the provided index.
39    row_index: Option<RowIndex>,
40
41    // Only include rows that pass this predicate.
42    predicate: Option<Arc<dyn PhysicalIoExpr>>,
43}
44
45impl IpcReadOptions {
46    pub fn with_projection(mut self, projection: Option<Arc<[PlSmallStr]>>) -> Self {
47        self.projection = projection;
48        self
49    }
50
51    pub fn with_row_limit(mut self, row_limit: impl Into<Option<usize>>) -> Self {
52        self.row_limit = row_limit.into();
53        self
54    }
55
56    pub fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
57        self.row_index = row_index.into();
58        self
59    }
60
61    pub fn with_predicate(mut self, predicate: impl Into<Option<Arc<dyn PhysicalIoExpr>>>) -> Self {
62        self.predicate = predicate.into();
63        self
64    }
65}
66
67impl IpcReaderAsync {
68    pub async fn from_uri(
69        uri: PlPathRef<'_>,
70        cloud_options: Option<&CloudOptions>,
71    ) -> PolarsResult<IpcReaderAsync> {
72        let cache_entry =
73            init_entries_from_uri_list([Arc::from(uri.to_str())].into_iter(), cloud_options)?[0]
74                .clone();
75        let (CloudLocation { prefix, .. }, store) =
76            build_object_store(uri, cloud_options, false).await?;
77
78        let path = object_path_from_str(&prefix)?;
79
80        Ok(Self {
81            store,
82            cache_entry,
83            path,
84        })
85    }
86
87    async fn object_metadata(&self) -> PolarsResult<ObjectMeta> {
88        self.store.head(&self.path).await
89    }
90
91    async fn file_size(&self) -> PolarsResult<usize> {
92        Ok(self.object_metadata().await?.size as usize)
93    }
94
95    pub async fn metadata(&self) -> PolarsResult<FileMetadata> {
96        let file_size = self.file_size().await?;
97
98        // TODO: Do a larger request and hope that the entire footer is contained within it to save one round-trip.
99        let footer_metadata =
100            self.store
101                .get_range(
102                    &self.path,
103                    file_size.checked_sub(FOOTER_METADATA_SIZE).ok_or_else(|| {
104                        to_compute_err("ipc file size is smaller than the minimum")
105                    })?..file_size,
106                )
107                .await?;
108
109        let footer_size = deserialize_footer_metadata(
110            footer_metadata
111                .as_ref()
112                .try_into()
113                .map_err(to_compute_err)?,
114        )?;
115
116        let footer = self
117            .store
118            .get_range(
119                &self.path,
120                file_size
121                    .checked_sub(FOOTER_METADATA_SIZE + footer_size)
122                    .ok_or_else(|| {
123                        to_compute_err("invalid ipc footer metadata: footer size too large")
124                    })?..file_size,
125            )
126            .await?;
127
128        arrow::io::ipc::read::deserialize_footer(
129            footer.as_ref(),
130            footer_size.try_into().map_err(to_compute_err)?,
131        )
132    }
133
134    pub async fn data(
135        &self,
136        metadata: Option<&FileMetadata>,
137        options: IpcReadOptions,
138        verbose: bool,
139    ) -> PolarsResult<DataFrame> {
140        // TODO: Only download what is needed rather than the entire file by
141        // making use of the projection, row limit, predicate and such.
142        let file = tokio::task::block_in_place(|| self.cache_entry.try_open_check_latest())?;
143        let bytes = MMapSemaphore::new_from_file(&file).unwrap();
144
145        let projection = match options.projection.as_deref() {
146            Some(projection) => {
147                fn prepare_schema(mut schema: Schema, row_index: Option<&RowIndex>) -> Schema {
148                    if let Some(rc) = row_index {
149                        let _ = schema.insert_at_index(0, rc.name.clone(), IDX_DTYPE);
150                    }
151                    schema
152                }
153
154                // Retrieve the metadata for the schema so we can map column names to indices.
155                let fetched_metadata;
156                let metadata = if let Some(metadata) = metadata {
157                    metadata
158                } else {
159                    // This branch is  happens when _metadata is None, which can happen if we Deserialize the execution plan.
160                    fetched_metadata = self.metadata().await?;
161                    &fetched_metadata
162                };
163
164                let schema = prepare_schema(
165                    Schema::from_arrow_schema(metadata.schema.as_ref()),
166                    options.row_index.as_ref(),
167                );
168
169                let hive_partitions = None;
170
171                materialize_projection(
172                    Some(projection),
173                    &schema,
174                    hive_partitions,
175                    options.row_index.is_some(),
176                )
177            },
178            None => None,
179        };
180
181        let reader = <IpcReader<_> as SerReader<_>>::new(std::io::Cursor::new(bytes.as_ref()))
182            .with_row_index(options.row_index)
183            .with_n_rows(options.row_limit)
184            .with_projection(projection);
185        reader.finish_with_scan_ops(options.predicate, verbose)
186    }
187
188    pub async fn count_rows(&self, _metadata: Option<&FileMetadata>) -> PolarsResult<i64> {
189        // TODO: Only download what is needed rather than the entire file by
190        // making use of the projection, row limit, predicate and such.
191        let file = tokio::task::block_in_place(|| self.cache_entry.try_open_check_latest())?;
192        let bytes = MMapSemaphore::new_from_file(&file).unwrap();
193        get_row_count(&mut std::io::Cursor::new(bytes.as_ref()))
194    }
195}
196
/// Size in bytes of the trailer at the very end of an IPC file: a
/// little-endian `i32` footer length followed by the `ARROW1` magic.
const FOOTER_METADATA_SIZE: usize = std::mem::size_of::<i32>() + b"ARROW1".len();
198
199// TODO: Move to polars-arrow and deduplicate parsing of footer metadata in
200// sync and async readers.
201fn deserialize_footer_metadata(bytes: [u8; FOOTER_METADATA_SIZE]) -> PolarsResult<usize> {
202    let footer_size: usize =
203        i32::from_le_bytes(bytes[0..4].try_into().unwrap_or_else(|_| unreachable!()))
204            .try_into()
205            .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
206
207    if &bytes[4..] != b"ARROW1" {
208        polars_bail!(oos = OutOfSpecKind::InvalidFooter);
209    }
210
211    Ok(footer_size)
212}