1use polars_core::prelude::PlHashMap;
2use polars_core::schema::Schema;
3use polars_error::{PolarsResult, polars_bail, to_compute_err};
4
5use super::models::{CatalogInfo, NamespaceInfo, TableCredentials, TableInfo};
6use super::schema::schema_to_column_info_list;
7use super::utils::{PageWalker, do_request};
8use crate::catalog::unity::models::{ColumnInfo, DataSourceFormat, TableType};
9use crate::cloud::USER_AGENT;
10use crate::impl_page_walk;
11use crate::utils::decode_json_response;
12
/// HTTP client for the Unity Catalog REST endpoints under `/api/2.1/unity-catalog/`.
///
/// Instances are produced by [`CatalogClientBuilder::build`].
pub struct CatalogClient {
    /// Base URL of the workspace; endpoint paths are appended to it verbatim.
    workspace_url: String,
    /// `reqwest` client used to issue every request made by this type.
    http_client: reqwest::Client,
}
18
19impl CatalogClient {
20 pub async fn list_catalogs(&self) -> PolarsResult<Vec<CatalogInfo>> {
21 ListCatalogs(PageWalker::new(self.http_client.get(format!(
22 "{}{}",
23 &self.workspace_url, "/api/2.1/unity-catalog/catalogs"
24 ))))
25 .read_all_pages()
26 .await
27 }
28
29 pub async fn list_namespaces(&self, catalog_name: &str) -> PolarsResult<Vec<NamespaceInfo>> {
30 ListSchemas(PageWalker::new(
31 self.http_client
32 .get(format!(
33 "{}{}",
34 &self.workspace_url, "/api/2.1/unity-catalog/schemas"
35 ))
36 .query(&[("catalog_name", catalog_name)]),
37 ))
38 .read_all_pages()
39 .await
40 }
41
42 pub async fn list_tables(
43 &self,
44 catalog_name: &str,
45 namespace: &str,
46 ) -> PolarsResult<Vec<TableInfo>> {
47 ListTables(PageWalker::new(
48 self.http_client
49 .get(format!(
50 "{}{}",
51 &self.workspace_url, "/api/2.1/unity-catalog/tables"
52 ))
53 .query(&[("catalog_name", catalog_name), ("schema_name", namespace)]),
54 ))
55 .read_all_pages()
56 .await
57 }
58
59 pub async fn get_table_info(
60 &self,
61 catalog_name: &str,
62 namespace: &str,
63 table_name: &str,
64 ) -> PolarsResult<TableInfo> {
65 let full_table_name = format!(
66 "{}.{}.{}",
67 catalog_name.replace('/', "%2F"),
68 namespace.replace('/', "%2F"),
69 table_name.replace('/', "%2F")
70 );
71
72 let bytes = do_request(
73 self.http_client
74 .get(format!(
75 "{}{}{}",
76 &self.workspace_url, "/api/2.1/unity-catalog/tables/", full_table_name
77 ))
78 .query(&[("full_name", full_table_name)]),
79 )
80 .await?;
81
82 let out: TableInfo = decode_json_response(&bytes)?;
83
84 Ok(out)
85 }
86
87 pub async fn get_table_credentials(
88 &self,
89 table_id: &str,
90 write: bool,
91 ) -> PolarsResult<TableCredentials> {
92 let bytes = do_request(
93 self.http_client
94 .post(format!(
95 "{}{}",
96 &self.workspace_url, "/api/2.1/unity-catalog/temporary-table-credentials"
97 ))
98 .query(&[
99 ("table_id", table_id),
100 ("operation", if write { "READ_WRITE" } else { "READ" }),
101 ]),
102 )
103 .await?;
104
105 let out: TableCredentials = decode_json_response(&bytes)?;
106
107 Ok(out)
108 }
109
110 pub async fn create_catalog(
111 &self,
112 catalog_name: &str,
113 comment: Option<&str>,
114 storage_root: Option<&str>,
115 ) -> PolarsResult<CatalogInfo> {
116 let resp = do_request(
117 self.http_client
118 .post(format!(
119 "{}{}",
120 &self.workspace_url, "/api/2.1/unity-catalog/catalogs"
121 ))
122 .json(&Body {
123 name: catalog_name,
124 comment,
125 storage_root,
126 }),
127 )
128 .await?;
129
130 return decode_json_response(&resp);
131
132 #[derive(serde::Serialize)]
133 struct Body<'a> {
134 name: &'a str,
135 comment: Option<&'a str>,
136 storage_root: Option<&'a str>,
137 }
138 }
139
140 pub async fn delete_catalog(&self, catalog_name: &str, force: bool) -> PolarsResult<()> {
141 let catalog_name = catalog_name.replace('/', "%2F");
142
143 do_request(
144 self.http_client
145 .delete(format!(
146 "{}{}{}",
147 &self.workspace_url, "/api/2.1/unity-catalog/catalogs/", catalog_name
148 ))
149 .query(&[("force", force)]),
150 )
151 .await?;
152
153 Ok(())
154 }
155
156 pub async fn create_namespace(
157 &self,
158 catalog_name: &str,
159 namespace: &str,
160 comment: Option<&str>,
161 storage_root: Option<&str>,
162 ) -> PolarsResult<NamespaceInfo> {
163 let resp = do_request(
164 self.http_client
165 .post(format!(
166 "{}{}",
167 &self.workspace_url, "/api/2.1/unity-catalog/schemas"
168 ))
169 .json(&Body {
170 name: namespace,
171 catalog_name,
172 comment,
173 storage_root,
174 }),
175 )
176 .await?;
177
178 return decode_json_response(&resp);
179
180 #[derive(serde::Serialize)]
181 struct Body<'a> {
182 name: &'a str,
183 catalog_name: &'a str,
184 comment: Option<&'a str>,
185 storage_root: Option<&'a str>,
186 }
187 }
188
189 pub async fn delete_namespace(
190 &self,
191 catalog_name: &str,
192 namespace: &str,
193 force: bool,
194 ) -> PolarsResult<()> {
195 let full_name = format!(
196 "{}.{}",
197 catalog_name.replace('/', "%2F"),
198 namespace.replace('/', "%2F"),
199 );
200
201 do_request(
202 self.http_client
203 .delete(format!(
204 "{}{}{}",
205 &self.workspace_url, "/api/2.1/unity-catalog/schemas/", full_name
206 ))
207 .query(&[("force", force)]),
208 )
209 .await?;
210
211 Ok(())
212 }
213
214 #[allow(clippy::too_many_arguments)]
216 pub async fn create_table(
217 &self,
218 catalog_name: &str,
219 namespace: &str,
220 table_name: &str,
221 schema: Option<&Schema>,
222 table_type: &TableType,
223 data_source_format: Option<&DataSourceFormat>,
224 comment: Option<&str>,
225 storage_location: Option<&str>,
226 properties: &mut (dyn Iterator<Item = (&str, &str)> + Send + Sync),
227 ) -> PolarsResult<TableInfo> {
228 let columns = schema.map(schema_to_column_info_list).transpose()?;
229 let columns = columns.as_deref();
230
231 let resp = do_request(
232 self.http_client
233 .post(format!(
234 "{}{}",
235 &self.workspace_url, "/api/2.1/unity-catalog/tables"
236 ))
237 .json(&Body {
238 name: table_name,
239 catalog_name,
240 schema_name: namespace,
241 table_type,
242 data_source_format,
243 comment,
244 columns,
245 storage_location,
246 properties: properties.collect(),
247 }),
248 )
249 .await?;
250
251 return decode_json_response(&resp);
252
253 #[derive(serde::Serialize)]
254 struct Body<'a> {
255 name: &'a str,
256 catalog_name: &'a str,
257 schema_name: &'a str,
258 comment: Option<&'a str>,
259 table_type: &'a TableType,
260 #[serde(skip_serializing_if = "Option::is_none")]
261 data_source_format: Option<&'a DataSourceFormat>,
262 columns: Option<&'a [ColumnInfo]>,
263 storage_location: Option<&'a str>,
264 properties: PlHashMap<&'a str, &'a str>,
265 }
266 }
267
268 pub async fn delete_table(
269 &self,
270 catalog_name: &str,
271 namespace: &str,
272 table_name: &str,
273 ) -> PolarsResult<()> {
274 let full_name = format!(
275 "{}.{}.{}",
276 catalog_name.replace('/', "%2F"),
277 namespace.replace('/', "%2F"),
278 table_name.replace('/', "%2F"),
279 );
280
281 do_request(self.http_client.delete(format!(
282 "{}{}{}",
283 &self.workspace_url, "/api/2.1/unity-catalog/tables/", full_name
284 )))
285 .await?;
286
287 Ok(())
288 }
289}
290
/// Builder for [`CatalogClient`].
///
/// Both settings start out unset; see the `with_*` methods for populating them.
pub struct CatalogClientBuilder {
    /// Workspace base URL; `build` fails when this is still `None`.
    workspace_url: Option<String>,
    /// Optional bearer token; when present, `build` installs it as a default
    /// `Authorization` header.
    bearer_token: Option<String>,
}

