polars_lazy/frame/
exitable.rs

1use std::sync::Mutex;
2use std::sync::mpsc::{Receiver, channel};
3
4use polars_core::POOL;
5use polars_utils::relaxed_cell::RelaxedCell;
6
7use super::*;
8
9impl LazyFrame {
10    pub fn collect_concurrently(self) -> PolarsResult<InProcessQuery> {
11        let (mut state, mut physical_plan, _) = self.prepare_collect(false, None)?;
12
13        let (tx, rx) = channel();
14        let token = state.cancel_token();
15
16        if physical_plan.is_cache_prefiller() {
17            #[cfg(feature = "async")]
18            {
19                polars_io::pl_async::get_runtime().spawn_blocking(move || {
20                    let result = physical_plan.execute(&mut state);
21                    tx.send(result).unwrap();
22                });
23            }
24            #[cfg(not(feature = "async"))]
25            {
26                std::thread::spawn(move || {
27                    let result = physical_plan.execute(&mut state);
28                    tx.send(result).unwrap();
29                });
30            }
31        } else {
32            POOL.spawn_fifo(move || {
33                let result = physical_plan.execute(&mut state);
34                tx.send(result).unwrap();
35            });
36        }
37
38        Ok(InProcessQuery {
39            rx: Arc::new(Mutex::new(rx)),
40            token,
41        })
42    }
43}
44
45#[derive(Clone)]
46pub struct InProcessQuery {
47    rx: Arc<Mutex<Receiver<PolarsResult<DataFrame>>>>,
48    token: Arc<RelaxedCell<bool>>,
49}
50
51impl InProcessQuery {
52    /// Cancel the query at earliest convenience.
53    pub fn cancel(&self) {
54        self.token.store(true)
55    }
56
57    /// Fetch the result.
58    ///
59    /// If it is ready, a materialized DataFrame is returned.
60    /// If it is not ready it will return `None`.
61    pub fn fetch(&self) -> Option<PolarsResult<DataFrame>> {
62        let rx = self.rx.lock().unwrap();
63        rx.try_recv().ok()
64    }
65
66    /// Await the result synchronously.
67    pub fn fetch_blocking(&self) -> PolarsResult<DataFrame> {
68        let rx = self.rx.lock().unwrap();
69        rx.recv().unwrap()
70    }
71}
72
73impl Drop for InProcessQuery {
74    fn drop(&mut self) {
75        self.token.store(true);
76    }
77}