Skip to main content

polars_core/
lib.rs

1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![allow(ambiguous_glob_reexports)]
3#![cfg_attr(
4    feature = "allow_unused",
5    allow(unused, dead_code, irrefutable_let_patterns)
6)] // Maybe be caused by some feature
7// combinations
8extern crate core;
9
10#[macro_use]
11pub mod utils;
12pub mod chunked_array;
13pub mod config;
14pub mod datatypes;
15pub mod error;
16pub mod fmt;
17pub mod frame;
18pub mod functions;
19pub mod hashing;
20mod named_from;
21pub mod prelude;
22pub mod query_result;
23#[cfg(feature = "random")]
24pub mod random;
25pub mod scalar;
26pub mod schema;
27#[cfg(feature = "serde")]
28pub mod serde;
29pub mod series;
30pub mod testing;
31#[cfg(test)]
32mod tests;
33
34use std::cell::{Cell, RefCell};
35use std::sync::{LazyLock, Mutex};
36
37pub use datatypes::SchemaExtPl;
38pub use hashing::IdBuildHasher;
39use rayon::{ThreadPool, ThreadPoolBuilder};
40
41/// A secret ID used to limit deserialization of raw pointers to those
42/// generated by this instance of Polars.
43pub static PROCESS_ID: LazyLock<u128> = LazyLock::new(|| {
44    let mut bytes = [0u8; 16];
45    getrandom::fill(&mut bytes).unwrap();
46    u128::from_le_bytes(bytes)
47});
48
/// Handle to the thread pool Polars runs its parallel work on.
///
/// `POOL` is a unit struct used as a value (`POOL.install(..)`, `POOL.join(..)`, ...).
/// Its methods choose the concrete pool via `POOL::with`. On wasm targets other
/// than emscripten, they instead call plain `rayon` functions or run directly on
/// the current thread.
pub struct POOL;
50
51// Thread locals to allow disabling threading for specific threads.
52#[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
53thread_local! {
54    pub static ALLOW_RAYON_THREADS: Cell<bool> = const { Cell::new(true) };
55    static NOOP_POOL: RefCell<ThreadPool> = RefCell::new(
56        ThreadPoolBuilder::new()
57            .use_current_thread()
58            .num_threads(1)
59            .build()
60            .expect("could not create no-op thread pool")
61    );
62}
63
64impl POOL {
65    pub fn install<OP, R>(&self, op: OP) -> R
66    where
67        OP: FnOnce() -> R + Send,
68        R: Send,
69    {
70        #[cfg(not(any(target_os = "emscripten", not(target_family = "wasm"))))]
71        {
72            op()
73        }
74
75        #[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
76        {
77            self.with(|p| p.install(op))
78        }
79    }
80
81    pub fn join<A, B, RA, RB>(&self, oper_a: A, oper_b: B) -> (RA, RB)
82    where
83        A: FnOnce() -> RA + Send,
84        B: FnOnce() -> RB + Send,
85        RA: Send,
86        RB: Send,
87    {
88        self.install(|| rayon::join(oper_a, oper_b))
89    }
90
91    pub fn scope<'scope, OP, R>(&self, op: OP) -> R
92    where
93        OP: FnOnce(&rayon::Scope<'scope>) -> R + Send,
94        R: Send,
95    {
96        self.install(|| rayon::scope(op))
97    }
98
99    pub fn spawn<OP>(&self, op: OP)
100    where
101        OP: FnOnce() + Send + 'static,
102    {
103        #[cfg(not(any(target_os = "emscripten", not(target_family = "wasm"))))]
104        {
105            rayon::spawn(op)
106        }
107
108        #[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
109        {
110            self.with(|p| {
111                p.spawn(op);
112                if p.current_num_threads() == 1 {
113                    p.yield_now();
114                }
115            })
116        }
117    }
118
119    pub fn spawn_fifo<OP>(&self, op: OP)
120    where
121        OP: FnOnce() + Send + 'static,
122    {
123        #[cfg(not(any(target_os = "emscripten", not(target_family = "wasm"))))]
124        {
125            rayon::spawn_fifo(op)
126        }
127
128        #[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
129        {
130            self.with(|p| {
131                p.spawn_fifo(op);
132                if p.current_num_threads() == 1 {
133                    p.yield_now();
134                }
135            })
136        }
137    }
138
139    pub fn current_thread_has_pending_tasks(&self) -> Option<bool> {
140        #[cfg(not(any(target_os = "emscripten", not(target_family = "wasm"))))]
141        {
142            None
143        }
144
145        #[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
146        {
147            self.with(|p| p.current_thread_has_pending_tasks())
148        }
149    }
150
151    pub fn current_thread_index(&self) -> Option<usize> {
152        #[cfg(not(any(target_os = "emscripten", not(target_family = "wasm"))))]
153        {
154            rayon::current_thread_index()
155        }
156
157        #[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
158        {
159            self.with(|p| p.current_thread_index())
160        }
161    }
162
163    pub fn current_num_threads(&self) -> usize {
164        #[cfg(not(any(target_os = "emscripten", not(target_family = "wasm"))))]
165        {
166            rayon::current_num_threads()
167        }
168
169        #[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
170        {
171            self.with(|p| p.current_num_threads())
172        }
173    }
174
175    #[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
176    pub fn with<OP, R>(&self, op: OP) -> R
177    where
178        OP: FnOnce(&ThreadPool) -> R + Send,
179        R: Send,
180    {
181        if ALLOW_RAYON_THREADS.get() || THREAD_POOL.current_thread_index().is_some() {
182            op(&THREAD_POOL)
183        } else {
184            NOOP_POOL.with(|v| op(&v.borrow()))
185        }
186    }
187}
188
189// this is re-exported in utils for polars child crates
190#[cfg(not(target_family = "wasm"))] // only use this on non wasm targets
191pub static THREAD_POOL: LazyLock<ThreadPool> = LazyLock::new(|| {
192    let thread_name = std::env::var("POLARS_THREAD_NAME").unwrap_or_else(|_| "polars".to_string());
193    ThreadPoolBuilder::new()
194        .num_threads(
195            std::env::var("POLARS_MAX_THREADS")
196                .map(|s| s.parse::<usize>().expect("integer"))
197                .unwrap_or_else(|_| {
198                    std::thread::available_parallelism()
199                        .unwrap_or(std::num::NonZeroUsize::new(1).unwrap())
200                        .get()
201                }),
202        )
203        .thread_name(move |i| format!("{thread_name}-{i}"))
204        .build()
205        .expect("could not spawn threads")
206});
207
/// The global Rayon thread pool on emscripten.
///
/// It has a single thread: whichever thread first forces this `LazyLock` is
/// adopted as the pool's only worker (`use_current_thread`).
#[cfg(all(target_os = "emscripten", target_family = "wasm"))] // Use 1 rayon thread on emscripten
pub static THREAD_POOL: LazyLock<ThreadPool> = LazyLock::new(|| {
    let builder = ThreadPoolBuilder::new().use_current_thread().num_threads(1);
    builder.build().expect("could not create pool")
});
216
/// Process-wide lock that tests can hold so that only one of them executes a
/// guarded section at a time.
pub static SINGLE_LOCK: LazyLock<Mutex<()>> = LazyLock::new(Mutex::default);
219
/// Length a `.head()` call falls back to by default.
pub(crate) const HEAD_DEFAULT_LENGTH: usize = 10;
/// Length a `.tail()` call falls back to by default.
pub(crate) const TAIL_DEFAULT_LENGTH: usize = 10;
/// Size threshold related to hashing a `Series`.
///
/// NOTE(review): presumably the length below which hashing a series is
/// considered cheap; its users are not visible here, so confirm before relying on it.
pub const CHEAP_SERIES_HASH_LIMIT: usize = 1_000;