polars_io/ipc/
ipc_reader_async.rs

1use std::sync::Arc;
2
3use arrow::io::ipc::read::{FileMetadata, OutOfSpecKind, get_row_count};
4use object_store::ObjectMeta;
5use object_store::path::Path;
6use polars_core::datatypes::IDX_DTYPE;
7use polars_core::frame::DataFrame;
8use polars_core::schema::{Schema, SchemaExt};
9use polars_error::{PolarsResult, polars_bail, polars_err, to_compute_err};
10use polars_utils::mmap::MMapSemaphore;
11use polars_utils::pl_str::PlSmallStr;
12
13use crate::RowIndex;
14use crate::cloud::{
15    CloudLocation, CloudOptions, PolarsObjectStore, build_object_store, object_path_from_str,
16};
17use crate::file_cache::{FileCacheEntry, init_entries_from_uri_list};
18use crate::predicates::PhysicalIoExpr;
19use crate::prelude::{IpcReader, materialize_projection};
20use crate::shared::SerReader;
21
22/// An Arrow IPC reader implemented on top of PolarsObjectStore.
23pub struct IpcReaderAsync {
24    store: PolarsObjectStore,
25    cache_entry: Arc<FileCacheEntry>,
26    path: Path,
27}
28
29#[derive(Default, Clone)]
30pub struct IpcReadOptions {
31    // Names of the columns to include in the output.
32    projection: Option<Arc<[PlSmallStr]>>,
33
34    // The maximum number of rows to include in the output.
35    row_limit: Option<usize>,
36
37    // Include a column with the row number under the provided name  starting at the provided index.
38    row_index: Option<RowIndex>,
39
40    // Only include rows that pass this predicate.
41    predicate: Option<Arc<dyn PhysicalIoExpr>>,
42}
43
44impl IpcReadOptions {
45    pub fn with_projection(mut self, projection: Option<Arc<[PlSmallStr]>>) -> Self {
46        self.projection = projection;
47        self
48    }
49
50    pub fn with_row_limit(mut self, row_limit: impl Into<Option<usize>>) -> Self {
51        self.row_limit = row_limit.into();
52        self
53    }
54
55    pub fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
56        self.row_index = row_index.into();
57        self
58    }
59
60    pub fn with_predicate(mut self, predicate: impl Into<Option<Arc<dyn PhysicalIoExpr>>>) -> Self {
61        self.predicate = predicate.into();
62        self
63    }
64}
65
66impl IpcReaderAsync {
67    pub async fn from_uri(
68        uri: &str,
69        cloud_options: Option<&CloudOptions>,
70    ) -> PolarsResult<IpcReaderAsync> {
71        let cache_entry =
72            init_entries_from_uri_list([Arc::from(uri)].into_iter(), cloud_options)?[0].clone();
73        let (CloudLocation { prefix, .. }, store) =
74            build_object_store(uri, cloud_options, false).await?;
75
76        let path = object_path_from_str(&prefix)?;
77
78        Ok(Self {
79            store,
80            cache_entry,
81            path,
82        })
83    }
84
85    async fn object_metadata(&self) -> PolarsResult<ObjectMeta> {
86        self.store.head(&self.path).await
87    }
88
89    async fn file_size(&self) -> PolarsResult<usize> {
90        Ok(self.object_metadata().await?.size as usize)
91    }
92
93    pub async fn metadata(&self) -> PolarsResult<FileMetadata> {
94        let file_size = self.file_size().await?;
95
96        // TODO: Do a larger request and hope that the entire footer is contained within it to save one round-trip.
97        let footer_metadata =
98            self.store
99                .get_range(
100                    &self.path,
101                    file_size.checked_sub(FOOTER_METADATA_SIZE).ok_or_else(|| {
102                        to_compute_err("ipc file size is smaller than the minimum")
103                    })?..file_size,
104                )
105                .await?;
106
107        let footer_size = deserialize_footer_metadata(
108            footer_metadata
109                .as_ref()
110                .try_into()
111                .map_err(to_compute_err)?,
112        )?;
113
114        let footer = self
115            .store
116            .get_range(
117                &self.path,
118                file_size
119                    .checked_sub(FOOTER_METADATA_SIZE + footer_size)
120                    .ok_or_else(|| {
121                        to_compute_err("invalid ipc footer metadata: footer size too large")
122                    })?..file_size,
123            )
124            .await?;
125
126        arrow::io::ipc::read::deserialize_footer(
127            footer.as_ref(),
128            footer_size.try_into().map_err(to_compute_err)?,
129        )
130    }
131
132    pub async fn data(
133        &self,
134        metadata: Option<&FileMetadata>,
135        options: IpcReadOptions,
136        verbose: bool,
137    ) -> PolarsResult<DataFrame> {
138        // TODO: Only download what is needed rather than the entire file by
139        // making use of the projection, row limit, predicate and such.
140        let file = tokio::task::block_in_place(|| self.cache_entry.try_open_check_latest())?;
141        let bytes = MMapSemaphore::new_from_file(&file).unwrap();
142
143        let projection = match options.projection.as_deref() {
144            Some(projection) => {
145                fn prepare_schema(mut schema: Schema, row_index: Option<&RowIndex>) -> Schema {
146                    if let Some(rc) = row_index {
147                        let _ = schema.insert_at_index(0, rc.name.clone(), IDX_DTYPE);
148                    }
149                    schema
150                }
151
152                // Retrieve the metadata for the schema so we can map column names to indices.
153                let fetched_metadata;
154                let metadata = if let Some(metadata) = metadata {
155                    metadata
156                } else {
157                    // This branch is  happens when _metadata is None, which can happen if we Deserialize the execution plan.
158                    fetched_metadata = self.metadata().await?;
159                    &fetched_metadata
160                };
161
162                let schema = prepare_schema(
163                    Schema::from_arrow_schema(metadata.schema.as_ref()),
164                    options.row_index.as_ref(),
165                );
166
167                let hive_partitions = None;
168
169                materialize_projection(
170                    Some(projection),
171                    &schema,
172                    hive_partitions,
173                    options.row_index.is_some(),
174                )
175            },
176            None => None,
177        };
178
179        let reader = <IpcReader<_> as SerReader<_>>::new(std::io::Cursor::new(bytes.as_ref()))
180            .with_row_index(options.row_index)
181            .with_n_rows(options.row_limit)
182            .with_projection(projection);
183        reader.finish_with_scan_ops(options.predicate, verbose)
184    }
185
186    pub async fn count_rows(&self, _metadata: Option<&FileMetadata>) -> PolarsResult<i64> {
187        // TODO: Only download what is needed rather than the entire file by
188        // making use of the projection, row limit, predicate and such.
189        let file = tokio::task::block_in_place(|| self.cache_entry.try_open_check_latest())?;
190        let bytes = MMapSemaphore::new_from_file(&file).unwrap();
191        get_row_count(&mut std::io::Cursor::new(bytes.as_ref()))
192    }
193}
194
195const FOOTER_METADATA_SIZE: usize = 10;
196
197// TODO: Move to polars-arrow and deduplicate parsing of footer metadata in
198// sync and async readers.
199fn deserialize_footer_metadata(bytes: [u8; FOOTER_METADATA_SIZE]) -> PolarsResult<usize> {
200    let footer_size: usize =
201        i32::from_le_bytes(bytes[0..4].try_into().unwrap_or_else(|_| unreachable!()))
202            .try_into()
203            .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
204
205    if &bytes[4..] != b"ARROW1" {
206        polars_bail!(oos = OutOfSpecKind::InvalidFooter);
207    }
208
209    Ok(footer_size)
210}