polars_io/catalog/unity/
client.rs

1use polars_core::prelude::PlHashMap;
2use polars_core::schema::Schema;
3use polars_error::{PolarsResult, polars_bail, to_compute_err};
4
5use super::models::{CatalogInfo, NamespaceInfo, TableCredentials, TableInfo};
6use super::schema::schema_to_column_info_list;
7use super::utils::{PageWalker, do_request};
8use crate::catalog::unity::models::{ColumnInfo, DataSourceFormat, TableType};
9use crate::impl_page_walk;
10use crate::utils::decode_json_response;
11
12/// Unity catalog client.
13pub struct CatalogClient {
14    workspace_url: String,
15    http_client: reqwest::Client,
16}
17
18impl CatalogClient {
19    pub async fn list_catalogs(&self) -> PolarsResult<Vec<CatalogInfo>> {
20        ListCatalogs(PageWalker::new(self.http_client.get(format!(
21            "{}{}",
22            &self.workspace_url, "/api/2.1/unity-catalog/catalogs"
23        ))))
24        .read_all_pages()
25        .await
26    }
27
28    pub async fn list_namespaces(&self, catalog_name: &str) -> PolarsResult<Vec<NamespaceInfo>> {
29        ListSchemas(PageWalker::new(
30            self.http_client
31                .get(format!(
32                    "{}{}",
33                    &self.workspace_url, "/api/2.1/unity-catalog/schemas"
34                ))
35                .query(&[("catalog_name", catalog_name)]),
36        ))
37        .read_all_pages()
38        .await
39    }
40
41    pub async fn list_tables(
42        &self,
43        catalog_name: &str,
44        namespace: &str,
45    ) -> PolarsResult<Vec<TableInfo>> {
46        ListTables(PageWalker::new(
47            self.http_client
48                .get(format!(
49                    "{}{}",
50                    &self.workspace_url, "/api/2.1/unity-catalog/tables"
51                ))
52                .query(&[("catalog_name", catalog_name), ("schema_name", namespace)]),
53        ))
54        .read_all_pages()
55        .await
56    }
57
58    pub async fn get_table_info(
59        &self,
60        catalog_name: &str,
61        namespace: &str,
62        table_name: &str,
63    ) -> PolarsResult<TableInfo> {
64        let full_table_name = format!(
65            "{}.{}.{}",
66            catalog_name.replace('/', "%2F"),
67            namespace.replace('/', "%2F"),
68            table_name.replace('/', "%2F")
69        );
70
71        let bytes = do_request(
72            self.http_client
73                .get(format!(
74                    "{}{}{}",
75                    &self.workspace_url, "/api/2.1/unity-catalog/tables/", full_table_name
76                ))
77                .query(&[("full_name", full_table_name)]),
78        )
79        .await?;
80
81        let out: TableInfo = decode_json_response(&bytes)?;
82
83        Ok(out)
84    }
85
86    pub async fn get_table_credentials(
87        &self,
88        table_id: &str,
89        write: bool,
90    ) -> PolarsResult<TableCredentials> {
91        let bytes = do_request(
92            self.http_client
93                .post(format!(
94                    "{}{}",
95                    &self.workspace_url, "/api/2.1/unity-catalog/temporary-table-credentials"
96                ))
97                .query(&[
98                    ("table_id", table_id),
99                    ("operation", if write { "READ_WRITE" } else { "READ" }),
100                ]),
101        )
102        .await?;
103
104        let out: TableCredentials = decode_json_response(&bytes)?;
105
106        Ok(out)
107    }
108
109    pub async fn create_catalog(
110        &self,
111        catalog_name: &str,
112        comment: Option<&str>,
113        storage_root: Option<&str>,
114    ) -> PolarsResult<CatalogInfo> {
115        let resp = do_request(
116            self.http_client
117                .post(format!(
118                    "{}{}",
119                    &self.workspace_url, "/api/2.1/unity-catalog/catalogs"
120                ))
121                .json(&Body {
122                    name: catalog_name,
123                    comment,
124                    storage_root,
125                }),
126        )
127        .await?;
128
129        return decode_json_response(&resp);
130
131        #[derive(serde::Serialize)]
132        struct Body<'a> {
133            name: &'a str,
134            comment: Option<&'a str>,
135            storage_root: Option<&'a str>,
136        }
137    }
138
139    pub async fn delete_catalog(&self, catalog_name: &str, force: bool) -> PolarsResult<()> {
140        let catalog_name = catalog_name.replace('/', "%2F");
141
142        do_request(
143            self.http_client
144                .delete(format!(
145                    "{}{}{}",
146                    &self.workspace_url, "/api/2.1/unity-catalog/catalogs/", catalog_name
147                ))
148                .query(&[("force", force)]),
149        )
150        .await?;
151
152        Ok(())
153    }
154
155    pub async fn create_namespace(
156        &self,
157        catalog_name: &str,
158        namespace: &str,
159        comment: Option<&str>,
160        storage_root: Option<&str>,
161    ) -> PolarsResult<NamespaceInfo> {
162        let resp = do_request(
163            self.http_client
164                .post(format!(
165                    "{}{}",
166                    &self.workspace_url, "/api/2.1/unity-catalog/schemas"
167                ))
168                .json(&Body {
169                    name: namespace,
170                    catalog_name,
171                    comment,
172                    storage_root,
173                }),
174        )
175        .await?;
176
177        return decode_json_response(&resp);
178
179        #[derive(serde::Serialize)]
180        struct Body<'a> {
181            name: &'a str,
182            catalog_name: &'a str,
183            comment: Option<&'a str>,
184            storage_root: Option<&'a str>,
185        }
186    }
187
188    pub async fn delete_namespace(
189        &self,
190        catalog_name: &str,
191        namespace: &str,
192        force: bool,
193    ) -> PolarsResult<()> {
194        let full_name = format!(
195            "{}.{}",
196            catalog_name.replace('/', "%2F"),
197            namespace.replace('/', "%2F"),
198        );
199
200        do_request(
201            self.http_client
202                .delete(format!(
203                    "{}{}{}",
204                    &self.workspace_url, "/api/2.1/unity-catalog/schemas/", full_name
205                ))
206                .query(&[("force", force)]),
207        )
208        .await?;
209
210        Ok(())
211    }
212
213    /// Note, `data_source_format` can be None for some `table_type`s.
214    #[allow(clippy::too_many_arguments)]
215    pub async fn create_table(
216        &self,
217        catalog_name: &str,
218        namespace: &str,
219        table_name: &str,
220        schema: Option<&Schema>,
221        table_type: &TableType,
222        data_source_format: Option<&DataSourceFormat>,
223        comment: Option<&str>,
224        storage_location: Option<&str>,
225        properties: &mut (dyn Iterator<Item = (&str, &str)> + Send + Sync),
226    ) -> PolarsResult<TableInfo> {
227        let columns = schema.map(schema_to_column_info_list).transpose()?;
228        let columns = columns.as_deref();
229
230        let resp = do_request(
231            self.http_client
232                .post(format!(
233                    "{}{}",
234                    &self.workspace_url, "/api/2.1/unity-catalog/tables"
235                ))
236                .json(&Body {
237                    name: table_name,
238                    catalog_name,
239                    schema_name: namespace,
240                    table_type,
241                    data_source_format,
242                    comment,
243                    columns,
244                    storage_location,
245                    properties: properties.collect(),
246                }),
247        )
248        .await?;
249
250        return decode_json_response(&resp);
251
252        #[derive(serde::Serialize)]
253        struct Body<'a> {
254            name: &'a str,
255            catalog_name: &'a str,
256            schema_name: &'a str,
257            comment: Option<&'a str>,
258            table_type: &'a TableType,
259            #[serde(skip_serializing_if = "Option::is_none")]
260            data_source_format: Option<&'a DataSourceFormat>,
261            columns: Option<&'a [ColumnInfo]>,
262            storage_location: Option<&'a str>,
263            properties: PlHashMap<&'a str, &'a str>,
264        }
265    }
266
267    pub async fn delete_table(
268        &self,
269        catalog_name: &str,
270        namespace: &str,
271        table_name: &str,
272    ) -> PolarsResult<()> {
273        let full_name = format!(
274            "{}.{}.{}",
275            catalog_name.replace('/', "%2F"),
276            namespace.replace('/', "%2F"),
277            table_name.replace('/', "%2F"),
278        );
279
280        do_request(self.http_client.delete(format!(
281            "{}{}{}",
282            &self.workspace_url, "/api/2.1/unity-catalog/tables/", full_name
283        )))
284        .await?;
285
286        Ok(())
287    }
288}
289
/// Builder for a `CatalogClient`.
///
/// The default value has neither a workspace URL nor a bearer token set.
#[derive(Default)]
pub struct CatalogClientBuilder {
    /// Workspace URL for the client to be built; `None` until one is supplied.
    workspace_url: Option<String>,
    /// Optional bearer token; `None` until one is supplied.
    bearer_token: Option<String>,
}
304
305impl CatalogClientBuilder {
306    pub fn new() -> Self {
307        Self::default()
308    }
309
310    pub fn with_workspace_url(mut self, workspace_url: impl Into<String>) -> Self {
311        self.workspace_url = Some(workspace_url.into());
312        self
313    }
314
315    pub fn with_bearer_token(mut self, bearer_token: impl Into<String>) -> Self {
316        self.bearer_token = Some(bearer_token.into());
317        self
318    }
319
320    pub fn build(self) -> PolarsResult<CatalogClient> {
321        let Some(workspace_url) = self.workspace_url else {
322            polars_bail!(ComputeError: "expected Some(_) for workspace_url")
323        };
324
325        Ok(CatalogClient {
326            workspace_url,
327            http_client: {
328                let builder = reqwest::ClientBuilder::new().user_agent("polars");
329
330                let builder = if let Some(bearer_token) = self.bearer_token {
331                    use reqwest::header::{AUTHORIZATION, HeaderMap, HeaderValue, USER_AGENT};
332
333                    let mut headers = HeaderMap::new();
334
335                    let mut auth_value =
336                        HeaderValue::from_str(format!("Bearer {}", bearer_token).as_str()).unwrap();
337                    auth_value.set_sensitive(true);
338
339                    headers.insert(AUTHORIZATION, auth_value);
340                    headers.insert(USER_AGENT, "polars".try_into().unwrap());
341
342                    builder.default_headers(headers)
343                } else {
344                    builder
345                };
346
347                builder.build().map_err(to_compute_err)?
348            },
349        })
350    }
351}
352
353pub struct ListCatalogs(pub(crate) PageWalker);
354impl_page_walk!(ListCatalogs, CatalogInfo, key_name = catalogs);
355
356pub struct ListSchemas(pub(crate) PageWalker);
357impl_page_walk!(ListSchemas, NamespaceInfo, key_name = schemas);
358
359pub struct ListTables(pub(crate) PageWalker);
360impl_page_walk!(ListTables, TableInfo, key_name = tables);