polars_lazy/frame/
exitable.rs1use std::sync::Mutex;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::mpsc::{Receiver, channel};
4
5use polars_core::POOL;
6
7use super::*;
8
9impl LazyFrame {
10 pub fn collect_concurrently(self) -> PolarsResult<InProcessQuery> {
11 let (mut state, mut physical_plan, _) = self.prepare_collect(false, None)?;
12
13 let (tx, rx) = channel();
14 let token = state.cancel_token();
15 POOL.spawn_fifo(move || {
16 let result = physical_plan.execute(&mut state);
17 tx.send(result).unwrap();
18 });
19
20 Ok(InProcessQuery {
21 rx: Arc::new(Mutex::new(rx)),
22 token,
23 })
24 }
25}
26
27#[derive(Clone)]
28pub struct InProcessQuery {
29 rx: Arc<Mutex<Receiver<PolarsResult<DataFrame>>>>,
30 token: Arc<AtomicBool>,
31}
32
33impl InProcessQuery {
34 pub fn cancel(&self) {
36 self.token.store(true, Ordering::Relaxed)
37 }
38
39 pub fn fetch(&self) -> Option<PolarsResult<DataFrame>> {
44 let rx = self.rx.lock().unwrap();
45 rx.try_recv().ok()
46 }
47
48 pub fn fetch_blocking(&self) -> PolarsResult<DataFrame> {
50 let rx = self.rx.lock().unwrap();
51 rx.recv().unwrap()
52 }
53}
54
55impl Drop for InProcessQuery {
56 fn drop(&mut self) {
57 self.token.store(true, Ordering::Relaxed);
58 }
59}