// The impl is written out by hand on purpose, hence the lint suppression.
#[allow(clippy::derivable_impls)]
impl Default for CatalogClientBuilder {
    /// Returns a builder with neither the workspace URL nor the bearer token set.
    fn default() -> Self {
        let (workspace_url, bearer_token) = (None, None);

        Self {
            workspace_url,
            bearer_token,
        }
    }
}
305
306impl CatalogClientBuilder {
307 pub fn new() -> Self {
308 Self::default()
309 }
310
311 pub fn with_workspace_url(mut self, workspace_url: impl Into<String>) -> Self {
312 self.workspace_url = Some(workspace_url.into());
313 self
314 }
315
316 pub fn with_bearer_token(mut self, bearer_token: impl Into<String>) -> Self {
317 self.bearer_token = Some(bearer_token.into());
318 self
319 }
320
321 pub fn build(self) -> PolarsResult<CatalogClient> {
322 let Some(workspace_url) = self.workspace_url else {
323 polars_bail!(ComputeError: "expected Some(_) for workspace_url")
324 };
325
326 Ok(CatalogClient {
327 workspace_url,
328 http_client: {
329 let builder = reqwest::ClientBuilder::new().user_agent(USER_AGENT);
330
331 let builder = if let Some(bearer_token) = self.bearer_token {
332 use reqwest::header::{AUTHORIZATION, HeaderMap, HeaderValue};
333
334 let mut headers = HeaderMap::new();
335
336 let mut auth_value =
337 HeaderValue::from_str(format!("Bearer {bearer_token}").as_str()).unwrap();
338 auth_value.set_sensitive(true);
339
340 headers.insert(AUTHORIZATION, auth_value);
341 headers.insert(reqwest::header::USER_AGENT, USER_AGENT.try_into().unwrap());
342
343 builder.default_headers(headers)
344 } else {
345 builder
346 };
347
348 builder.build().map_err(to_compute_err)?
349 },
350 })
351 }
352}
353
/// Wrapper around a [`PageWalker`] for the catalog listing request; constructed by
/// `CatalogClient::list_catalogs`, which calls `read_all_pages` on it.
pub struct ListCatalogs(pub(crate) PageWalker);
// NOTE(review): presumably generates the page-walking methods (e.g. `read_all_pages`)
// yielding `CatalogInfo` items read from the `catalogs` key of each response page —
// confirm against the `impl_page_walk!` definition.
impl_page_walk!(ListCatalogs, CatalogInfo, key_name = catalogs);
356
/// Wrapper around a [`PageWalker`] for the schema (namespace) listing request;
/// constructed by `CatalogClient::list_namespaces`, which calls `read_all_pages` on it.
pub struct ListSchemas(pub(crate) PageWalker);
// NOTE(review): presumably generates the page-walking methods (e.g. `read_all_pages`)
// yielding `NamespaceInfo` items read from the `schemas` key of each response page —
// confirm against the `impl_page_walk!` definition.
impl_page_walk!(ListSchemas, NamespaceInfo, key_name = schemas);
359
/// Wrapper around a [`PageWalker`] for the table listing request; constructed by
/// `CatalogClient::list_tables`, which calls `read_all_pages` on it.
pub struct ListTables(pub(crate) PageWalker);
// NOTE(review): presumably generates the page-walking methods (e.g. `read_all_pages`)
// yielding `TableInfo` items read from the `tables` key of each response page —
// confirm against the `impl_page_walk!` definition.
impl_page_walk!(ListTables, TableInfo, key_name = tables);