polars_io/
scan_lines.rs

1use arrow::array::BinaryViewArrayGenericBuilder;
2use arrow::array::builder::StaticArrayBuilder;
3use arrow::datatypes::ArrowDataType;
4use polars_core::prelude::DataType;
5use polars_core::series::Series;
6use polars_error::{PolarsResult, polars_ensure};
7use polars_utils::pl_str::PlSmallStr;
8
/// Binary view array builder over `[u8]` values; lines are collected into it as raw bytes.
type BinviewArrayBuilder = BinaryViewArrayGenericBuilder<[u8]>;

/// Carriage return byte.
const CR: u8 = b'\r';
/// Line feed byte.
const LF: u8 = b'\n';
13
14pub fn count_lines(full_bytes: &[u8]) -> usize {
15    let mut n: usize = full_bytes.iter().map(|c| (*c == LF) as usize).sum();
16
17    if let Some(c) = full_bytes.last()
18        && *c != LF
19    {
20        n += 1;
21    }
22
23    n
24}
25
26pub fn split_lines_to_rows(bytes: &[u8]) -> PolarsResult<Series> {
27    split_lines_to_rows_impl(bytes, BinviewArrayBuilder::MAX_ROW_BYTE_LEN)
28}
29
30fn split_lines_to_rows_impl(bytes: &[u8], max_buffer_size: usize) -> PolarsResult<Series> {
31    if bytes.is_empty() {
32        return Ok(Series::new_empty(PlSmallStr::EMPTY, &DataType::String));
33    };
34
35    let first_line_len = bytes.split(|c| *c == LF).next().unwrap().len();
36    let last_line_len = bytes.rsplit(|c| *c == LF).next().unwrap().len();
37
38    let n_lines_estimate = bytes
39        .len()
40        .div_ceil(first_line_len.min(last_line_len).max(1));
41
42    let mut builder: BinviewArrayBuilder = BinviewArrayBuilder::new(ArrowDataType::BinaryView);
43    builder.reserve(n_lines_estimate);
44
45    let bytes = if bytes.last() == Some(&LF) {
46        &bytes[..bytes.len() - 1]
47    } else {
48        bytes
49    };
50
51    for line_bytes in bytes.split(|c| *c == LF) {
52        let line_bytes = if line_bytes.last() == Some(&CR) {
53            &line_bytes[..line_bytes.len() - 1]
54        } else {
55            line_bytes
56        };
57
58        polars_ensure!(
59            line_bytes.len() <= max_buffer_size,
60            ComputeError:
61            "line byte length {} exceeds max row byte length {}",
62            line_bytes.len(), max_buffer_size,
63        );
64
65        builder.push_value_ignore_validity(line_bytes);
66    }
67
68    let arr = builder.freeze();
69
70    // Performs UTF-8 validation.
71    let arr = arr.to_utf8view()?;
72
73    Ok(unsafe {
74        Series::_try_from_arrow_unchecked(
75            PlSmallStr::EMPTY,
76            vec![arr.boxed()],
77            &ArrowDataType::Utf8View,
78        )?
79    })
80}
81
#[cfg(test)]
mod tests {
    use polars_buffer::Buffer;
    use polars_error::PolarsError;

    use crate::scan_lines::split_lines_to_rows_impl;

    /// Mixes empty lines with 20-byte lines and checks the resulting rows, the data
    /// buffers of the output, and the error raised when the limit is one byte too small.
    #[test]
    fn test_split_lines_to_rows_impl() {
        let data: &'static [u8] = b"
AAAAABBBBBCCCCCDDDDD

EEEEEFFFFFGGGGGHHHHH

";

        let series = split_lines_to_rows_impl(data, 20).unwrap();
        let ca = series.str().unwrap();

        let rows = ca.iter().collect::<Vec<_>>();
        assert_eq!(
            rows.as_slice(),
            &[
                Some(""),
                Some("AAAAABBBBBCCCCCDDDDD"),
                Some(""),
                Some("EEEEEFFFFFGGGGGHHHHH"),
                Some(""),
            ]
        );

        // The expected data buffer holds only the two 20-byte lines, back to back.
        let buffers_per_chunk: Vec<&[Buffer<u8>]> = ca
            .downcast_iter()
            .map(|chunk| chunk.data_buffers().as_ref())
            .collect();
        let expected_buffer = Buffer::from_static(b"AAAAABBBBBCCCCCDDDDDEEEEEFFFFFGGGGGHHHHH");

        assert_eq!(buffers_per_chunk.as_slice(), &[&[expected_buffer]]);

        // A limit of 19 is exceeded by the 20-byte lines.
        let err = split_lines_to_rows_impl(data, 19).unwrap_err();
        match err {
            PolarsError::ComputeError(msg) => assert_eq!(
                &*msg,
                "line byte length 20 exceeds max row byte length 19"
            ),
            _ => unreachable!(),
        }
    }

    /// Uses only 12-byte lines and expects the output to carry no data buffers
    /// (presumably because values this short are stored inline in the view — confirm).
    #[test]
    fn test_split_lines_to_rows_impl_all_inline() {
        let parts: [&[u8]; 4] = [
            b"AAAABBBBCCCC\n",
            b"            \n",
            b"DDDDEEEEFFFF\n",
            b"            ",
        ];
        let data: Vec<u8> = parts.concat();

        let series = split_lines_to_rows_impl(&data, 12).unwrap();
        let ca = series.str().unwrap();

        let rows = ca.iter().collect::<Vec<_>>();
        assert_eq!(
            rows.as_slice(),
            &[
                Some("AAAABBBBCCCC"),
                Some("            "),
                Some("DDDDEEEEFFFF"),
                Some("            "),
            ]
        );

        let buffers_per_chunk: Vec<&[Buffer<u8>]> = ca
            .downcast_iter()
            .map(|chunk| chunk.data_buffers().as_ref())
            .collect();

        assert_eq!(buffers_per_chunk.as_slice(), &[&[][..]]);
    }
}