polars_io/utils/
other.rs

1use std::io::Read;
2#[cfg(target_os = "emscripten")]
3use std::io::{Seek, SeekFrom};
4
5use polars_core::prelude::*;
6use polars_utils::mmap::{MMapSemaphore, MemSlice};
7
8use crate::mmap::{MmapBytesReader, ReaderBytes};
9
10pub fn get_reader_bytes<R: Read + MmapBytesReader + ?Sized>(
11    reader: &mut R,
12) -> PolarsResult<ReaderBytes<'_>> {
13    // we have a file so we can mmap
14    // only seekable files are mmap-able
15    if let Some((file, offset)) = reader
16        .stream_position()
17        .ok()
18        .and_then(|offset| Some((reader.to_file()?, offset)))
19    {
20        let mut options = memmap::MmapOptions::new();
21        options.offset(offset);
22
23        // Set mmap size based on seek to end when running under Emscripten
24        #[cfg(target_os = "emscripten")]
25        {
26            let mut file = file;
27            let size = file.seek(SeekFrom::End(0)).unwrap();
28            options.len((size - offset) as usize);
29        }
30
31        let mmap = MMapSemaphore::new_from_file_with_options(file, options)?;
32        Ok(ReaderBytes::Owned(MemSlice::from_mmap(Arc::new(mmap))))
33    } else {
34        // we can get the bytes for free
35        if reader.to_bytes().is_some() {
36            // duplicate .to_bytes() is necessary to satisfy the borrow checker
37            Ok(ReaderBytes::Borrowed((*reader).to_bytes().unwrap()))
38        } else {
39            // we have to read to an owned buffer to get the bytes.
40            let mut bytes = Vec::with_capacity(1024 * 128);
41            reader.read_to_end(&mut bytes)?;
42            Ok(ReaderBytes::Owned(bytes.into()))
43        }
44    }
45}
46
/// Builds a new schema containing only the entries of `schema` at the positions listed in
/// `projection`, in the order given by `projection`.
///
/// # Panics
///
/// Panics if `schema.get_at_index` yields nothing for one of the indices.
#[cfg(any(
    feature = "ipc",
    feature = "ipc_streaming",
    feature = "parquet",
    feature = "avro"
))]
pub fn apply_projection(schema: &ArrowSchema, projection: &[usize]) -> ArrowSchema {
    projection
        .iter()
        .map(|&position| {
            let (name, field) = schema.get_at_index(position).unwrap();
            (name.clone(), field.clone())
        })
        .collect()
}
60
/// Translates column names into their positions within `schema`.
///
/// The returned indices follow the order of `columns`.
///
/// # Errors
///
/// Returns the error produced by `schema.try_index_of` for the first name it rejects.
#[cfg(any(
    feature = "ipc",
    feature = "ipc_streaming",
    feature = "avro",
    feature = "parquet"
))]
pub fn columns_to_projection<T: AsRef<str>>(
    columns: &[T],
    schema: &ArrowSchema,
) -> PolarsResult<Vec<usize>> {
    // Collecting into a `Result` stops at the first failed lookup.
    columns
        .iter()
        .map(|name| schema.try_index_of(name.as_ref()))
        .collect()
}
80
81#[cfg(debug_assertions)]
82fn check_offsets(dfs: &[DataFrame]) {
83    dfs.windows(2).for_each(|s| {
84        let a = &s[0].get_columns()[0];
85        let b = &s[1].get_columns()[0];
86
87        let prev = a.get(a.len() - 1).unwrap().extract::<usize>().unwrap();
88        let next = b.get(0).unwrap().extract::<usize>().unwrap();
89        assert_eq!(prev + 1, next);
90    })
91}
92
/// Makes the first column of the given frames continue from one frame to the next.
///
/// Frames produced in parallel each start counting from `0` or from `offset`. Walking the
/// frames in order, every non-empty frame whose first value differs from the running total
/// gets that total added to its first column. The running total starts at `offset` and
/// grows by each non-empty frame's height.
#[cfg(any(feature = "csv", feature = "json"))]
pub(crate) fn update_row_counts2(dfs: &mut [DataFrame], offset: IdxSize) {
    let mut expected_start = offset;

    for df in dfs.iter_mut().filter(|df| !df.is_empty()) {
        // Read the height before taking the mutable borrow of the columns.
        let height = df.height() as IdxSize;

        // NOTE(review): `get_columns_mut` is unsafe; presumably the replacement column must keep
        // the same length as the one it replaces — confirm against its safety contract.
        if let Some(column) = unsafe { df.get_columns_mut() }.get_mut(0) {
            let current_start = column
                .get(0)
                .ok()
                .map(|value| value.extract::<usize>().unwrap());

            if current_start.is_some_and(|start| start != expected_start as usize) {
                *column = &*column + expected_start;
            }
        }

        expected_start += height;
    }

    #[cfg(debug_assertions)]
    check_offsets(dfs);
}
119
/// Makes the first column of the given frames continue from one frame to the next, using
/// externally supplied heights for the running total.
///
/// Frames produced in parallel each start counting from `0` or from `offset`. Walking the
/// frames in order, every non-empty frame whose first value differs from the running total
/// gets that total added to its first column. The running total starts at `offset` and
/// grows by the matching entry of `heights` for every non-empty frame.
///
/// # Panics
///
/// Panics if `dfs` and `heights` differ in length.
#[cfg(feature = "json")]
pub(crate) fn update_row_counts3(dfs: &mut [DataFrame], heights: &[IdxSize], offset: IdxSize) {
    assert_eq!(dfs.len(), heights.len());

    let mut expected_start = offset;

    for (df, &height) in dfs.iter_mut().zip(heights) {
        // Empty frames contribute nothing, not even their recorded height.
        if df.is_empty() {
            continue;
        }

        // NOTE(review): `get_columns_mut` is unsafe; presumably the replacement column must keep
        // the same length as the one it replaces — confirm against its safety contract.
        if let Some(column) = unsafe { df.get_columns_mut() }.get_mut(0) {
            let current_start = column
                .get(0)
                .ok()
                .map(|value| value.extract::<usize>().unwrap());

            if current_start.is_some_and(|start| start != expected_start as usize) {
                *column = &*column + expected_start;
            }
        }

        expected_start += height;
    }
}
145
/// Replaces the values in `schema` with those from `overwriting_schema`, key by key.
///
/// # Errors
///
/// Returns the error from `schema.try_get_mut` for the first key of `overwriting_schema`
/// it rejects. Entries processed before that point have already been overwritten.
#[cfg(feature = "json")]
pub fn overwrite_schema(schema: &mut Schema, overwriting_schema: &Schema) -> PolarsResult<()> {
    overwriting_schema
        .iter()
        .try_for_each(|(name, dtype)| -> PolarsResult<()> {
            let slot = schema.try_get_mut(name)?;
            *slot = dtype.clone();
            Ok(())
        })
}
153
154polars_utils::regex_cache::cached_regex! {
155    pub static FLOAT_RE = r"^[-+]?((\d*\.\d+)([eE][-+]?\d+)?|inf|NaN|(\d+)[eE][-+]?\d+|\d+\.)$";
156    pub static FLOAT_RE_DECIMAL = r"^[-+]?((\d*,\d+)([eE][-+]?\d+)?|inf|NaN|(\d+)[eE][-+]?\d+|\d+,)$";
157    pub static INTEGER_RE = r"^-?(\d+)$";
158    pub static BOOLEAN_RE = r"^(?i:true|false)$";
159}
160
161pub fn materialize_projection(
162    with_columns: Option<&[PlSmallStr]>,
163    schema: &Schema,
164    hive_partitions: Option<&[Series]>,
165    has_row_index: bool,
166) -> Option<Vec<usize>> {
167    match hive_partitions {
168        None => with_columns.map(|with_columns| {
169            with_columns
170                .iter()
171                .map(|name| schema.index_of(name).unwrap() - has_row_index as usize)
172                .collect()
173        }),
174        Some(part_cols) => {
175            with_columns.map(|with_columns| {
176                with_columns
177                    .iter()
178                    .flat_map(|name| {
179                        // the hive partitions are added at the end of the schema, but we don't want to project
180                        // them from the file
181                        if part_cols.iter().any(|s| s.name() == name.as_str()) {
182                            None
183                        } else {
184                            Some(schema.index_of(name).unwrap() - has_row_index as usize)
185                        }
186                    })
187                    .collect()
188            })
189        },
190    }
191}
192
/// Deserializes `bytes` as JSON into `T`.
///
/// On failure the error message is extended with the (truncated, lossily UTF-8 decoded)
/// response body, which makes errors from parsing network responses much easier to debug.
///
/// # Errors
///
/// Returns a compute error wrapping the `serde_json` failure when `bytes` cannot be decoded
/// into `T`.
#[cfg(feature = "cloud")]
pub fn decode_json_response<T>(bytes: &[u8]) -> PolarsResult<T>
where
    T: for<'de> serde::de::Deserialize<'de>,
{
    use polars_error::to_compute_err;
    use polars_utils::error::TruncateErrorDetail;

    match serde_json::from_slice(bytes) {
        Ok(decoded) => Ok(decoded),
        Err(json_err) => {
            let err = to_compute_err(json_err).wrap_msg(|msg| {
                format!(
                    "error decoding response: {}, response value: {}",
                    msg,
                    TruncateErrorDetail(&String::from_utf8_lossy(bytes))
                )
            });
            Err(err)
        },
    }
}
215
#[cfg(test)]
mod tests {
    use super::FLOAT_RE;

    /// Checks `FLOAT_RE` against inputs it must accept and inputs it must reject.
    ///
    /// The cases are table-driven so that a failure names the offending input.
    #[test]
    fn test_float_parse() {
        const ACCEPTED: &[&str] = &[
            "0.1",
            "3.0",
            "3.00001",
            "-9.9990e-003",
            "9.9990e+003",
            "9.9990E+003",
            ".5",
            "3.",
            "2.5E-10",
            "2.5e10",
            "NaN",
            "-NaN",
            "-inf",
            "inf",
            "-7e-05",
            "7e-05",
            "+7e+05",
        ];
        for &input in ACCEPTED {
            assert!(
                FLOAT_RE.is_match(input),
                "expected {:?} to match FLOAT_RE",
                input
            );
        }

        // Plain integers, malformed exponents, doubled signs, a lone dot and comma decimals
        // are not floats according to this pattern.
        const REJECTED: &[&str] = &["", "1", "abc", "1e", "--1.0", ".", "1,5", "1.2.3"];
        for &input in REJECTED {
            assert!(
                !FLOAT_RE.is_match(input),
                "expected {:?} not to match FLOAT_RE",
                input
            );
        }
    }
}