polars_io/catalog/unity/
utils.rs1use bytes::Bytes;
2use polars_error::{PolarsResult, to_compute_err};
3use polars_utils::error::TruncateErrorDetail;
4use reqwest::RequestBuilder;
5
6pub(super) async fn do_request(request: reqwest::RequestBuilder) -> PolarsResult<bytes::Bytes> {
8 let resp = request.send().await.map_err(to_compute_err)?;
9 let opt_err = resp.error_for_status_ref().map(|_| ());
10 let resp_bytes = resp.bytes().await.map_err(to_compute_err)?;
11
12 opt_err.map_err(|e| {
13 to_compute_err(e).wrap_msg(|e| {
14 let body = String::from_utf8_lossy(&resp_bytes);
15
16 format!(
17 "error: {}, response body: {}",
18 e,
19 TruncateErrorDetail(&body)
20 )
21 })
22 })?;
23
24 Ok(resp_bytes)
25}
26
27#[macro_export]
35macro_rules! impl_page_walk {
36 ($S:ty, $T:ty, key_name = $key_name:tt) => {
37 impl $S {
38 pub async fn next(&mut self) -> PolarsResult<Option<Vec<$T>>> {
39 return self
40 .0
41 .next(|bytes| {
42 let Response {
43 $key_name: out,
44 next_page_token,
45 } = decode_json_response(bytes)?;
46
47 Ok((out, next_page_token))
48 })
49 .await;
50
51 #[derive(serde::Deserialize)]
52 struct Response {
53 #[serde(default = "Vec::new")]
54 $key_name: Vec<$T>,
55 #[serde(default)]
56 next_page_token: Option<String>,
57 }
58 }
59
60 pub async fn read_all_pages(mut self) -> PolarsResult<Vec<$T>> {
61 let Some(mut out) = self.next().await? else {
62 return Ok(vec![]);
63 };
64
65 while let Some(v) = self.next().await? {
66 out.extend(v);
67 }
68
69 Ok(out)
70 }
71 }
72 };
73}
74
75pub(crate) struct PageWalker {
76 request: RequestBuilder,
77 next_page_token: Option<String>,
78 has_run: bool,
79}
80
81impl PageWalker {
82 pub(crate) fn new(request: RequestBuilder) -> Self {
83 Self {
84 request,
85 next_page_token: None,
86 has_run: false,
87 }
88 }
89
90 pub(crate) async fn next<F, T>(&mut self, deserializer: F) -> PolarsResult<Option<T>>
91 where
92 F: Fn(&[u8]) -> PolarsResult<(T, Option<String>)>,
93 {
94 let Some(resp_bytes) = self.next_bytes().await? else {
95 return Ok(None);
96 };
97
98 let (value, next_page_token) = deserializer(&resp_bytes)?;
99 self.next_page_token = next_page_token;
100
101 Ok(Some(value))
102 }
103
104 pub(crate) async fn next_bytes(&mut self) -> PolarsResult<Option<Bytes>> {
105 if self.has_run && self.next_page_token.is_none() {
106 return Ok(None);
107 }
108
109 self.has_run = true;
110
111 let request = self.request.try_clone().unwrap();
112
113 let request = if let Some(page_token) = self.next_page_token.take() {
114 request.query(&[("page_token", page_token)])
115 } else {
116 request
117 };
118
119 do_request(request).await.map(Some)
120 }
121}