// polars_io/ipc/ipc_reader_async.rs
1use std::sync::Arc;
2
3use arrow::io::ipc::read::{FileMetadata, OutOfSpecKind, get_row_count};
4use object_store::ObjectMeta;
5use object_store::path::Path;
6use polars_core::datatypes::IDX_DTYPE;
7use polars_core::frame::DataFrame;
8use polars_core::schema::{Schema, SchemaExt};
9use polars_error::{PolarsResult, polars_bail, polars_err, to_compute_err};
10use polars_utils::pl_str::PlSmallStr;
11
12use crate::RowIndex;
13use crate::cloud::{
14 CloudLocation, CloudOptions, PolarsObjectStore, build_object_store, object_path_from_str,
15};
16use crate::file_cache::{FileCacheEntry, init_entries_from_uri_list};
17use crate::predicates::PhysicalIoExpr;
18use crate::prelude::{IpcReader, materialize_projection};
19use crate::shared::SerReader;
20
21pub struct IpcReaderAsync {
23 store: PolarsObjectStore,
24 cache_entry: Arc<FileCacheEntry>,
25 path: Path,
26}
27
28#[derive(Default, Clone)]
29pub struct IpcReadOptions {
30 projection: Option<Arc<[PlSmallStr]>>,
32
33 row_limit: Option<usize>,
35
36 row_index: Option<RowIndex>,
38
39 predicate: Option<Arc<dyn PhysicalIoExpr>>,
41}
42
43impl IpcReadOptions {
44 pub fn with_projection(mut self, projection: Option<Arc<[PlSmallStr]>>) -> Self {
45 self.projection = projection;
46 self
47 }
48
49 pub fn with_row_limit(mut self, row_limit: impl Into<Option<usize>>) -> Self {
50 self.row_limit = row_limit.into();
51 self
52 }
53
54 pub fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
55 self.row_index = row_index.into();
56 self
57 }
58
59 pub fn with_predicate(mut self, predicate: impl Into<Option<Arc<dyn PhysicalIoExpr>>>) -> Self {
60 self.predicate = predicate.into();
61 self
62 }
63}
64
65impl IpcReaderAsync {
66 pub async fn from_uri(
67 uri: &str,
68 cloud_options: Option<&CloudOptions>,
69 ) -> PolarsResult<IpcReaderAsync> {
70 let cache_entry = init_entries_from_uri_list(&[Arc::from(uri)], cloud_options)?[0].clone();
71 let (CloudLocation { prefix, .. }, store) =
72 build_object_store(uri, cloud_options, false).await?;
73
74 let path = object_path_from_str(&prefix)?;
75
76 Ok(Self {
77 store,
78 cache_entry,
79 path,
80 })
81 }
82
83 async fn object_metadata(&self) -> PolarsResult<ObjectMeta> {
84 self.store.head(&self.path).await
85 }
86
87 async fn file_size(&self) -> PolarsResult<usize> {
88 Ok(self.object_metadata().await?.size)
89 }
90
91 pub async fn metadata(&self) -> PolarsResult<FileMetadata> {
92 let file_size = self.file_size().await?;
93
94 let footer_metadata =
96 self.store
97 .get_range(
98 &self.path,
99 file_size.checked_sub(FOOTER_METADATA_SIZE).ok_or_else(|| {
100 to_compute_err("ipc file size is smaller than the minimum")
101 })?..file_size,
102 )
103 .await?;
104
105 let footer_size = deserialize_footer_metadata(
106 footer_metadata
107 .as_ref()
108 .try_into()
109 .map_err(to_compute_err)?,
110 )?;
111
112 let footer = self
113 .store
114 .get_range(
115 &self.path,
116 file_size
117 .checked_sub(FOOTER_METADATA_SIZE + footer_size)
118 .ok_or_else(|| {
119 to_compute_err("invalid ipc footer metadata: footer size too large")
120 })?..file_size,
121 )
122 .await?;
123
124 arrow::io::ipc::read::deserialize_footer(
125 footer.as_ref(),
126 footer_size.try_into().map_err(to_compute_err)?,
127 )
128 }
129
130 pub async fn data(
131 &self,
132 metadata: Option<&FileMetadata>,
133 options: IpcReadOptions,
134 verbose: bool,
135 ) -> PolarsResult<DataFrame> {
136 let file = tokio::task::block_in_place(|| self.cache_entry.try_open_check_latest())?;
139 let bytes = unsafe { memmap::Mmap::map(&file) }.unwrap();
140
141 let projection = match options.projection.as_deref() {
142 Some(projection) => {
143 fn prepare_schema(mut schema: Schema, row_index: Option<&RowIndex>) -> Schema {
144 if let Some(rc) = row_index {
145 let _ = schema.insert_at_index(0, rc.name.clone(), IDX_DTYPE);
146 }
147 schema
148 }
149
150 let fetched_metadata;
152 let metadata = if let Some(metadata) = metadata {
153 metadata
154 } else {
155 fetched_metadata = self.metadata().await?;
157 &fetched_metadata
158 };
159
160 let schema = prepare_schema(
161 Schema::from_arrow_schema(metadata.schema.as_ref()),
162 options.row_index.as_ref(),
163 );
164
165 let hive_partitions = None;
166
167 materialize_projection(
168 Some(projection),
169 &schema,
170 hive_partitions,
171 options.row_index.is_some(),
172 )
173 },
174 None => None,
175 };
176
177 let reader = <IpcReader<_> as SerReader<_>>::new(std::io::Cursor::new(bytes.as_ref()))
178 .with_row_index(options.row_index)
179 .with_n_rows(options.row_limit)
180 .with_projection(projection);
181 reader.finish_with_scan_ops(options.predicate, verbose)
182 }
183
184 pub async fn count_rows(&self, _metadata: Option<&FileMetadata>) -> PolarsResult<i64> {
185 let file = tokio::task::block_in_place(|| self.cache_entry.try_open_check_latest())?;
188 let bytes = unsafe { memmap::Mmap::map(&file) }.unwrap();
189 get_row_count(&mut std::io::Cursor::new(bytes.as_ref()))
190 }
191}
192
/// Size of the fixed trailer at the end of an Arrow IPC file: a little-endian `i32` footer
/// length followed by the 6-byte `ARROW1` magic (4 + 6 = 10 bytes).
const FOOTER_METADATA_SIZE: usize = std::mem::size_of::<i32>() + b"ARROW1".len();
194
195fn deserialize_footer_metadata(bytes: [u8; FOOTER_METADATA_SIZE]) -> PolarsResult<usize> {
198 let footer_size: usize =
199 i32::from_le_bytes(bytes[0..4].try_into().unwrap_or_else(|_| unreachable!()))
200 .try_into()
201 .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
202
203 if &bytes[4..] != b"ARROW1" {
204 polars_bail!(oos = OutOfSpecKind::InvalidFooter);
205 }
206
207 Ok(footer_size)
208}