1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![cfg_attr(feature = "simd", feature(portable_simd))]
3#![allow(ambiguous_glob_reexports)]
4#![cfg_attr(
5 feature = "allow_unused",
6 allow(unused, dead_code, irrefutable_let_patterns)
7)] #![cfg_attr(feature = "nightly", allow(clippy::non_canonical_partial_ord_impl))] extern crate core;
11
12#[macro_use]
13pub mod utils;
14pub mod chunked_array;
15pub mod config;
16pub mod datatypes;
17pub mod error;
18pub mod fmt;
19pub mod frame;
20pub mod functions;
21pub mod hashing;
22mod named_from;
23pub mod prelude;
24#[cfg(feature = "random")]
25pub mod random;
26pub mod scalar;
27pub mod schema;
28#[cfg(feature = "serde")]
29pub mod serde;
30pub mod series;
31pub mod testing;
32#[cfg(test)]
33mod tests;
34
35use std::cell::{Cell, RefCell};
36use std::sync::{LazyLock, Mutex};
37use std::time::{SystemTime, UNIX_EPOCH};
38
39pub use datatypes::SchemaExtPl;
40pub use hashing::IdBuildHasher;
41use rayon::{ThreadPool, ThreadPoolBuilder};
42
/// Nanoseconds elapsed since the UNIX epoch, sampled the first time this
/// static is dereferenced and then kept for the rest of the process lifetime.
///
/// NOTE(review): the name suggests this doubles as a (probabilistically)
/// unique identifier for the running process — confirm against the callers.
///
/// # Panics
///
/// The first access panics if the system clock reports a time earlier than
/// the UNIX epoch.
pub static PROCESS_ID: LazyLock<u128> = LazyLock::new(|| {
    let now = SystemTime::now();
    let since_epoch = now.duration_since(UNIX_EPOCH).unwrap();
    since_epoch.as_nanos()
});
49
/// Zero-sized handle through which the crate dispatches work onto a thread
/// pool.
///
/// All behaviour lives in the inherent methods (`install`, `join`, `scope`,
/// `spawn`, ...); the value itself carries no state.
pub struct POOL;
51
52#[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
54thread_local! {
55 pub static ALLOW_RAYON_THREADS: Cell<bool> = const { Cell::new(true) };
56 static NOOP_POOL: RefCell<ThreadPool> = RefCell::new(
57 ThreadPoolBuilder::new()
58 .use_current_thread()
59 .num_threads(1)
60 .build()
61 .expect("could not create no-op thread pool")
62 );
63}
64
65impl POOL {
66 pub fn install<OP, R>(&self, op: OP) -> R
67 where
68 OP: FnOnce() -> R + Send,
69 R: Send,
70 {
71 #[cfg(not(any(target_os = "emscripten", not(target_family = "wasm"))))]
72 {
73 op()
74 }
75
76 #[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
77 {
78 self.with(|p| p.install(op))
79 }
80 }
81
82 pub fn join<A, B, RA, RB>(&self, oper_a: A, oper_b: B) -> (RA, RB)
83 where
84 A: FnOnce() -> RA + Send,
85 B: FnOnce() -> RB + Send,
86 RA: Send,
87 RB: Send,
88 {
89 self.install(|| rayon::join(oper_a, oper_b))
90 }
91
92 pub fn scope<'scope, OP, R>(&self, op: OP) -> R
93 where
94 OP: FnOnce(&rayon::Scope<'scope>) -> R + Send,
95 R: Send,
96 {
97 self.install(|| rayon::scope(op))
98 }
99
100 pub fn spawn<OP>(&self, op: OP)
101 where
102 OP: FnOnce() + Send + 'static,
103 {
104 #[cfg(not(any(target_os = "emscripten", not(target_family = "wasm"))))]
105 {
106 rayon::spawn(op)
107 }
108
109 #[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
110 {
111 self.with(|p| {
112 p.spawn(op);
113 if p.current_num_threads() == 1 {
114 p.yield_now();
115 }
116 })
117 }
118 }
119
120 pub fn spawn_fifo<OP>(&self, op: OP)
121 where
122 OP: FnOnce() + Send + 'static,
123 {
124 #[cfg(not(any(target_os = "emscripten", not(target_family = "wasm"))))]
125 {
126 rayon::spawn_fifo(op)
127 }
128
129 #[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
130 {
131 self.with(|p| {
132 p.spawn_fifo(op);
133 if p.current_num_threads() == 1 {
134 p.yield_now();
135 }
136 })
137 }
138 }
139
140 pub fn current_thread_has_pending_tasks(&self) -> Option<bool> {
141 #[cfg(not(any(target_os = "emscripten", not(target_family = "wasm"))))]
142 {
143 None
144 }
145
146 #[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
147 {
148 self.with(|p| p.current_thread_has_pending_tasks())
149 }
150 }
151
152 pub fn current_thread_index(&self) -> Option<usize> {
153 #[cfg(not(any(target_os = "emscripten", not(target_family = "wasm"))))]
154 {
155 rayon::current_thread_index()
156 }
157
158 #[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
159 {
160 self.with(|p| p.current_thread_index())
161 }
162 }
163
164 pub fn current_num_threads(&self) -> usize {
165 #[cfg(not(any(target_os = "emscripten", not(target_family = "wasm"))))]
166 {
167 rayon::current_num_threads()
168 }
169
170 #[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
171 {
172 self.with(|p| p.current_num_threads())
173 }
174 }
175
176 #[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
177 pub fn with<OP, R>(&self, op: OP) -> R
178 where
179 OP: FnOnce(&ThreadPool) -> R + Send,
180 R: Send,
181 {
182 if ALLOW_RAYON_THREADS.get() || THREAD_POOL.current_thread_index().is_some() {
183 op(&THREAD_POOL)
184 } else {
185 NOOP_POOL.with(|v| op(&v.borrow()))
186 }
187 }
188}
189
190#[cfg(not(target_family = "wasm"))] pub static THREAD_POOL: LazyLock<ThreadPool> = LazyLock::new(|| {
193 let thread_name = std::env::var("POLARS_THREAD_NAME").unwrap_or_else(|_| "polars".to_string());
194 ThreadPoolBuilder::new()
195 .num_threads(
196 std::env::var("POLARS_MAX_THREADS")
197 .map(|s| s.parse::<usize>().expect("integer"))
198 .unwrap_or_else(|_| {
199 std::thread::available_parallelism()
200 .unwrap_or(std::num::NonZeroUsize::new(1).unwrap())
201 .get()
202 }),
203 )
204 .thread_name(move |i| format!("{thread_name}-{i}"))
205 .build()
206 .expect("could not spawn threads")
207});
208
/// Global thread pool for emscripten/wasm builds; built on first access.
///
/// The pool is limited to a single thread and adopts the thread that first
/// touches it as that worker.
///
/// # Panics
///
/// The first access panics with "could not create pool" if the pool cannot be
/// built.
#[cfg(all(target_os = "emscripten", target_family = "wasm"))]
pub static THREAD_POOL: LazyLock<ThreadPool> = LazyLock::new(|| {
    let builder = ThreadPoolBuilder::new().num_threads(1).use_current_thread();
    builder.build().expect("could not create pool")
});
217
/// Process-wide mutex guarding no data; holding its guard only serialises
/// the code sections that lock it.
///
/// NOTE(review): presumably a helper for code (e.g. tests) that must not run
/// concurrently — confirm against the callers.
pub static SINGLE_LOCK: LazyLock<Mutex<()>> = LazyLock::new(Mutex::default);
220
/// NOTE(review): presumably the number of rows returned by a `head`-style
/// call when the caller gives no length — confirm against the users.
pub(crate) const HEAD_DEFAULT_LENGTH: usize = 10;

/// NOTE(review): presumably the number of rows returned by a `tail`-style
/// call when the caller gives no length — confirm against the users.
pub(crate) const TAIL_DEFAULT_LENGTH: usize = 10;

/// NOTE(review): presumably the size limit up to which hashing a series is
/// still considered cheap — confirm against the users.
pub const CHEAP_SERIES_HASH_LIMIT: usize = 1_000;