1use arrow::array::BinaryViewArrayGenericBuilder;
2use arrow::datatypes::ArrowDataType;
3use polars_core::prelude::DataType;
4use polars_core::series::Series;
5use polars_error::{PolarsResult, polars_bail};
6use polars_utils::pl_str::PlSmallStr;
7
/// Byte that terminates a line.
const EOL_CHAR: u8 = b'\n';

/// Returns the number of lines contained in `full_bytes`.
///
/// Every `EOL_CHAR` byte ends one line. Any bytes after the final `EOL_CHAR` (or the whole input
/// when it contains none) count as one more, unterminated line. Empty input has zero lines.
pub fn count_lines(full_bytes: &[u8]) -> usize {
    let terminated = full_bytes
        .iter()
        .filter(|&&byte| byte == EOL_CHAR)
        .count();

    // A non-empty input that does not end in `EOL_CHAR` has a trailing unterminated line.
    match full_bytes.last() {
        Some(&last) if last != EOL_CHAR => terminated + 1,
        _ => terminated,
    }
}
21
22pub fn split_lines_to_rows(bytes: &[u8]) -> PolarsResult<Series> {
23 split_lines_to_rows_impl(bytes, u32::MAX as usize)
24}
25
26fn split_lines_to_rows_impl(bytes: &[u8], max_buffer_size: usize) -> PolarsResult<Series> {
27 if bytes.is_empty() {
28 return Ok(Series::new_empty(PlSmallStr::EMPTY, &DataType::String));
29 };
30
31 let first_line_len = bytes.split(|c| *c == EOL_CHAR).next().unwrap().len();
32 let last_line_len = bytes.rsplit(|c| *c == EOL_CHAR).next().unwrap().len();
33
34 let n_lines_estimate = bytes
35 .len()
36 .div_ceil(first_line_len.min(last_line_len).max(1));
37
38 use arrow::array::builder::StaticArrayBuilder;
39
40 let mut builder: BinaryViewArrayGenericBuilder<[u8]> =
41 BinaryViewArrayGenericBuilder::new(ArrowDataType::BinaryView);
42 builder.reserve(n_lines_estimate);
43
44 for line_bytes in bytes
45 .strip_suffix(&[EOL_CHAR])
46 .unwrap_or(bytes)
47 .split(|c| *c == EOL_CHAR)
48 {
49 if line_bytes.len() > max_buffer_size {
50 polars_bail!(
51 ComputeError:
52 "line byte length {} exceeds max buffer size {}",
53 line_bytes.len(), max_buffer_size,
54 )
55 }
56
57 builder.push_value_ignore_validity(line_bytes);
58 }
59
60 let arr = builder.freeze();
61
62 let arr = arr.to_utf8view()?;
64
65 Ok(unsafe {
66 Series::_try_from_arrow_unchecked(
67 PlSmallStr::EMPTY,
68 vec![arr.boxed()],
69 &ArrowDataType::Utf8View,
70 )?
71 })
72}
73
#[cfg(test)]
mod tests {
    use arrow::buffer::Buffer;
    use polars_error::PolarsError;

    use crate::scan_lines::split_lines_to_rows_impl;

    /// Empty lines are kept as rows, the two 20-byte lines end up back-to-back in a single data
    /// buffer, and a limit one byte below the longest line is rejected.
    #[test]
    fn test_split_lines_to_rows_impl() {
        // Starts with an empty line and ends with an empty line followed by the final newline.
        let data: &'static [u8] = b"\nAAAAABBBBBCCCCCDDDDD\n\nEEEEEFFFFFGGGGGHHHHH\n\n";

        let series = split_lines_to_rows_impl(data, 20).unwrap();
        let strings = series.str().unwrap();

        let rows: Vec<_> = strings.iter().collect();
        assert_eq!(
            rows.as_slice(),
            &[
                Some(""),
                Some("AAAAABBBBBCCCCCDDDDD"),
                Some(""),
                Some("EEEEEFFFFFGGGGGHHHHH"),
                Some(""),
            ]
        );

        // One entry per chunk, each holding that chunk's data buffers.
        let buffers_per_chunk: Vec<&[Buffer<u8>]> = strings
            .downcast_iter()
            .map(|chunk| chunk.data_buffers().as_ref())
            .collect();

        assert_eq!(
            buffers_per_chunk.as_slice(),
            &[&[Buffer::from_static(
                b"AAAAABBBBBCCCCCDDDDDEEEEEFFFFFGGGGGHHHHH"
            )]]
        );

        // The longest line is 20 bytes, so a limit of 19 must fail.
        let PolarsError::ComputeError(message) = split_lines_to_rows_impl(data, 19).unwrap_err()
        else {
            unreachable!()
        };

        assert_eq!(
            &*message,
            "line byte length 20 exceeds max buffer size 19"
        );
    }

    /// With no line longer than 12 bytes, the resulting array has no data buffers at all.
    #[test]
    fn test_split_lines_to_rows_impl_all_inline() {
        // The last line has no terminating newline.
        let data: Vec<u8> = [
            b"AAAABBBBCCCC\n".as_slice(),
            b" \n".as_slice(),
            b"DDDDEEEEFFFF\n".as_slice(),
            b" ".as_slice(),
        ]
        .concat();

        let series = split_lines_to_rows_impl(&data, 12).unwrap();
        let strings = series.str().unwrap();

        let rows: Vec<_> = strings.iter().collect();
        assert_eq!(
            rows.as_slice(),
            &[
                Some("AAAABBBBCCCC"),
                Some(" "),
                Some("DDDDEEEEFFFF"),
                Some(" "),
            ]
        );

        let buffers_per_chunk: Vec<&[Buffer<u8>]> = strings
            .downcast_iter()
            .map(|chunk| chunk.data_buffers().as_ref())
            .collect();

        // A single chunk with an empty list of data buffers.
        assert_eq!(buffers_per_chunk.as_slice(), &[&[][..]]);
    }
}