use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver};
use std::sync::Mutex;
use polars_core::POOL;
use super::*;
impl LazyFrame {
    pub fn collect_concurrently(self) -> PolarsResult<InProcessQuery> {
        let (mut state, mut physical_plan, _) = self.prepare_collect(false)?;
        let (tx, rx) = channel();
        let token = state.cancel_token();
        POOL.spawn_fifo(move || {
            let result = physical_plan.execute(&mut state);
            tx.send(result).unwrap();
        });
        Ok(InProcessQuery {
            rx: Arc::new(Mutex::new(rx)),
            token,
        })
    }
}
#[derive(Clone)]
pub struct InProcessQuery {
    rx: Arc<Mutex<Receiver<PolarsResult<DataFrame>>>>,
    token: Arc<AtomicBool>,
}
impl InProcessQuery {
    pub fn cancel(&self) {
        self.token.store(true, Ordering::Relaxed)
    }
    pub fn fetch(&self) -> Option<PolarsResult<DataFrame>> {
        let rx = self.rx.lock().unwrap();
        rx.try_recv().ok()
    }
    pub fn fetch_blocking(&self) -> PolarsResult<DataFrame> {
        let rx = self.rx.lock().unwrap();
        rx.recv().unwrap()
    }
}
impl Drop for InProcessQuery {
    fn drop(&mut self) {
        self.token.store(true, Ordering::Relaxed);
    }
}