polars_lazy/frame/
exitable.rs1use std::sync::Mutex;
2use std::sync::mpsc::{Receiver, channel};
3
4use polars_core::POOL;
5use polars_utils::relaxed_cell::RelaxedCell;
6
7use super::*;
8
9impl LazyFrame {
10 pub fn collect_concurrently(self) -> PolarsResult<InProcessQuery> {
11 let (mut state, mut physical_plan, _) = self.prepare_collect(false, None)?;
12
13 let (tx, rx) = channel();
14 let token = state.cancel_token();
15
16 if physical_plan.is_cache_prefiller() {
17 #[cfg(feature = "async")]
18 {
19 polars_io::pl_async::get_runtime().spawn_blocking(move || {
20 let result = physical_plan.execute(&mut state);
21 tx.send(result).unwrap();
22 });
23 }
24 #[cfg(not(feature = "async"))]
25 {
26 std::thread::spawn(move || {
27 let result = physical_plan.execute(&mut state);
28 tx.send(result).unwrap();
29 });
30 }
31 } else {
32 POOL.spawn_fifo(move || {
33 let result = physical_plan.execute(&mut state);
34 tx.send(result).unwrap();
35 });
36 }
37
38 Ok(InProcessQuery {
39 rx: Arc::new(Mutex::new(rx)),
40 token,
41 })
42 }
43}
44
45#[derive(Clone)]
46pub struct InProcessQuery {
47 rx: Arc<Mutex<Receiver<PolarsResult<DataFrame>>>>,
48 token: Arc<RelaxedCell<bool>>,
49}
50
51impl InProcessQuery {
52 pub fn cancel(&self) {
54 self.token.store(true)
55 }
56
57 pub fn fetch(&self) -> Option<PolarsResult<DataFrame>> {
62 let rx = self.rx.lock().unwrap();
63 rx.try_recv().ok()
64 }
65
66 pub fn fetch_blocking(&self) -> PolarsResult<DataFrame> {
68 let rx = self.rx.lock().unwrap();
69 rx.recv().unwrap()
70 }
71}
72
73impl Drop for InProcessQuery {
74 fn drop(&mut self) {
75 self.token.store(true);
76 }
77}