// polars_io/catalog/unity/client.rs

1use polars_core::prelude::PlHashMap;
2use polars_core::schema::Schema;
3use polars_error::{PolarsResult, polars_bail, to_compute_err};
4
5use super::models::{CatalogInfo, NamespaceInfo, TableCredentials, TableInfo};
6use super::schema::schema_to_column_info_list;
7use super::utils::{PageWalker, do_request};
8use crate::catalog::unity::models::{ColumnInfo, DataSourceFormat, TableType};
9use crate::cloud::USER_AGENT;
10use crate::impl_page_walk;
11use crate::utils::decode_json_response;
12
13/// Unity catalog client.
14pub struct CatalogClient {
15    workspace_url: String,
16    http_client: reqwest::Client,
17}
18
19impl CatalogClient {
20    pub async fn list_catalogs(&self) -> PolarsResult<Vec<CatalogInfo>> {
21        ListCatalogs(PageWalker::new(self.http_client.get(format!(
22            "{}{}",
23            &self.workspace_url, "/api/2.1/unity-catalog/catalogs"
24        ))))
25        .read_all_pages()
26        .await
27    }
28
29    pub async fn list_namespaces(&self, catalog_name: &str) -> PolarsResult<Vec<NamespaceInfo>> {
30        ListSchemas(PageWalker::new(
31            self.http_client
32                .get(format!(
33                    "{}{}",
34                    &self.workspace_url, "/api/2.1/unity-catalog/schemas"
35                ))
36                .query(&[("catalog_name", catalog_name)]),
37        ))
38        .read_all_pages()
39        .await
40    }
41
42    pub async fn list_tables(
43        &self,
44        catalog_name: &str,
45        namespace: &str,
46    ) -> PolarsResult<Vec<TableInfo>> {
47        ListTables(PageWalker::new(
48            self.http_client
49                .get(format!(
50                    "{}{}",
51                    &self.workspace_url, "/api/2.1/unity-catalog/tables"
52                ))
53                .query(&[("catalog_name", catalog_name), ("schema_name", namespace)]),
54        ))
55        .read_all_pages()
56        .await
57    }
58
59    pub async fn get_table_info(
60        &self,
61        catalog_name: &str,
62        namespace: &str,
63        table_name: &str,
64    ) -> PolarsResult<TableInfo> {
65        let full_table_name = format!(
66            "{}.{}.{}",
67            catalog_name.replace('/', "%2F"),
68            namespace.replace('/', "%2F"),
69            table_name.replace('/', "%2F")
70        );
71
72        let bytes = do_request(
73            self.http_client
74                .get(format!(
75                    "{}{}{}",
76                    &self.workspace_url, "/api/2.1/unity-catalog/tables/", full_table_name
77                ))
78                .query(&[("full_name", full_table_name)]),
79        )
80        .await?;
81
82        let out: TableInfo = decode_json_response(&bytes)?;
83
84        Ok(out)
85    }
86
87    pub async fn get_table_credentials(
88        &self,
89        table_id: &str,
90        write: bool,
91    ) -> PolarsResult<TableCredentials> {
92        let bytes = do_request(
93            self.http_client
94                .post(format!(
95                    "{}{}",
96                    &self.workspace_url, "/api/2.1/unity-catalog/temporary-table-credentials"
97                ))
98                .query(&[
99                    ("table_id", table_id),
100                    ("operation", if write { "READ_WRITE" } else { "READ" }),
101                ]),
102        )
103        .await?;
104
105        let out: TableCredentials = decode_json_response(&bytes)?;
106
107        Ok(out)
108    }
109
110    pub async fn create_catalog(
111        &self,
112        catalog_name: &str,
113        comment: Option<&str>,
114        storage_root: Option<&str>,
115    ) -> PolarsResult<CatalogInfo> {
116        let resp = do_request(
117            self.http_client
118                .post(format!(
119                    "{}{}",
120                    &self.workspace_url, "/api/2.1/unity-catalog/catalogs"
121                ))
122                .json(&Body {
123                    name: catalog_name,
124                    comment,
125                    storage_root,
126                }),
127        )
128        .await?;
129
130        return decode_json_response(&resp);
131
132        #[derive(serde::Serialize)]
133        struct Body<'a> {
134            name: &'a str,
135            comment: Option<&'a str>,
136            storage_root: Option<&'a str>,
137        }
138    }
139
140    pub async fn delete_catalog(&self, catalog_name: &str, force: bool) -> PolarsResult<()> {
141        let catalog_name = catalog_name.replace('/', "%2F");
142
143        do_request(
144            self.http_client
145                .delete(format!(
146                    "{}{}{}",
147                    &self.workspace_url, "/api/2.1/unity-catalog/catalogs/", catalog_name
148                ))
149                .query(&[("force", force)]),
150        )
151        .await?;
152
153        Ok(())
154    }
155
156    pub async fn create_namespace(
157        &self,
158        catalog_name: &str,
159        namespace: &str,
160        comment: Option<&str>,
161        storage_root: Option<&str>,
162    ) -> PolarsResult<NamespaceInfo> {
163        let resp = do_request(
164            self.http_client
165                .post(format!(
166                    "{}{}",
167                    &self.workspace_url, "/api/2.1/unity-catalog/schemas"
168                ))
169                .json(&Body {
170                    name: namespace,
171                    catalog_name,
172                    comment,
173                    storage_root,
174                }),
175        )
176        .await?;
177
178        return decode_json_response(&resp);
179
180        #[derive(serde::Serialize)]
181        struct Body<'a> {
182            name: &'a str,
183            catalog_name: &'a str,
184            comment: Option<&'a str>,
185            storage_root: Option<&'a str>,
186        }
187    }
188
189    pub async fn delete_namespace(
190        &self,
191        catalog_name: &str,
192        namespace: &str,
193        force: bool,
194    ) -> PolarsResult<()> {
195        let full_name = format!(
196            "{}.{}",
197            catalog_name.replace('/', "%2F"),
198            namespace.replace('/', "%2F"),
199        );
200
201        do_request(
202            self.http_client
203                .delete(format!(
204                    "{}{}{}",
205                    &self.workspace_url, "/api/2.1/unity-catalog/schemas/", full_name
206                ))
207                .query(&[("force", force)]),
208        )
209        .await?;
210
211        Ok(())
212    }
213
214    /// Note, `data_source_format` can be None for some `table_type`s.
215    #[allow(clippy::too_many_arguments)]
216    pub async fn create_table(
217        &self,
218        catalog_name: &str,
219        namespace: &str,
220        table_name: &str,
221        schema: Option<&Schema>,
222        table_type: &TableType,
223        data_source_format: Option<&DataSourceFormat>,
224        comment: Option<&str>,
225        storage_location: Option<&str>,
226        properties: &mut (dyn Iterator<Item = (&str, &str)> + Send + Sync),
227    ) -> PolarsResult<TableInfo> {
228        let columns = schema.map(schema_to_column_info_list).transpose()?;
229        let columns = columns.as_deref();
230
231        let resp = do_request(
232            self.http_client
233                .post(format!(
234                    "{}{}",
235                    &self.workspace_url, "/api/2.1/unity-catalog/tables"
236                ))
237                .json(&Body {
238                    name: table_name,
239                    catalog_name,
240                    schema_name: namespace,
241                    table_type,
242                    data_source_format,
243                    comment,
244                    columns,
245                    storage_location,
246                    properties: properties.collect(),
247                }),
248        )
249        .await?;
250
251        return decode_json_response(&resp);
252
253        #[derive(serde::Serialize)]
254        struct Body<'a> {
255            name: &'a str,
256            catalog_name: &'a str,
257            schema_name: &'a str,
258            comment: Option<&'a str>,
259            table_type: &'a TableType,
260            #[serde(skip_serializing_if = "Option::is_none")]
261            data_source_format: Option<&'a DataSourceFormat>,
262            columns: Option<&'a [ColumnInfo]>,
263            storage_location: Option<&'a str>,
264            properties: PlHashMap<&'a str, &'a str>,
265        }
266    }
267
268    pub async fn delete_table(
269        &self,
270        catalog_name: &str,
271        namespace: &str,
272        table_name: &str,
273    ) -> PolarsResult<()> {
274        let full_name = format!(
275            "{}.{}.{}",
276            catalog_name.replace('/', "%2F"),
277            namespace.replace('/', "%2F"),
278            table_name.replace('/', "%2F"),
279        );
280
281        do_request(self.http_client.delete(format!(
282            "{}{}{}",
283            &self.workspace_url, "/api/2.1/unity-catalog/tables/", full_name
284        )))
285        .await?;
286
287        Ok(())
288    }
289}
290
/// Builder for `CatalogClient`.
///
/// Intentionally not `Debug`, since it can hold a bearer token.
#[derive(Default)]
pub struct CatalogClientBuilder {
    /// Base URL of the workspace; `build` fails if this is not set.
    workspace_url: Option<String>,
    /// Optional token sent as an `Authorization: Bearer <token>` header.
    bearer_token: Option<String>,
}
305
306impl CatalogClientBuilder {
307    pub fn new() -> Self {
308        Self::default()
309    }
310
311    pub fn with_workspace_url(mut self, workspace_url: impl Into<String>) -> Self {
312        self.workspace_url = Some(workspace_url.into());
313        self
314    }
315
316    pub fn with_bearer_token(mut self, bearer_token: impl Into<String>) -> Self {
317        self.bearer_token = Some(bearer_token.into());
318        self
319    }
320
321    pub fn build(self) -> PolarsResult<CatalogClient> {
322        let Some(workspace_url) = self.workspace_url else {
323            polars_bail!(ComputeError: "expected Some(_) for workspace_url")
324        };
325
326        Ok(CatalogClient {
327            workspace_url,
328            http_client: {
329                let builder = reqwest::ClientBuilder::new().user_agent(USER_AGENT);
330
331                let builder = if let Some(bearer_token) = self.bearer_token {
332                    use reqwest::header::{AUTHORIZATION, HeaderMap, HeaderValue};
333
334                    let mut headers = HeaderMap::new();
335
336                    let mut auth_value =
337                        HeaderValue::from_str(format!("Bearer {bearer_token}").as_str()).unwrap();
338                    auth_value.set_sensitive(true);
339
340                    headers.insert(AUTHORIZATION, auth_value);
341                    headers.insert(reqwest::header::USER_AGENT, USER_AGENT.try_into().unwrap());
342
343                    builder.default_headers(headers)
344                } else {
345                    builder
346                };
347
348                builder.build().map_err(to_compute_err)?
349            },
350        })
351    }
352}
353
/// Paginated catalog listing; `CatalogClient::list_catalogs` collects it into
/// `Vec<CatalogInfo>` via `read_all_pages`.
pub struct ListCatalogs(pub(crate) PageWalker);
// NOTE(review): `key_name` presumably names the JSON field holding each page's items — confirm
// against the `impl_page_walk!` definition.
impl_page_walk!(ListCatalogs, CatalogInfo, key_name = catalogs);
356
/// Paginated namespace listing (Unity Catalog calls namespaces "schemas");
/// `CatalogClient::list_namespaces` collects it into `Vec<NamespaceInfo>` via `read_all_pages`.
pub struct ListSchemas(pub(crate) PageWalker);
// NOTE(review): `key_name` presumably names the JSON field holding each page's items — confirm
// against the `impl_page_walk!` definition.
impl_page_walk!(ListSchemas, NamespaceInfo, key_name = schemas);
359
/// Paginated table listing; `CatalogClient::list_tables` collects it into `Vec<TableInfo>`
/// via `read_all_pages`.
pub struct ListTables(pub(crate) PageWalker);
// NOTE(review): `key_name` presumably names the JSON field holding each page's items — confirm
// against the `impl_page_walk!` definition.
impl_page_walk!(ListTables, TableInfo, key_name = tables);