polars_io/
scan_lines.rs

1use arrow::array::BinaryViewArrayGenericBuilder;
2use arrow::datatypes::ArrowDataType;
3use polars_core::prelude::DataType;
4use polars_core::series::Series;
5use polars_error::{PolarsResult, polars_bail};
6use polars_utils::pl_str::PlSmallStr;
7
/// Byte that terminates a line.
const EOL_CHAR: u8 = b'\n';

/// Returns the number of lines in `full_bytes`.
///
/// Every `EOL_CHAR` ends one line, and any bytes after the last `EOL_CHAR` form one more,
/// unterminated line. Empty input has zero lines, and a trailing `EOL_CHAR` does not start an
/// additional empty line.
pub fn count_lines(full_bytes: &[u8]) -> usize {
    let terminated = full_bytes.iter().filter(|&&b| b == EOL_CHAR).count();

    // Bytes after the final EOL (if any) still make up a line of their own.
    let unterminated = match full_bytes.last() {
        Some(&b) if b != EOL_CHAR => 1,
        _ => 0,
    };

    terminated + unterminated
}
21
22pub fn split_lines_to_rows(bytes: &[u8]) -> PolarsResult<Series> {
23    split_lines_to_rows_impl(bytes, u32::MAX as usize)
24}
25
26fn split_lines_to_rows_impl(bytes: &[u8], max_buffer_size: usize) -> PolarsResult<Series> {
27    if bytes.is_empty() {
28        return Ok(Series::new_empty(PlSmallStr::EMPTY, &DataType::String));
29    };
30
31    let first_line_len = bytes.split(|c| *c == EOL_CHAR).next().unwrap().len();
32    let last_line_len = bytes.rsplit(|c| *c == EOL_CHAR).next().unwrap().len();
33
34    let n_lines_estimate = bytes
35        .len()
36        .div_ceil(first_line_len.min(last_line_len).max(1));
37
38    use arrow::array::builder::StaticArrayBuilder;
39
40    let mut builder: BinaryViewArrayGenericBuilder<[u8]> =
41        BinaryViewArrayGenericBuilder::new(ArrowDataType::BinaryView);
42    builder.reserve(n_lines_estimate);
43
44    for line_bytes in bytes
45        .strip_suffix(&[EOL_CHAR])
46        .unwrap_or(bytes)
47        .split(|c| *c == EOL_CHAR)
48    {
49        if line_bytes.len() > max_buffer_size {
50            polars_bail!(
51                ComputeError:
52                "line byte length {} exceeds max buffer size {}",
53                line_bytes.len(), max_buffer_size,
54            )
55        }
56
57        builder.push_value_ignore_validity(line_bytes);
58    }
59
60    let arr = builder.freeze();
61
62    // Performs UTF-8 validation.
63    let arr = arr.to_utf8view()?;
64
65    Ok(unsafe {
66        Series::_try_from_arrow_unchecked(
67            PlSmallStr::EMPTY,
68            vec![arr.boxed()],
69            &ArrowDataType::Utf8View,
70        )?
71    })
72}
73
#[cfg(test)]
mod tests {
    use arrow::buffer::Buffer;
    use polars_error::PolarsError;

    use crate::scan_lines::{count_lines, split_lines_to_rows_impl};

    #[test]
    fn test_split_lines_to_rows_impl() {
        let data: &'static [u8] = b"
AAAAABBBBBCCCCCDDDDD

EEEEEFFFFFGGGGGHHHHH

";

        let out = split_lines_to_rows_impl(data, 20).unwrap();
        let out = out.str().unwrap();

        assert_eq!(
            out.iter().collect::<Vec<_>>().as_slice(),
            &[
                Some(""),
                Some("AAAAABBBBBCCCCCDDDDD"),
                Some(""),
                Some("EEEEEFFFFFGGGGGHHHHH"),
                Some(""),
            ]
        );

        let v: Vec<&[Buffer<u8>]> = out
            .downcast_iter()
            .map(|array| array.data_buffers().as_ref())
            .collect();

        assert_eq!(
            v.as_slice(),
            &[&[Buffer::from_static(
                b"AAAAABBBBBCCCCCDDDDDEEEEEFFFFFGGGGGHHHHH"
            )]]
        );

        let PolarsError::ComputeError(err_str) = split_lines_to_rows_impl(data, 19).unwrap_err()
        else {
            unreachable!()
        };

        assert_eq!(&*err_str, "line byte length 20 exceeds max buffer size 19");
    }

    #[test]
    fn test_split_lines_to_rows_impl_all_inline() {
        let data: Vec<u8> = [
            b"AAAABBBBCCCC\n".as_slice(),
            b"            \n".as_slice(),
            b"DDDDEEEEFFFF\n".as_slice(),
            b"            ".as_slice(),
        ]
        .concat();

        let out = split_lines_to_rows_impl(&data, 12).unwrap();
        let out = out.str().unwrap();

        assert_eq!(
            out.iter().collect::<Vec<_>>().as_slice(),
            &[
                Some("AAAABBBBCCCC"),
                Some("            "),
                Some("DDDDEEEEFFFF"),
                Some("            "),
            ]
        );

        let v: Vec<&[Buffer<u8>]> = out
            .downcast_iter()
            .map(|array| array.data_buffers().as_ref())
            .collect();

        assert_eq!(v.as_slice(), &[&[][..]]);
    }

    /// Pins that `count_lines` agrees with the number of rows `split_lines_to_rows_impl`
    /// produces, including empty input, blank lines, and leading/trailing EOLs.
    #[test]
    fn test_count_lines_matches_split_lines_to_rows_impl() {
        let cases: &[&[u8]] = &[
            b"",
            b"\n",
            b"\n\n",
            b"a",
            b"a\n",
            b"a\nb",
            b"a\nb\n",
            b"\na\n\nb\n\n",
        ];

        for &case in cases {
            let out = split_lines_to_rows_impl(case, usize::MAX).unwrap();
            assert_eq!(count_lines(case), out.len(), "input: {case:?}");
        }
    }

    /// Input that is not valid UTF-8 is rejected instead of producing a `String` series.
    #[test]
    fn test_split_lines_to_rows_impl_invalid_utf8() {
        assert!(split_lines_to_rows_impl(b"ok\n\xff\n", usize::MAX).is_err());
    }
}