polars_lazy/frame/
exitable.rs

1use std::sync::Mutex;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::mpsc::{Receiver, channel};
4
5use polars_core::POOL;
6
7use super::*;
8
9impl LazyFrame {
10    pub fn collect_concurrently(self) -> PolarsResult<InProcessQuery> {
11        let (mut state, mut physical_plan, _) = self.prepare_collect(false, None)?;
12
13        let (tx, rx) = channel();
14        let token = state.cancel_token();
15        POOL.spawn_fifo(move || {
16            let result = physical_plan.execute(&mut state);
17            tx.send(result).unwrap();
18        });
19
20        Ok(InProcessQuery {
21            rx: Arc::new(Mutex::new(rx)),
22            token,
23        })
24    }
25}
26
27#[derive(Clone)]
28pub struct InProcessQuery {
29    rx: Arc<Mutex<Receiver<PolarsResult<DataFrame>>>>,
30    token: Arc<AtomicBool>,
31}
32
33impl InProcessQuery {
34    /// Cancel the query at earliest convenience.
35    pub fn cancel(&self) {
36        self.token.store(true, Ordering::Relaxed)
37    }
38
39    /// Fetch the result.
40    ///
41    /// If it is ready, a materialized DataFrame is returned.
42    /// If it is not ready it will return `None`.
43    pub fn fetch(&self) -> Option<PolarsResult<DataFrame>> {
44        let rx = self.rx.lock().unwrap();
45        rx.try_recv().ok()
46    }
47
48    /// Await the result synchronously.
49    pub fn fetch_blocking(&self) -> PolarsResult<DataFrame> {
50        let rx = self.rx.lock().unwrap();
51        rx.recv().unwrap()
52    }
53}
54
55impl Drop for InProcessQuery {
56    fn drop(&mut self) {
57        self.token.store(true, Ordering::Relaxed);
58    }
59}