polars_io/ipc/
ipc_reader_async.rs

1use std::sync::Arc;
2
3use arrow::io::ipc::read::{FileMetadata, OutOfSpecKind, get_row_count};
4use object_store::ObjectMeta;
5use object_store::path::Path;
6use polars_core::datatypes::IDX_DTYPE;
7use polars_core::frame::DataFrame;
8use polars_core::schema::{Schema, SchemaExt};
9use polars_error::{PolarsResult, polars_bail, polars_err, to_compute_err};
10use polars_utils::pl_str::PlSmallStr;
11
12use crate::RowIndex;
13use crate::cloud::{
14    CloudLocation, CloudOptions, PolarsObjectStore, build_object_store, object_path_from_str,
15};
16use crate::file_cache::{FileCacheEntry, init_entries_from_uri_list};
17use crate::predicates::PhysicalIoExpr;
18use crate::prelude::{IpcReader, materialize_projection};
19use crate::shared::SerReader;
20
/// An Arrow IPC reader implemented on top of PolarsObjectStore.
pub struct IpcReaderAsync {
    /// Object store used for the `head` and `get_range` requests against the file.
    store: PolarsObjectStore,
    /// File-cache entry for the URI; `data` and `count_rows` open the file through it.
    cache_entry: Arc<FileCacheEntry>,
    /// Object path of the file inside `store`, derived from the cloud location prefix of the URI.
    path: Path,
}
27
/// Options that select what `IpcReaderAsync::data` includes in its output.
#[derive(Default, Clone)]
pub struct IpcReadOptions {
    /// Names of the columns to include in the output.
    projection: Option<Arc<[PlSmallStr]>>,

    /// The maximum number of rows to include in the output.
    row_limit: Option<usize>,

    /// Include a column with the row number under the provided name, starting at the provided index.
    row_index: Option<RowIndex>,

