polars_io/catalog/unity/
utils.rs

1use bytes::Bytes;
2use polars_error::{PolarsResult, to_compute_err};
3use polars_utils::error::TruncateErrorDetail;
4use reqwest::RequestBuilder;
5
6/// Performs the request and attaches the response body to any error messages.
7pub(super) async fn do_request(request: reqwest::RequestBuilder) -> PolarsResult<bytes::Bytes> {
8    let resp = request.send().await.map_err(to_compute_err)?;
9    let opt_err = resp.error_for_status_ref().map(|_| ());
10    let resp_bytes = resp.bytes().await.map_err(to_compute_err)?;
11
12    opt_err.map_err(|e| {
13        to_compute_err(e).wrap_msg(|e| {
14            let body = String::from_utf8_lossy(&resp_bytes);
15
16            format!(
17                "error: {}, response body: {}",
18                e,
19                TruncateErrorDetail(&body)
20            )
21        })
22    })?;
23
24    Ok(resp_bytes)
25}
26
27/// Support for traversing paginated response values that look like:
28/// ```text
29/// {
30///     $key_name: [$T, $T, ...],
31///     next_page_token: "token" or null,
32/// }
33/// ```
34#[macro_export]
35macro_rules! impl_page_walk {
36    ($S:ty, $T:ty, key_name = $key_name:tt) => {
37        impl $S {
38            pub async fn next(&mut self) -> PolarsResult<Option<Vec<$T>>> {
39                return self
40                    .0
41                    .next(|bytes| {
42                        let Response {
43                            $key_name: out,
44                            next_page_token,
45                        } = decode_json_response(bytes)?;
46
47                        Ok((out, next_page_token))
48                    })
49                    .await;
50
51                #[derive(serde::Deserialize)]
52                struct Response {
53                    #[serde(default = "Vec::new")]
54                    $key_name: Vec<$T>,
55                    #[serde(default)]
56                    next_page_token: Option<String>,
57                }
58            }
59
60            pub async fn read_all_pages(mut self) -> PolarsResult<Vec<$T>> {
61                let Some(mut out) = self.next().await? else {
62                    return Ok(vec![]);
63                };
64
65                while let Some(v) = self.next().await? {
66                    out.extend(v);
67                }
68
69                Ok(out)
70            }
71        }
72    };
73}
74
75pub(crate) struct PageWalker {
76    request: RequestBuilder,
77    next_page_token: Option<String>,
78    has_run: bool,
79}
80
81impl PageWalker {
82    pub(crate) fn new(request: RequestBuilder) -> Self {
83        Self {
84            request,
85            next_page_token: None,
86            has_run: false,
87        }
88    }
89
90    pub(crate) async fn next<F, T>(&mut self, deserializer: F) -> PolarsResult<Option<T>>
91    where
92        F: Fn(&[u8]) -> PolarsResult<(T, Option<String>)>,
93    {
94        let Some(resp_bytes) = self.next_bytes().await? else {
95            return Ok(None);
96        };
97
98        let (value, next_page_token) = deserializer(&resp_bytes)?;
99        self.next_page_token = next_page_token;
100
101        Ok(Some(value))
102    }
103
104    pub(crate) async fn next_bytes(&mut self) -> PolarsResult<Option<Bytes>> {
105        if self.has_run && self.next_page_token.is_none() {
106            return Ok(None);
107        }
108
109        self.has_run = true;
110
111        let request = self.request.try_clone().unwrap();
112
113        let request = if let Some(page_token) = self.next_page_token.take() {
114            request.query(&[("page_token", page_token)])
115        } else {
116            request
117        };
118
119        do_request(request).await.map(Some)
120    }
121}