polars_io/ipc/
ipc_reader_async.rs1use std::sync::Arc;
2
3use arrow::io::ipc::read::{FileMetadata, OutOfSpecKind, get_row_count};
4use object_store::ObjectMeta;
5use object_store::path::Path;
6use polars_core::datatypes::IDX_DTYPE;
7use polars_core::frame::DataFrame;
8use polars_core::schema::{Schema, SchemaExt};
9use polars_error::{PolarsResult, polars_bail, polars_err, to_compute_err};
10use polars_utils::mmap::MMapSemaphore;
11use polars_utils::pl_str::PlSmallStr;
12
13use crate::RowIndex;
14use crate::cloud::{
15 CloudLocation, CloudOptions, PolarsObjectStore, build_object_store, object_path_from_str,
16};
17use crate::file_cache::{FileCacheEntry, init_entries_from_uri_list};
18use crate::predicates::PhysicalIoExpr;
19use crate::prelude::{IpcReader, materialize_projection};
20use crate::shared::SerReader;
21
22pub struct IpcReaderAsync {
24 store: PolarsObjectStore,
25 cache_entry: Arc<FileCacheEntry>,
26 path: Path,
27}
28
29#[derive(Default, Clone)]
30pub struct IpcReadOptions {
31 projection: Option<Arc<[PlSmallStr]>>,
33
34 row_limit: Option<usize>,
36
37 row_index: Option<RowIndex>,
39
40 predicate: Option<Arc<dyn PhysicalIoExpr>>,
42}
43
44impl IpcReadOptions {
45 pub fn with_projection(mut self, projection: Option<Arc<[PlSmallStr]>>) -> Self {
46 self.projection = projection;
47 self
48 }
49
50 pub fn with_row_limit(mut self, row_limit: impl Into<Option<usize>>) -> Self {
51 self.row_limit = row_limit.into();
52 self
53 }
54
55 pub fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
56 self.row_index = row_index.into();
57 self
58 }
59
60 pub fn with_predicate(mut self, predicate: impl Into<Option<Arc<dyn PhysicalIoExpr>>>) -> Self {
61 self.predicate = predicate.into();
62 self
63 }
64}
65
66impl IpcReaderAsync {
67 pub async fn from_uri(
68 uri: &str,
69 cloud_options: Option<&CloudOptions>,
70 ) -> PolarsResult<IpcReaderAsync> {
71 let cache_entry = init_entries_from_uri_list(&[Arc::from(uri)], cloud_options)?[0].clone();
72 let (CloudLocation { prefix, .. }, store) =
73 build_object_store(uri, cloud_options, false).await?;
74
75 let path = object_path_from_str(&prefix)?;
76
77 Ok(Self {
78 store,
79 cache_entry,
80 path,
81 })
82 }
83
84 async fn object_metadata(&self) -> PolarsResult<ObjectMeta> {
85 self.store.head(&self.path).await
86 }
87
88 async fn file_size(&self) -> PolarsResult<usize> {
89 Ok(self.object_metadata().await?.size as usize)
90 }
91
92 pub async fn metadata(&self) -> PolarsResult<FileMetadata> {
93 let file_size = self.file_size().await?;
94
95 let footer_metadata =
97 self.store
98 .get_range(
99 &self.path,
100 file_size.checked_sub(FOOTER_METADATA_SIZE).ok_or_else(|| {
101 to_compute_err("ipc file size is smaller than the minimum")
102 })?..file_size,
103 )
104 .await?;
105
106 let footer_size = deserialize_footer_metadata(
107 footer_metadata
108 .as_ref()
109 .try_into()
110 .map_err(to_compute_err)?,
111 )?;
112
113 let footer = self
114 .store
115 .get_range(
116 &self.path,
117 file_size
118 .checked_sub(FOOTER_METADATA_SIZE + footer_size)
119 .ok_or_else(|| {
120 to_compute_err("invalid ipc footer metadata: footer size too large")
121 })?..file_size,
122 )
123 .await?;
124
125 arrow::io::ipc::read::deserialize_footer(
126 footer.as_ref(),
127 footer_size.try_into().map_err(to_compute_err)?,
128 )
129 }
130
131 pub async fn data(
132 &self,
133 metadata: Option<&FileMetadata>,
134 options: IpcReadOptions,
135 verbose: bool,
136 ) -> PolarsResult<DataFrame> {
137 let file = tokio::task::block_in_place(|| self.cache_entry.try_open_check_latest())?;
140 let bytes = MMapSemaphore::new_from_file(&file).unwrap();
141
142 let projection = match options.projection.as_deref() {
143 Some(projection) => {
144 fn prepare_schema(mut schema: Schema, row_index: Option<&RowIndex>) -> Schema {
145 if let Some(rc) = row_index {
146 let _ = schema.insert_at_index(0, rc.name.clone(), IDX_DTYPE);
147 }
148 schema
149 }
150
151 let fetched_metadata;
153 let metadata = if let Some(metadata) = metadata {
154 metadata
155 } else {
156 fetched_metadata = self.metadata().await?;
158 &fetched_metadata
159 };
160
161 let schema = prepare_schema(
162 Schema::from_arrow_schema(metadata.schema.as_ref()),
163 options.row_index.as_ref(),
164 );
165
166 let hive_partitions = None;
167
168 materialize_projection(
169 Some(projection),
170 &schema,
171 hive_partitions,
172 options.row_index.is_some(),
173 )
174 },
175 None => None,
176 };
177
178 let reader = <IpcReader<_> as SerReader<_>>::new(std::io::Cursor::new(bytes.as_ref()))
179 .with_row_index(options.row_index)
180 .with_n_rows(options.row_limit)
181 .with_projection(projection);
182 reader.finish_with_scan_ops(options.predicate, verbose)
183 }
184
185 pub async fn count_rows(&self, _metadata: Option<&FileMetadata>) -> PolarsResult<i64> {
186 let file = tokio::task::block_in_place(|| self.cache_entry.try_open_check_latest())?;
189 let bytes = MMapSemaphore::new_from_file(&file).unwrap();
190 get_row_count(&mut std::io::Cursor::new(bytes.as_ref()))
191 }
192}
193
194const FOOTER_METADATA_SIZE: usize = 10;
195
196fn deserialize_footer_metadata(bytes: [u8; FOOTER_METADATA_SIZE]) -> PolarsResult<usize> {
199 let footer_size: usize =
200 i32::from_le_bytes(bytes[0..4].try_into().unwrap_or_else(|_| unreachable!()))
201 .try_into()
202 .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
203
204 if &bytes[4..] != b"ARROW1" {
205 polars_bail!(oos = OutOfSpecKind::InvalidFooter);
206 }
207
208 Ok(footer_size)
209}