1use polars_core::prelude::PlHashMap;
2use polars_core::schema::Schema;
3use polars_error::{PolarsResult, polars_bail, to_compute_err};
4
5use super::models::{CatalogInfo, NamespaceInfo, TableCredentials, TableInfo};
6use super::schema::schema_to_column_info_list;
7use super::utils::{PageWalker, do_request};
8use crate::catalog::unity::models::{ColumnInfo, DataSourceFormat, TableType};
9use crate::impl_page_walk;
10use crate::utils::decode_json_response;
11
12pub struct CatalogClient {
14 workspace_url: String,
15 http_client: reqwest::Client,
16}
17
18impl CatalogClient {
19 pub async fn list_catalogs(&self) -> PolarsResult<Vec<CatalogInfo>> {
20 ListCatalogs(PageWalker::new(self.http_client.get(format!(
21 "{}{}",
22 &self.workspace_url, "/api/2.1/unity-catalog/catalogs"
23 ))))
24 .read_all_pages()
25 .await
26 }
27
28 pub async fn list_namespaces(&self, catalog_name: &str) -> PolarsResult<Vec<NamespaceInfo>> {
29 ListSchemas(PageWalker::new(
30 self.http_client
31 .get(format!(
32 "{}{}",
33 &self.workspace_url, "/api/2.1/unity-catalog/schemas"
34 ))
35 .query(&[("catalog_name", catalog_name)]),
36 ))
37 .read_all_pages()
38 .await
39 }
40
41 pub async fn list_tables(
42 &self,
43 catalog_name: &str,
44 namespace: &str,
45 ) -> PolarsResult<Vec<TableInfo>> {
46 ListTables(PageWalker::new(
47 self.http_client
48 .get(format!(
49 "{}{}",
50 &self.workspace_url, "/api/2.1/unity-catalog/tables"
51 ))
52 .query(&[("catalog_name", catalog_name), ("schema_name", namespace)]),
53 ))
54 .read_all_pages()
55 .await
56 }
57
58 pub async fn get_table_info(
59 &self,
60 catalog_name: &str,
61 namespace: &str,
62 table_name: &str,
63 ) -> PolarsResult<TableInfo> {
64 let full_table_name = format!(
65 "{}.{}.{}",
66 catalog_name.replace('/', "%2F"),
67 namespace.replace('/', "%2F"),
68 table_name.replace('/', "%2F")
69 );
70
71 let bytes = do_request(
72 self.http_client
73 .get(format!(
74 "{}{}{}",
75 &self.workspace_url, "/api/2.1/unity-catalog/tables/", full_table_name
76 ))
77 .query(&[("full_name", full_table_name)]),
78 )
79 .await?;
80
81 let out: TableInfo = decode_json_response(&bytes)?;
82
83 Ok(out)
84 }
85
86 pub async fn get_table_credentials(
87 &self,
88 table_id: &str,
89 write: bool,
90 ) -> PolarsResult<TableCredentials> {
91 let bytes = do_request(
92 self.http_client
93 .post(format!(
94 "{}{}",
95 &self.workspace_url, "/api/2.1/unity-catalog/temporary-table-credentials"
96 ))
97 .query(&[
98 ("table_id", table_id),
99 ("operation", if write { "READ_WRITE" } else { "READ" }),
100 ]),
101 )
102 .await?;
103
104 let out: TableCredentials = decode_json_response(&bytes)?;
105
106 Ok(out)
107 }
108
109 pub async fn create_catalog(
110 &self,
111 catalog_name: &str,
112 comment: Option<&str>,
113 storage_root: Option<&str>,
114 ) -> PolarsResult<CatalogInfo> {
115 let resp = do_request(
116 self.http_client
117 .post(format!(
118 "{}{}",
119 &self.workspace_url, "/api/2.1/unity-catalog/catalogs"
120 ))
121 .json(&Body {
122 name: catalog_name,
123 comment,
124 storage_root,
125 }),
126 )
127 .await?;
128
129 return decode_json_response(&resp);
130
131 #[derive(serde::Serialize)]
132 struct Body<'a> {
133 name: &'a str,
134 comment: Option<&'a str>,
135 storage_root: Option<&'a str>,
136 }
137 }
138
139 pub async fn delete_catalog(&self, catalog_name: &str, force: bool) -> PolarsResult<()> {
140 let catalog_name = catalog_name.replace('/', "%2F");
141
142 do_request(
143 self.http_client
144 .delete(format!(
145 "{}{}{}",
146 &self.workspace_url, "/api/2.1/unity-catalog/catalogs/", catalog_name
147 ))
148 .query(&[("force", force)]),
149 )
150 .await?;
151
152 Ok(())
153 }
154
155 pub async fn create_namespace(
156 &self,
157 catalog_name: &str,
158 namespace: &str,
159 comment: Option<&str>,
160 storage_root: Option<&str>,
161 ) -> PolarsResult<NamespaceInfo> {
162 let resp = do_request(
163 self.http_client
164 .post(format!(
165 "{}{}",
166 &self.workspace_url, "/api/2.1/unity-catalog/schemas"
167 ))
168 .json(&Body {
169 name: namespace,
170 catalog_name,
171 comment,
172 storage_root,
173 }),
174 )
175 .await?;
176
177 return decode_json_response(&resp);
178
179 #[derive(serde::Serialize)]
180 struct Body<'a> {
181 name: &'a str,
182 catalog_name: &'a str,
183 comment: Option<&'a str>,
184 storage_root: Option<&'a str>,
185 }
186 }
187
188 pub async fn delete_namespace(
189 &self,
190 catalog_name: &str,
191 namespace: &str,
192 force: bool,
193 ) -> PolarsResult<()> {
194 let full_name = format!(
195 "{}.{}",
196 catalog_name.replace('/', "%2F"),
197 namespace.replace('/', "%2F"),
198 );
199
200 do_request(
201 self.http_client
202 .delete(format!(
203 "{}{}{}",
204 &self.workspace_url, "/api/2.1/unity-catalog/schemas/", full_name
205 ))
206 .query(&[("force", force)]),
207 )
208 .await?;
209
210 Ok(())
211 }
212
213 #[allow(clippy::too_many_arguments)]
215 pub async fn create_table(
216 &self,
217 catalog_name: &str,
218 namespace: &str,
219 table_name: &str,
220 schema: Option<&Schema>,
221 table_type: &TableType,
222 data_source_format: Option<&DataSourceFormat>,
223 comment: Option<&str>,
224 storage_location: Option<&str>,
225 properties: &mut (dyn Iterator<Item = (&str, &str)> + Send + Sync),
226 ) -> PolarsResult<TableInfo> {
227 let columns = schema.map(schema_to_column_info_list).transpose()?;
228 let columns = columns.as_deref();
229
230 let resp = do_request(
231 self.http_client
232 .post(format!(
233 "{}{}",
234 &self.workspace_url, "/api/2.1/unity-catalog/tables"
235 ))
236 .json(&Body {
237 name: table_name,
238 catalog_name,
239 schema_name: namespace,
240 table_type,
241 data_source_format,
242 comment,
243 columns,
244 storage_location,
245 properties: properties.collect(),
246 }),
247 )
248 .await?;
249
250 return decode_json_response(&resp);
251
252 #[derive(serde::Serialize)]
253 struct Body<'a> {
254 name: &'a str,
255 catalog_name: &'a str,
256 schema_name: &'a str,
257 comment: Option<&'a str>,
258 table_type: &'a TableType,
259 #[serde(skip_serializing_if = "Option::is_none")]
260 data_source_format: Option<&'a DataSourceFormat>,
261 columns: Option<&'a [ColumnInfo]>,
262 storage_location: Option<&'a str>,
263 properties: PlHashMap<&'a str, &'a str>,
264 }
265 }
266
267 pub async fn delete_table(
268 &self,
269 catalog_name: &str,
270 namespace: &str,
271 table_name: &str,
272 ) -> PolarsResult<()> {
273 let full_name = format!(
274 "{}.{}.{}",
275 catalog_name.replace('/', "%2F"),
276 namespace.replace('/', "%2F"),
277 table_name.replace('/', "%2F"),
278 );
279
280 do_request(self.http_client.delete(format!(
281 "{}{}{}",
282 &self.workspace_url, "/api/2.1/unity-catalog/tables/", full_name
283 )))
284 .await?;
285
286 Ok(())
287 }
288}
289
/// Builder for [`CatalogClient`].
///
/// Both settings start out unset (`None`). `Debug` is deliberately not
/// derived so the bearer token cannot end up in debug output by accident.
#[derive(Default)]
pub struct CatalogClientBuilder {
    /// Workspace URL that every API path is appended to.
    workspace_url: Option<String>,
    /// Token sent as `Authorization: Bearer <token>` when present.
    bearer_token: Option<String>,
}
304
305impl CatalogClientBuilder {
306 pub fn new() -> Self {
307 Self::default()
308 }
309
310 pub fn with_workspace_url(mut self, workspace_url: impl Into<String>) -> Self {
311 self.workspace_url = Some(workspace_url.into());
312 self
313 }
314
315 pub fn with_bearer_token(mut self, bearer_token: impl Into<String>) -> Self {
316 self.bearer_token = Some(bearer_token.into());
317 self
318 }
319
320 pub fn build(self) -> PolarsResult<CatalogClient> {
321 let Some(workspace_url) = self.workspace_url else {
322 polars_bail!(ComputeError: "expected Some(_) for workspace_url")
323 };
324
325 Ok(CatalogClient {
326 workspace_url,
327 http_client: {
328 let builder = reqwest::ClientBuilder::new().user_agent("polars");
329
330 let builder = if let Some(bearer_token) = self.bearer_token {
331 use reqwest::header::{AUTHORIZATION, HeaderMap, HeaderValue, USER_AGENT};
332
333 let mut headers = HeaderMap::new();
334
335 let mut auth_value =
336 HeaderValue::from_str(format!("Bearer {}", bearer_token).as_str()).unwrap();
337 auth_value.set_sensitive(true);
338
339 headers.insert(AUTHORIZATION, auth_value);
340 headers.insert(USER_AGENT, "polars".try_into().unwrap());
341
342 builder.default_headers(headers)
343 } else {
344 builder
345 };
346
347 builder.build().map_err(to_compute_err)?
348 },
349 })
350 }
351}
352
353pub struct ListCatalogs(pub(crate) PageWalker);
354impl_page_walk!(ListCatalogs, CatalogInfo, key_name = catalogs);
355
356pub struct ListSchemas(pub(crate) PageWalker);
357impl_page_walk!(ListSchemas, NamespaceInfo, key_name = schemas);
358
359pub struct ListTables(pub(crate) PageWalker);
360impl_page_walk!(ListTables, TableInfo, key_name = tables);