polars_io/utils/other.rs

use std::io::Read;
#[cfg(target_os = "emscripten")]
use std::io::{Seek, SeekFrom};
use std::sync::LazyLock;

use polars_core::prelude::*;
use polars_utils::mmap::{MMapSemaphore, MemSlice};
use regex::{Regex, RegexBuilder};

use crate::mmap::{MmapBytesReader, ReaderBytes};

pub fn get_reader_bytes<R: Read + MmapBytesReader + ?Sized>(
    reader: &mut R,
) -> PolarsResult<ReaderBytes<'_>> {
    // We have a file, so we can mmap it; only seekable files are mmap-able.
    if let Some((file, offset)) = reader
        .stream_position()
        .ok()
        .and_then(|offset| Some((reader.to_file()?, offset)))
    {
        let mut options = memmap::MmapOptions::new();
        options.offset(offset);

        // Set the mmap size based on a seek to the end when running under Emscripten.
        #[cfg(target_os = "emscripten")]
        {
            let mut file = file;
            let size = file.seek(SeekFrom::End(0)).unwrap();
            options.len((size - offset) as usize);
        }

        let mmap = MMapSemaphore::new_from_file_with_options(file, options)?;
        Ok(ReaderBytes::Owned(MemSlice::from_mmap(Arc::new(mmap))))
    } else {
        // We can get the bytes for free.
        if reader.to_bytes().is_some() {
            // The duplicate `.to_bytes()` call is necessary to satisfy the borrow checker.
            Ok(ReaderBytes::Borrowed((*reader).to_bytes().unwrap()))
        } else {
            // We have to read into an owned buffer to get the bytes.
            let mut bytes = Vec::with_capacity(1024 * 128);
            reader.read_to_end(&mut bytes)?;
            Ok(ReaderBytes::Owned(bytes.into()))
        }
    }
}
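
// A hedged usage sketch (not part of the original file): exercising the non-mmap
// fallback of `get_reader_bytes` through an in-memory cursor. This assumes
// `MmapBytesReader` is implemented for `std::io::Cursor` over byte buffers and
// that `ReaderBytes` dereferences to `[u8]`, which the borrowed branch above
// suggests but this file does not itself guarantee.
#[cfg(test)]
mod get_reader_bytes_sketch {
    use super::*;

    #[test]
    fn cursor_falls_back_to_borrowed_bytes() -> PolarsResult<()> {
        let data = b"a,b\n1,2\n".to_vec();
        let mut reader = std::io::Cursor::new(data);
        // No underlying file, so the bytes are returned without mmapping.
        let bytes = get_reader_bytes(&mut reader)?;
        assert_eq!(&bytes[..], b"a,b\n1,2\n");
        Ok(())
    }
}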

#[cfg(any(
    feature = "ipc",
    feature = "ipc_streaming",
    feature = "parquet",
    feature = "avro"
))]
pub fn apply_projection(schema: &ArrowSchema, projection: &[usize]) -> ArrowSchema {
    projection
        .iter()
        .map(|idx| schema.get_at_index(*idx).unwrap())
        .map(|(k, v)| (k.clone(), v.clone()))
        .collect()
}

#[cfg(any(
    feature = "ipc",
    feature = "ipc_streaming",
    feature = "avro",
    feature = "parquet"
))]
pub fn columns_to_projection<T: AsRef<str>>(
    columns: &[T],
    schema: &ArrowSchema,
) -> PolarsResult<Vec<usize>> {
    let mut prj = Vec::with_capacity(columns.len());

    for column in columns {
        let i = schema.try_index_of(column.as_ref())?;
        prj.push(i);
    }

    Ok(prj)
}

#[cfg(debug_assertions)]
fn check_offsets(dfs: &[DataFrame]) {
    dfs.windows(2).for_each(|s| {
        let a = &s[0].get_columns()[0];
        let b = &s[1].get_columns()[0];

        let prev = a.get(a.len() - 1).unwrap().extract::<usize>().unwrap();
        let next = b.get(0).unwrap().extract::<usize>().unwrap();
        assert_eq!(prev + 1, next);
    })
}

/// Because of threading, every chunk's row index starts from `0` or from `offset`.
/// We must correct that so that they are monotonically increasing.
#[cfg(any(feature = "csv", feature = "json"))]
pub(crate) fn update_row_counts2(dfs: &mut [DataFrame], offset: IdxSize) {
    if !dfs.is_empty() {
        let mut previous = offset;
        for df in &mut *dfs {
            if df.is_empty() {
                continue;
            }
            let n_read = df.height() as IdxSize;
            if let Some(s) = unsafe { df.get_columns_mut() }.get_mut(0) {
                if let Ok(v) = s.get(0) {
                    if v.extract::<usize>().unwrap() != previous as usize {
                        *s = &*s + previous;
                    }
                }
            }
            previous += n_read;
        }
    }
    #[cfg(debug_assertions)]
    {
        check_offsets(dfs)
    }
}
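
// A hedged sketch (not part of the original file) of the correction performed by
// `update_row_counts2`: two chunks whose first column both start at 0 end up
// forming one monotonically increasing sequence. The column name and the `u32`
// dtype are illustrative assumptions; only the first column's values matter here.
#[cfg(all(test, any(feature = "csv", feature = "json")))]
mod row_count_sketch {
    use super::*;

    #[test]
    fn chunks_become_monotonic() -> PolarsResult<()> {
        let mut dfs = vec![
            DataFrame::new(vec![Column::new("row_nr".into(), [0u32, 1, 2])])?,
            DataFrame::new(vec![Column::new("row_nr".into(), [0u32, 1])])?,
        ];
        update_row_counts2(&mut dfs, 0);
        // The second chunk should now continue where the first one ended.
        assert_eq!(dfs[1].column("row_nr")?.get(0)?.extract::<usize>(), Some(3));
        assert_eq!(dfs[1].column("row_nr")?.get(1)?.extract::<usize>(), Some(4));
        Ok(())
    }
}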

/// Because of threading, every chunk's row index starts from `0` or from `offset`.
/// We must correct that so that they are monotonically increasing.
#[cfg(feature = "json")]
pub(crate) fn update_row_counts3(dfs: &mut [DataFrame], heights: &[IdxSize], offset: IdxSize) {
    assert_eq!(dfs.len(), heights.len());
    if !dfs.is_empty() {
        let mut previous = offset;
        for i in 0..dfs.len() {
            let df = &mut dfs[i];
            if df.is_empty() {
                continue;
            }

            if let Some(s) = unsafe { df.get_columns_mut() }.get_mut(0) {
                if let Ok(v) = s.get(0) {
                    if v.extract::<usize>().unwrap() != previous as usize {
                        *s = &*s + previous;
                    }
                }
            }
            let n_read = heights[i];
            previous += n_read;
        }
    }
}

#[cfg(feature = "json")]
pub fn overwrite_schema(schema: &mut Schema, overwriting_schema: &Schema) -> PolarsResult<()> {
    for (k, value) in overwriting_schema.iter() {
        *schema.try_get_mut(k)? = value.clone();
    }
    Ok(())
}

pub static FLOAT_RE: LazyLock<Regex> = LazyLock::new(|| {
    Regex::new(r"^[-+]?((\d*\.\d+)([eE][-+]?\d+)?|inf|NaN|(\d+)[eE][-+]?\d+|\d+\.)$").unwrap()
});

pub static FLOAT_RE_DECIMAL: LazyLock<Regex> = LazyLock::new(|| {
    Regex::new(r"^[-+]?((\d*,\d+)([eE][-+]?\d+)?|inf|NaN|(\d+)[eE][-+]?\d+|\d+,)$").unwrap()
});

pub static INTEGER_RE: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^-?(\d+)$").unwrap());

pub static BOOLEAN_RE: LazyLock<Regex> = LazyLock::new(|| {
    RegexBuilder::new(r"^(true|false)$")
        .case_insensitive(true)
        .build()
        .unwrap()
});

pub fn materialize_projection(
    with_columns: Option<&[PlSmallStr]>,
    schema: &Schema,
    hive_partitions: Option<&[Series]>,
    has_row_index: bool,
) -> Option<Vec<usize>> {
    match hive_partitions {
        None => with_columns.map(|with_columns| {
            with_columns
                .iter()
                .map(|name| schema.index_of(name).unwrap() - has_row_index as usize)
                .collect()
        }),
        Some(part_cols) => {
            with_columns.map(|with_columns| {
                with_columns
                    .iter()
                    .flat_map(|name| {
                        // The hive partitions are added at the end of the schema, but we
                        // don't want to project them from the file.
                        if part_cols.iter().any(|s| s.name() == name.as_str()) {
                            None
                        } else {
                            Some(schema.index_of(name).unwrap() - has_row_index as usize)
                        }
                    })
                    .collect()
            })
        },
    }
}
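
// A hedged sketch (not part of the original file) of how `materialize_projection`
// shifts indices when a row index column occupies position 0 of the schema. The
// field names and dtypes are illustrative assumptions, and the schema construction
// assumes `Schema` collects from `(PlSmallStr, DataType)` pairs.
#[cfg(test)]
mod projection_sketch {
    use super::*;

    #[test]
    fn row_index_shifts_projection() {
        let schema = Schema::from_iter([
            (PlSmallStr::from("row_nr"), IDX_DTYPE),
            (PlSmallStr::from("a"), DataType::Int64),
            (PlSmallStr::from("b"), DataType::String),
        ]);
        let with_columns: Vec<PlSmallStr> = vec!["b".into()];

        // "b" sits at schema index 2, but the row index column is not part of
        // the file, so the materialized projection points at file column 1.
        let proj = materialize_projection(Some(&with_columns), &schema, None, true);
        assert_eq!(proj, Some(vec![1]));
    }
}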

/// Utility for decoding JSON that adds the response value to the error message if decoding fails.
/// This makes it much easier to debug errors from parsing network responses.
#[cfg(feature = "cloud")]
pub fn decode_json_response<T>(bytes: &[u8]) -> PolarsResult<T>
where
    T: for<'de> serde::de::Deserialize<'de>,
{
    use polars_error::to_compute_err;
    use polars_utils::error::TruncateErrorDetail;

    serde_json::from_slice(bytes)
        .map_err(to_compute_err)
        .map_err(|e| {
            e.wrap_msg(|e| {
                format!(
                    "error decoding response: {}, response value: {}",
                    e,
                    TruncateErrorDetail(&String::from_utf8_lossy(bytes))
                )
            })
        })
}
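
// A hedged usage sketch (not part of the original file): `serde_json::Value`
// already implements `Deserialize`, so it can stand in for a typed response
// object when checking the happy path of `decode_json_response`.
#[cfg(all(test, feature = "cloud"))]
mod decode_json_sketch {
    use super::*;

    #[test]
    fn decodes_valid_json() -> PolarsResult<()> {
        let value: serde_json::Value = decode_json_response(br#"{"ok": true}"#)?;
        assert_eq!(value["ok"], serde_json::Value::Bool(true));
        Ok(())
    }
}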

#[cfg(test)]
mod tests {
    use super::FLOAT_RE;

    #[test]
    fn test_float_parse() {
        assert!(FLOAT_RE.is_match("0.1"));
        assert!(FLOAT_RE.is_match("3.0"));
        assert!(FLOAT_RE.is_match("3.00001"));
        assert!(FLOAT_RE.is_match("-9.9990e-003"));
        assert!(FLOAT_RE.is_match("9.9990e+003"));
        assert!(FLOAT_RE.is_match("9.9990E+003"));
        assert!(FLOAT_RE.is_match(".5"));
        assert!(FLOAT_RE.is_match("2.5E-10"));
        assert!(FLOAT_RE.is_match("2.5e10"));
        assert!(FLOAT_RE.is_match("NaN"));
        assert!(FLOAT_RE.is_match("-NaN"));
        assert!(FLOAT_RE.is_match("-inf"));
        assert!(FLOAT_RE.is_match("inf"));
        assert!(FLOAT_RE.is_match("-7e-05"));
        assert!(FLOAT_RE.is_match("7e-05"));
        assert!(FLOAT_RE.is_match("+7e+05"));
    }
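
    // A hedged companion test (not in the original file) for the remaining
    // patterns defined above; it only exercises the `regex` crate, so the
    // assertions follow directly from the pattern definitions.
    #[test]
    fn test_other_patterns() {
        use super::{BOOLEAN_RE, FLOAT_RE_DECIMAL, INTEGER_RE};

        assert!(INTEGER_RE.is_match("-42"));
        assert!(!INTEGER_RE.is_match("4.2"));

        assert!(BOOLEAN_RE.is_match("TRUE"));
        assert!(BOOLEAN_RE.is_match("false"));
        assert!(!BOOLEAN_RE.is_match("yes"));

        // Decimal-comma variant of `FLOAT_RE`.
        assert!(FLOAT_RE_DECIMAL.is_match("3,14"));
        assert!(FLOAT_RE_DECIMAL.is_match("-1,5e-3"));
        assert!(!FLOAT_RE_DECIMAL.is_match("3.14"));
    }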
}