polars_io/catalog/unity/
client.rs

1use polars_core::prelude::PlHashMap;
2use polars_core::schema::Schema;
3use polars_error::{PolarsResult, polars_bail, to_compute_err};
4
5use super::models::{CatalogInfo, NamespaceInfo, TableCredentials, TableInfo};
6use super::schema::schema_to_column_info_list;
7use super::utils::{PageWalker, do_request};
8use crate::catalog::unity::models::{ColumnInfo, DataSourceFormat, TableType};
9use crate::cloud::USER_AGENT;
10use crate::impl_page_walk;
11use crate::utils::decode_json_response;
12
/// Unity catalog client.
///
/// Every method sends its request through `http_client` to a URL formed by appending a
/// `/api/2.1/unity-catalog/...` path to `workspace_url`. Instances are created by
/// `CatalogClientBuilder::build`.
pub struct CatalogClient {
    /// Prefix of every request URL; the API path is appended to it verbatim.
    workspace_url: String,
    /// HTTP client used to send every request issued by this type.
    http_client: reqwest::Client,
}
18
19impl CatalogClient {
20    pub async fn list_catalogs(&self) -> PolarsResult<Vec<CatalogInfo>> {
21        ListCatalogs(PageWalker::new(self.http_client.get(format!(
22            "{}{}",
23            &self.workspace_url, "/api/2.1/unity-catalog/catalogs"
24        ))))
25        .read_all_pages()
26        .await
27    }
28
29    pub async fn list_namespaces(&self, catalog_name: &str) -> PolarsResult<Vec<NamespaceInfo>> {
30        ListSchemas(PageWalker::new(
31            self.http_client
32                .get(format!(
33                    "{}{}",
34                    &self.workspace_url, "/api/2.1/unity-catalog/schemas"
35                ))
36                .query(&[("catalog_name", catalog_name)]),
37        ))
38        .read_all_pages()
39        .await
40    }
41
42    pub async fn list_tables(
43        &self,
44        catalog_name: &str,
45        namespace: &str,
46    ) -> PolarsResult<Vec<TableInfo>> {
47        ListTables(PageWalker::new(
48            self.http_client
49                .get(format!(
50                    "{}{}",
51                    &self.workspace_url, "/api/2.1/unity-catalog/tables"
52                ))
53                .query(&[("catalog_name", catalog_name), ("schema_name", namespace)]),
54        ))
55        .read_all_pages()
56        .await
57    }
58
59    pub async fn get_table_info(
60        &self,
61        catalog_name: &str,
62        namespace: &str,
63        table_name: &str,
64    ) -> PolarsResult<TableInfo> {
65        let full_table_name = format!(
66            "{}.{}.{}",
67            catalog_name.replace('/', "%2F"),
68            namespace.replace('/', "%2F"),
69            table_name.replace('/', "%2F")
70        );
71
72        let bytes = do_request(
73            self.http_client
74                .get(format!(
75                    "{}{}{}",
76                    &self.workspace_url, "/api/2.1/unity-catalog/tables/", full_table_name
77                ))
78                .query(&[("full_name", full_table_name)]),
79        )
80        .await?;
81
82        let out: TableInfo = decode_json_response(&bytes)?;
83
84        Ok(out)
85    }
86
87    pub async fn get_table_credentials(
88        &self,
89        table_id: &str,
90        write: bool,
91    ) -> PolarsResult<TableCredentials> {
92        let bytes = do_request(
93            self.http_client
94                .post(format!(
95                    "{}{}",
96                    &self.workspace_url, "/api/2.1/unity-catalog/temporary-table-credentials"
97                ))
98                .json(&Body {
99                    table_id,
100                    operation: if write { "READ_WRITE" } else { "READ" },
101                }),
102        )
103        .await?;
104
105        let out: TableCredentials = decode_json_response(&bytes)?;
106
107        return Ok(out);
108
109        #[derive(serde::Serialize)]
110        struct Body<'a> {
111            table_id: &'a str,
112            operation: &'a str,
113        }
114    }
115
116    pub async fn create_catalog(
117        &self,
118        catalog_name: &str,
119        comment: Option<&str>,
120        storage_root: Option<&str>,
121    ) -> PolarsResult<CatalogInfo> {
122        let resp = do_request(
123            self.http_client
124                .post(format!(
125                    "{}{}",
126                    &self.workspace_url, "/api/2.1/unity-catalog/catalogs"
127                ))
128                .json(&Body {
129                    name: catalog_name,
130                    comment,
131                    storage_root,
132                }),
133        )
134        .await?;
135
136        return decode_json_response(&resp);
137
138        #[derive(serde::Serialize)]
139        struct Body<'a> {
140            name: &'a str,
141            comment: Option<&'a str>,
142            storage_root: Option<&'a str>,
143        }
144    }
145
146    pub async fn delete_catalog(&self, catalog_name: &str, force: bool) -> PolarsResult<()> {
147        let catalog_name = catalog_name.replace('/', "%2F");
148
149        do_request(
150            self.http_client
151                .delete(format!(
152                    "{}{}{}",
153                    &self.workspace_url, "/api/2.1/unity-catalog/catalogs/", catalog_name
154                ))
155                .query(&[("force", force)]),
156        )
157        .await?;
158
159        Ok(())
160    }
161
162    pub async fn create_namespace(
163        &self,
164        catalog_name: &str,
165        namespace: &str,
166        comment: Option<&str>,
167        storage_root: Option<&str>,
168    ) -> PolarsResult<NamespaceInfo> {
169        let resp = do_request(
170            self.http_client
171                .post(format!(
172                    "{}{}",
173                    &self.workspace_url, "/api/2.1/unity-catalog/schemas"
174                ))
175                .json(&Body {
176                    name: namespace,
177                    catalog_name,
178                    comment,
179                    storage_root,
180                }),
181        )
182        .await?;
183
184        return decode_json_response(&resp);
185
186        #[derive(serde::Serialize)]
187        struct Body<'a> {
188            name: &'a str,
189            catalog_name: &'a str,
190            comment: Option<&'a str>,
191            storage_root: Option<&'a str>,
192        }
193    }
194
195    pub async fn delete_namespace(
196        &self,
197        catalog_name: &str,
198        namespace: &str,
199        force: bool,
200    ) -> PolarsResult<()> {
201        let full_name = format!(
202            "{}.{}",
203            catalog_name.replace('/', "%2F"),
204            namespace.replace('/', "%2F"),
205        );
206
207        do_request(
208            self.http_client
209                .delete(format!(
210                    "{}{}{}",
211                    &self.workspace_url, "/api/2.1/unity-catalog/schemas/", full_name
212                ))
213                .query(&[("force", force)]),
214        )
215        .await?;
216
217        Ok(())
218    }
219
220    /// Note, `data_source_format` can be None for some `table_type`s.
221    #[allow(clippy::too_many_arguments)]
222    pub async fn create_table(
223        &self,
224        catalog_name: &str,
225        namespace: &str,
226        table_name: &str,
227        schema: Option<&Schema>,
228        table_type: &TableType,
229        data_source_format: Option<&DataSourceFormat>,
230        comment: Option<&str>,
231        storage_location: Option<&str>,
232        properties: &mut (dyn Iterator<Item = (&str, &str)> + Send + Sync),
233    ) -> PolarsResult<TableInfo> {
234        let columns = schema.map(schema_to_column_info_list).transpose()?;
235        let columns = columns.as_deref();
236
237        let resp = do_request(
238            self.http_client
239                .post(format!(
240                    "{}{}",
241                    &self.workspace_url, "/api/2.1/unity-catalog/tables"
242                ))
243                .json(&Body {
244                    name: table_name,
245                    catalog_name,
246                    schema_name: namespace,
247                    table_type,
248                    data_source_format,
249                    comment,
250                    columns,
251                    storage_location,
252                    properties: properties.collect(),
253                }),
254        )
255        .await?;
256
257        return decode_json_response(&resp);
258
259        #[derive(serde::Serialize)]
260        struct Body<'a> {
261            name: &'a str,
262            catalog_name: &'a str,
263            schema_name: &'a str,
264            comment: Option<&'a str>,
265            table_type: &'a TableType,
266            #[serde(skip_serializing_if = "Option::is_none")]
267            data_source_format: Option<&'a DataSourceFormat>,
268            columns: Option<&'a [ColumnInfo]>,
269            storage_location: Option<&'a str>,
270            properties: PlHashMap<&'a str, &'a str>,
271        }
272    }
273
274    pub async fn delete_table(
275        &self,
276        catalog_name: &str,
277        namespace: &str,
278        table_name: &str,
279    ) -> PolarsResult<()> {
280        let full_name = format!(
281            "{}.{}.{}",
282            catalog_name.replace('/', "%2F"),
283            namespace.replace('/', "%2F"),
284            table_name.replace('/', "%2F"),
285        );
286
287        do_request(self.http_client.delete(format!(
288            "{}{}{}",
289            &self.workspace_url, "/api/2.1/unity-catalog/tables/", full_name
290        )))
291        .await?;
292
293        Ok(())
294    }
295}
296
/// Builder for [`CatalogClient`].
///
/// Both settings start out unset (`None`). `build` returns an error when no workspace URL has
/// been provided; the bearer token is optional.
#[derive(Default)]
pub struct CatalogClientBuilder {
    /// Prefix for every request URL of the built client; required by `build`.
    workspace_url: Option<String>,
    /// Token sent as `Authorization: Bearer <token>` on every request when set.
    bearer_token: Option<String>,
}
311
312impl CatalogClientBuilder {
313    pub fn new() -> Self {
314        Self::default()
315    }
316
317    pub fn with_workspace_url(mut self, workspace_url: impl Into<String>) -> Self {
318        self.workspace_url = Some(workspace_url.into());
319        self
320    }
321
322    pub fn with_bearer_token(mut self, bearer_token: impl Into<String>) -> Self {
323        self.bearer_token = Some(bearer_token.into());
324        self
325    }
326
327    pub fn build(self) -> PolarsResult<CatalogClient> {
328        let Some(workspace_url) = self.workspace_url else {
329            polars_bail!(ComputeError: "expected Some(_) for workspace_url")
330        };
331
332        Ok(CatalogClient {
333            workspace_url,
334            http_client: {
335                let builder = reqwest::ClientBuilder::new().user_agent(USER_AGENT);
336
337                let builder = if let Some(bearer_token) = self.bearer_token {
338                    use reqwest::header::{AUTHORIZATION, HeaderMap, HeaderValue};
339
340                    let mut headers = HeaderMap::new();
341
342                    let mut auth_value =
343                        HeaderValue::from_str(format!("Bearer {bearer_token}").as_str()).unwrap();
344                    auth_value.set_sensitive(true);
345
346                    headers.insert(AUTHORIZATION, auth_value);
347                    headers.insert(reqwest::header::USER_AGENT, USER_AGENT.try_into().unwrap());
348
349                    builder.default_headers(headers)
350                } else {
351                    builder
352                };
353
354                builder.build().map_err(to_compute_err)?
355            },
356        })
357    }
358}
359
/// Wrapper around a [`PageWalker`] for the catalog listing request.
///
/// `CatalogClient::list_catalogs` calls `read_all_pages()` on it to obtain `Vec<CatalogInfo>`.
// NOTE(review): `impl_page_walk!` presumably reads the items from the `catalogs` key of each
// response page — confirm against the macro definition.
pub struct ListCatalogs(pub(crate) PageWalker);
impl_page_walk!(ListCatalogs, CatalogInfo, key_name = catalogs);
362
/// Wrapper around a [`PageWalker`] for the schema (namespace) listing request.
///
/// `CatalogClient::list_namespaces` calls `read_all_pages()` on it to obtain
/// `Vec<NamespaceInfo>`.
// NOTE(review): `impl_page_walk!` presumably reads the items from the `schemas` key of each
// response page — confirm against the macro definition.
pub struct ListSchemas(pub(crate) PageWalker);
impl_page_walk!(ListSchemas, NamespaceInfo, key_name = schemas);
365
/// Wrapper around a [`PageWalker`] for the table listing request.
///
/// `CatalogClient::list_tables` calls `read_all_pages()` on it to obtain `Vec<TableInfo>`.
// NOTE(review): `impl_page_walk!` presumably reads the items from the `tables` key of each
// response page — confirm against the macro definition.
pub struct ListTables(pub(crate) PageWalker);
impl_page_walk!(ListTables, TableInfo, key_name = tables);