    /// Only include rows that pass this predicate.
    predicate: Option<Arc<dyn PhysicalIoExpr>>,
}
42
43impl IpcReadOptions {
44    pub fn with_projection(mut self, projection: Option<Arc<[PlSmallStr]>>) -> Self {
45        self.projection = projection;
46        self
47    }
48
49    pub fn with_row_limit(mut self, row_limit: impl Into<Option<usize>>) -> Self {
50        self.row_limit = row_limit.into();
51        self
52    }
53
54    pub fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
55        self.row_index = row_index.into();
56        self
57    }
58
59    pub fn with_predicate(mut self, predicate: impl Into<Option<Arc<dyn PhysicalIoExpr>>>) -> Self {
60        self.predicate = predicate.into();
61        self
62    }
63}
64
65impl IpcReaderAsync {
66    pub async fn from_uri(
67        uri: &str,
68        cloud_options: Option<&CloudOptions>,
69    ) -> PolarsResult<IpcReaderAsync> {
70        let cache_entry = init_entries_from_uri_list(&[Arc::from(uri)], cloud_options)?[0].clone();
71        let (CloudLocation { prefix, .. }, store) =
72            build_object_store(uri, cloud_options, false).await?;
73
74        let path = object_path_from_str(&prefix)?;
75
76        Ok(Self {
77            store,
78            cache_entry,
79            path,
80        })
81    }
82
83    async fn object_metadata(&self) -> PolarsResult<ObjectMeta> {
84        self.store.head(&self.path).await
85    }
86
87    async fn file_size(&self) -> PolarsResult<usize> {
88        Ok(self.object_metadata().await?.size)
89    }
90
91    pub async fn metadata(&self) -> PolarsResult<FileMetadata> {
92        let file_size = self.file_size().await?;
93
94        // TODO: Do a larger request and hope that the entire footer is contained within it to save one round-trip.
95        let footer_metadata =
96            self.store
97                .get_range(
98                    &self.path,
99                    file_size.checked_sub(FOOTER_METADATA_SIZE).ok_or_else(|| {
100                        to_compute_err("ipc file size is smaller than the minimum")
101                    })?..file_size,
102                )
103                .await?;
104
105        let footer_size = deserialize_footer_metadata(
106            footer_metadata
107                .as_ref()
108                .try_into()
109                .map_err(to_compute_err)?,
110        )?;
111
112        let footer = self
113            .store
114            .get_range(
115                &self.path,
116                file_size
117                    .checked_sub(FOOTER_METADATA_SIZE + footer_size)
118                    .ok_or_else(|| {
119                        to_compute_err("invalid ipc footer metadata: footer size too large")
120                    })?..file_size,
121            )
122            .await?;
123
124        arrow::io::ipc::read::deserialize_footer(
125            footer.as_ref(),
126            footer_size.try_into().map_err(to_compute_err)?,
127        )
128    }
129
130    pub async fn data(
131        &self,
132        metadata: Option<&FileMetadata>,
133        options: IpcReadOptions,
134        verbose: bool,
135    ) -> PolarsResult<DataFrame> {
136        // TODO: Only download what is needed rather than the entire file by
137        // making use of the projection, row limit, predicate and such.
138        let file = tokio::task::block_in_place(|| self.cache_entry.try_open_check_latest())?;
139        let bytes = unsafe { memmap::Mmap::map(&file) }.unwrap();
140
141        let projection = match options.projection.as_deref() {
142            Some(projection) => {
143                fn prepare_schema(mut schema: Schema, row_index: Option<&RowIndex>) -> Schema {
144                    if let Some(rc) = row_index {
145                        let _ = schema.insert_at_index(0, rc.name.clone(), IDX_DTYPE);
146                    }
147                    schema
148                }
149
150                // Retrieve the metadata for the schema so we can map column names to indices.
151                let fetched_metadata;
152                let metadata = if let Some(metadata) = metadata {
153                    metadata
154                } else {
155                    // This branch is  happens when _metadata is None, which can happen if we Deserialize the execution plan.
156                    fetched_metadata = self.metadata().await?;
157                    &fetched_metadata
158                };
159
160                let schema = prepare_schema(
161                    Schema::from_arrow_schema(metadata.schema.as_ref()),
162                    options.row_index.as_ref(),
163                );
164
165                let hive_partitions = None;
166
167                materialize_projection(
168                    Some(projection),
169                    &schema,
170                    hive_partitions,
171                    options.row_index.is_some(),
172                )
173            },
174            None => None,
175        };
176
177        let reader = <IpcReader<_> as SerReader<_>>::new(std::io::Cursor::new(bytes.as_ref()))
178            .with_row_index(options.row_index)
179            .with_n_rows(options.row_limit)
180            .with_projection(projection);
181        reader.finish_with_scan_ops(options.predicate, verbose)
182    }
183
184    pub async fn count_rows(&self, _metadata: Option<&FileMetadata>) -> PolarsResult<i64> {
185        // TODO: Only download what is needed rather than the entire file by
186        // making use of the projection, row limit, predicate and such.
187        let file = tokio::task::block_in_place(|| self.cache_entry.try_open_check_latest())?;
188        let bytes = unsafe { memmap::Mmap::map(&file) }.unwrap();
189        get_row_count(&mut std::io::Cursor::new(bytes.as_ref()))
190    }
191}
192
/// Size in bytes of the trailing footer metadata: a 4-byte little-endian `i32`
/// footer length followed by the 6-byte magic `ARROW1`.
const FOOTER_METADATA_SIZE: usize = 10;
194
195// TODO: Move to polars-arrow and deduplicate parsing of footer metadata in
196// sync and async readers.
197fn deserialize_footer_metadata(bytes: [u8; FOOTER_METADATA_SIZE]) -> PolarsResult<usize> {
198    let footer_size: usize =
199        i32::from_le_bytes(bytes[0..4].try_into().unwrap_or_else(|_| unreachable!()))
200            .try_into()
201            .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
202
203    if &bytes[4..] != b"ARROW1" {
204        polars_bail!(oos = OutOfSpecKind::InvalidFooter);
205    }
206
207    Ok(footer_size)
208}