polars_core/
lib.rs

// Only active when the `docsrs` cfg is set (conventionally by docs.rs builds).
#![cfg_attr(docsrs, feature(doc_cfg))]
// The nightly `portable_simd` language feature is enabled only with the `simd` cargo feature.
#![cfg_attr(feature = "simd", feature(portable_simd))]
// Allow glob re-exports (`pub use ...::*`) that expose overlapping names.
#![allow(ambiguous_glob_reexports)]
// Some feature combinations may leave items unused or make `let` patterns irrefutable;
// enabling the `allow_unused` feature silences those lints crate-wide.
#![cfg_attr(
    feature = "allow_unused",
    allow(unused, dead_code, irrefutable_let_patterns)
)]
#![cfg_attr(feature = "nightly", allow(clippy::non_canonical_partial_ord_impl))] // remove once stable
// NOTE(review): `core` is already in the extern prelude on edition 2018+; this declaration
// presumably exists so that `crate::core` paths resolve — confirm before removing.
extern crate core;
11
// `utils` is declared first and marked `#[macro_use]` so that its `macro_rules!` macros are
// textually in scope for every module declared after it. Keep it at the top of this list.
#[macro_use]
pub mod utils;
pub mod chunked_array;
pub mod config;
pub mod datatypes;
pub mod error;
pub mod fmt;
pub mod frame;
pub mod functions;
pub mod hashing;
mod named_from;
pub mod prelude;
#[cfg(feature = "random")]
pub mod random;
pub mod scalar;
pub mod schema;
#[cfg(feature = "serde")]
pub mod serde;
pub mod series;
pub mod testing;
#[cfg(test)]
mod tests;
34
35use std::cell::{Cell, RefCell};
36use std::sync::{LazyLock, Mutex};
37use std::time::{SystemTime, UNIX_EPOCH};
38
39pub use datatypes::SchemaExtPl;
40pub use hashing::IdBuildHasher;
41use rayon::{ThreadPool, ThreadPoolBuilder};
42
/// Process-wide identifier derived from the wall-clock time, in nanoseconds relative to the
/// Unix epoch, at the moment this static is first accessed.
///
/// The value is computed lazily once and then stays fixed for the lifetime of the process.
/// If the system clock reports a time *before* the Unix epoch, the magnitude of that offset is
/// used instead of panicking.
pub static PROCESS_ID: LazyLock<u128> = LazyLock::new(|| {
    let now = SystemTime::now();
    // `duration_since` only fails when the clock is set before 1970. Fall back to the size of
    // that negative offset rather than aborting the process on first use.
    now.duration_since(UNIX_EPOCH)
        .unwrap_or_else(|err| err.duration())
        .as_nanos()
});
49
/// Zero-sized handle for polars' Rayon thread-pool operations.
///
/// Use the unit value `POOL` directly, e.g. `POOL.install(|| ...)`. On targets that can spawn
/// threads, its methods route through `POOL::with`, which selects either the global
/// `THREAD_POOL` or a per-thread single-threaded pool. On other wasm targets they run inline or
/// call Rayon's global functions.
pub struct POOL;
51
52// Thread locals to allow disabling threading for specific threads.
53#[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
54thread_local! {
55    pub static ALLOW_RAYON_THREADS: Cell<bool> = const { Cell::new(true) };
56    static NOOP_POOL: RefCell<ThreadPool> = RefCell::new(
57        ThreadPoolBuilder::new()
58            .use_current_thread()
59            .num_threads(1)
60            .build()
61            .expect("could not create no-op thread pool")
62    );
63}
64
65impl POOL {
66    pub fn install<OP, R>(&self, op: OP) -> R
67    where
68        OP: FnOnce() -> R + Send,
69        R: Send,
70    {
71        #[cfg(not(any(target_os = "emscripten", not(target_family = "wasm"))))]
72        {
73            op()
74        }
75
76        #[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
77        {
78            self.with(|p| p.install(op))
79        }
80    }
81
82    pub fn join<A, B, RA, RB>(&self, oper_a: A, oper_b: B) -> (RA, RB)
83    where
84        A: FnOnce() -> RA + Send,
85        B: FnOnce() -> RB + Send,
86        RA: Send,
87        RB: Send,
88    {
89        self.install(|| rayon::join(oper_a, oper_b))
90    }
91
92    pub fn scope<'scope, OP, R>(&self, op: OP) -> R
93    where
94        OP: FnOnce(&rayon::Scope<'scope>) -> R + Send,
95        R: Send,
96    {
97        self.install(|| rayon::scope(op))
98    }
99
100    pub fn spawn<OP>(&self, op: OP)
101    where
102        OP: FnOnce() + Send + 'static,
103    {
104        #[cfg(not(any(target_os = "emscripten", not(target_family = "wasm"))))]
105        {
106            rayon::spawn(op)
107        }
108
109        #[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
110        {
111            self.with(|p| {
112                p.spawn(op);
113                if p.current_num_threads() == 1 {
114                    p.yield_now();
115                }
116            })
117        }
118    }
119
120    pub fn spawn_fifo<OP>(&self, op: OP)
121    where
122        OP: FnOnce() + Send + 'static,
123    {
124        #[cfg(not(any(target_os = "emscripten", not(target_family = "wasm"))))]
125        {
126            rayon::spawn_fifo(op)
127        }
128
129        #[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
130        {
131            self.with(|p| {
132                p.spawn_fifo(op);
133                if p.current_num_threads() == 1 {
134                    p.yield_now();
135                }
136            })
137        }
138    }
139
140    pub fn current_thread_has_pending_tasks(&self) -> Option<bool> {
141        #[cfg(not(any(target_os = "emscripten", not(target_family = "wasm"))))]
142        {
143            None
144        }
145
146        #[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
147        {
148            self.with(|p| p.current_thread_has_pending_tasks())
149        }
150    }
151
152    pub fn current_thread_index(&self) -> Option<usize> {
153        #[cfg(not(any(target_os = "emscripten", not(target_family = "wasm"))))]
154        {
155            rayon::current_thread_index()
156        }
157
158        #[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
159        {
160            self.with(|p| p.current_thread_index())
161        }
162    }
163
164    pub fn current_num_threads(&self) -> usize {
165        #[cfg(not(any(target_os = "emscripten", not(target_family = "wasm"))))]
166        {
167            rayon::current_num_threads()
168        }
169
170        #[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
171        {
172            self.with(|p| p.current_num_threads())
173        }
174    }
175
176    #[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
177    pub fn with<OP, R>(&self, op: OP) -> R
178    where
179        OP: FnOnce(&ThreadPool) -> R + Send,
180        R: Send,
181    {
182        if ALLOW_RAYON_THREADS.get() || THREAD_POOL.current_thread_index().is_some() {
183            op(&THREAD_POOL)
184        } else {
185            NOOP_POOL.with(|v| op(&v.borrow()))
186        }
187    }
188}
189
190// this is re-exported in utils for polars child crates
191#[cfg(not(target_family = "wasm"))] // only use this on non wasm targets
192pub static THREAD_POOL: LazyLock<ThreadPool> = LazyLock::new(|| {
193    let thread_name = std::env::var("POLARS_THREAD_NAME").unwrap_or_else(|_| "polars".to_string());
194    ThreadPoolBuilder::new()
195        .num_threads(
196            std::env::var("POLARS_MAX_THREADS")
197                .map(|s| s.parse::<usize>().expect("integer"))
198                .unwrap_or_else(|_| {
199                    std::thread::available_parallelism()
200                        .unwrap_or(std::num::NonZeroUsize::new(1).unwrap())
201                        .get()
202                }),
203        )
204        .thread_name(move |i| format!("{thread_name}-{i}"))
205        .build()
206        .expect("could not spawn threads")
207});
208
/// Global Rayon pool used on emscripten: exactly one worker.
///
/// Because of `use_current_thread`, that worker is the thread that builds the pool, i.e. the
/// thread that first dereferences this static.
#[cfg(all(target_os = "emscripten", target_family = "wasm"))] // Use 1 rayon thread on emscripten
pub static THREAD_POOL: LazyLock<ThreadPool> = LazyLock::new(|| {
    let single_thread = ThreadPoolBuilder::new().use_current_thread().num_threads(1);
    single_thread.build().expect("could not create pool")
});
217
/// Global lock that tests can hold so that only one thread at a time executes the guarded
/// section.
pub static SINGLE_LOCK: LazyLock<Mutex<()>> = LazyLock::new(Mutex::default);

/// Default length for a `.head()` call.
pub(crate) const HEAD_DEFAULT_LENGTH: usize = 10;
/// Default length for a `.tail()` call.
pub(crate) const TAIL_DEFAULT_LENGTH: usize = 10;
/// Size threshold related to series hashing.
/// NOTE(review): presumably the length up to which hashing a series is treated as cheap —
/// confirm against its call sites.
pub const CHEAP_SERIES_HASH_LIMIT: usize = 1_000;