1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343
use super::{current, park, Builder, JoinInner, Result, Thread};
use crate::fmt;
use crate::io;
use crate::marker::PhantomData;
use crate::panic::{catch_unwind, resume_unwind, AssertUnwindSafe};
use crate::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use crate::sync::Arc;
/// 在其中生成作用域线程的作用域。
///
/// 有关详细信息,请参见 [`scope`]。
#[stable(feature = "scoped_threads", since = "1.63.0")]
pub struct Scope<'scope, 'env: 'scope> {
data: Arc<ScopeData>,
/// 'scope 的不变性,以确保 ' 作用域不能缩小,这是健全性所必需的。
///
///
/// 如果没有不变性,这将编译得很好,但不健全:
///
/// ```compile_fail,E0373
/// std::thread::scope(|s| {
/// s.spawn(|| {
/// let a = String::from("abcd");
/// s.spawn(|| println!("{a:?}")); // 可能在 `a` 丢弃后运行
/// });
/// });
/// ```
scope: PhantomData<&'scope mut &'scope ()>,
env: PhantomData<&'env mut &'env ()>,
}
/// 在作用域线程上加入的拥有权限 (在其终止时阻塞)。
///
/// 有关详细信息,请参见 [`Scope::spawn`]。
#[stable(feature = "scoped_threads", since = "1.63.0")]
pub struct ScopedJoinHandle<'scope, T>(JoinInner<'scope, T>);
pub(super) struct ScopeData {
num_running_threads: AtomicUsize,
a_thread_panicked: AtomicBool,
main_thread: Thread,
}
impl ScopeData {
pub(super) fn increment_num_running_threads(&self) {
// 我们用 usize::MAX/2 检查 'overflow',以确保它不会溢出到 0,这会导致不健全。
//
if self.num_running_threads.fetch_add(1, Ordering::Relaxed) > usize::MAX / 2 {
// 这只能通过 `mem::forget()` 调用大量 ScopedJoinHandles 来实现。
self.decrement_num_running_threads(false);
panic!("too many running threads in thread scope");
}
}
pub(super) fn decrement_num_running_threads(&self, panic: bool) {
if panic {
self.a_thread_panicked.store(true, Ordering::Relaxed);
}
if self.num_running_threads.fetch_sub(1, Ordering::Release) == 1 {
self.main_thread.unpark();
}
}
}
/// 创建一个生成作用域线程的作用域。
///
/// 传递给 `scope` 的函数将提供一个 [`Scope`] 对象,通过该对象可以 [生成][`Scope::spawn`] 作用域线程。
///
/// 与非作用域线程不同,作用域线程可以借用非 `'static` 的数据,因为作用域保证所有线程都将在作用域的末尾加入。
///
/// 在此函数返回之前,在作用域内生成的所有未手动加入的线程将自动加入。
///
/// # Panics
///
/// 如果任何自动加入的线程发生 panic,则这个函数也会出现 panic。
///
/// 如果您想处理衍生线程的 panic,请在作用域结束之前 [`join`][ScopedJoinHandle::join] 它们。
///
/// # Example
///
/// ```
/// use std::thread;
///
/// let mut a = vec![1, 2, 3];
/// let mut x = 0;
///
/// thread::scope(|s| {
/// s.spawn(|| {
/// println!("hello from the first scoped thread");
/// // 我们可以在这里借用 `a`。
/// dbg!(&a);
/// });
/// s.spawn(|| {
/// println!("hello from the second scoped thread");
/// // 我们甚至可以在这里可变地借用 `x`,因为没有其他线程在使用它。
/////
/// x += a[0] + a[2];
/// });
/// println!("hello from the main thread");
/// });
///
/// // 在作用域之后,我们可以再次修改和访问我们的变量:
/// a.push(4);
/// assert_eq!(x, a.len());
/// ```
///
/// # Lifetimes
///
/// Scoped 线程涉及两个生命周期: `'scope` 和 `'env`。
///
/// `'scope` 生命周期代表作用域本身的生命周期。
/// 即: 可能产生新作用域线程的时间,以及它们可能仍在运行的时间。
/// 一旦这个生命周期结束,所有作用域的线程都会加入。
/// 这个生命周期在 `scope` 函数中开始,在 `f` (`scope` 的参数) 开始之前。
/// 它在 `f` 返回并且所有作用域线程都已加入之后但在 `scope` 返回之前结束。
///
/// `'env` 生命周期表示作用域线程所借用的生命周期。
/// 这个生命周期必须比调用到 `scope` 的时间长,因此不能小于 `'scope`。
/// 它可以像调用到 `scope` 一样小,这意味着任何超过这个调用的东西,例如在作用域之前定义的局部变量,都可以被作用域线程借用。
///
///
/// `'env: 'scope` 界限是 `Scope` 类型定义的一部分。
///
///
///
///
///
#[track_caller]
#[stable(feature = "scoped_threads", since = "1.63.0")]
pub fn scope<'env, F, T>(f: F) -> T
where
F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> T,
{
// 我们把 `ScopeData` 放到 `Arc` 中,这样即使在这个函数返回之后,其他线程也可以完成它们的 `decrement_num_running_threads`。
//
let scope = Scope {
data: Arc::new(ScopeData {
num_running_threads: AtomicUsize::new(0),
main_thread: current(),
a_thread_panicked: AtomicBool::new(false),
}),
env: PhantomData,
scope: PhantomData,
};
// 运行 `f`,但捕获 panic,这样我们就可以确保等待所有线程加入。
let result = catch_unwind(AssertUnwindSafe(|| f(&scope)));
// 等待所有线程都完成。
while scope.data.num_running_threads.load(Ordering::Acquire) != 0 {
park();
}
// 从 `f` 抛出任何 panic,如果没有线程 panic,则返回 `f` 的值。
match result {
Err(e) => resume_unwind(e),
Ok(_) if scope.data.a_thread_panicked.load(Ordering::Relaxed) => {
panic!("a scoped thread panicked")
}
Ok(result) => result,
}
}
impl<'scope, 'env> Scope<'scope, 'env> {
/// 在作用域中产生一个新线程,为其返回一个 [`ScopedJoinHandle`]。
///
/// 与非作用域线程不同,使用此函数生成的线程可以从作用域外部借用非 `'static` 的数据。
/// 有关详细信息,请参见 [`scope`]。
///
/// 连接句柄提供了一个 [`join`] 方法,可以用来连接新生成的线程。
/// 如果生成的线程发生 panic,[`join`] 返回一个包含 panic 载荷的 [`Err`]。
///
/// 如果连接句柄被丢弃,则派生的线程将在作用域的末尾隐式连接。
/// 在这种情况下,如果派生的线程出现 panic,[`scope`] 将在所有线程加入后出现 panic。
///
/// 这个调用将使用 [`Builder`] 的默认参数创建一个线程。
/// 如果要指定栈大小或线程名,请改用 [`Builder::spawn_scoped`]。
///
/// # Panics
///
/// 如果操作系统无法创建线程,就会出现 panics。使用 [`Builder::spawn_scoped`] 从此类错误中恢复。
///
/// [`join`]: ScopedJoinHandle::join
///
///
///
///
///
#[stable(feature = "scoped_threads", since = "1.63.0")]
pub fn spawn<F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
where
F: FnOnce() -> T + Send + 'scope,
T: Send + 'scope,
{
Builder::new().spawn_scoped(self, f).expect("failed to spawn thread")
}
}
impl Builder {
/// 使用通过此 `Builder` 设置的设置生成一个新的作用域线程。
///
/// 与 [`Scope::spawn`] 不同,此方法会生成一个 [`io::Result`] 来捕获在操作系统级别创建线程的任何失败。
///
///
/// [`io::Result`]: crate::io::Result
///
/// # Panics
///
/// 如果设置了线程名称并且它包含空字节,就会出现 panic。
///
/// # Example
///
/// ```
/// use std::thread;
///
/// let mut a = vec![1, 2, 3];
/// let mut x = 0;
///
/// thread::scope(|s| {
/// thread::Builder::new()
/// .name("first".to_string())
/// .spawn_scoped(s, ||
/// {
/// println!("hello from the {:?} scoped thread", thread::current().name());
/// // 我们可以在这里借用 `a`。
/// dbg!(&a);
/// })
/// .unwrap();
/// thread::Builder::new()
/// .name("second".to_string())
/// .spawn_scoped(s, ||
/// {
/// println!("hello from the {:?} scoped thread", thread::current().name());
/// // 我们甚至可以在这里可变地借用 `x`,因为没有其他线程在使用它。
/////
/// x += a[0] + a[2];
/// })
/// .unwrap();
/// println!("hello from the main thread");
/// });
///
/// // 在作用域之后,我们可以再次修改和访问我们的变量:
/// a.push(4);
/// assert_eq!(x, a.len());
/// ```
#[stable(feature = "scoped_threads", since = "1.63.0")]
pub fn spawn_scoped<'scope, 'env, F, T>(
self,
scope: &'scope Scope<'scope, 'env>,
f: F,
) -> io::Result<ScopedJoinHandle<'scope, T>>
where
F: FnOnce() -> T + Send + 'scope,
T: Send + 'scope,
{
Ok(ScopedJoinHandle(unsafe { self.spawn_unchecked_(f, Some(scope.data.clone())) }?))
}
}
impl<'scope, T> ScopedJoinHandle<'scope, T> {
/// 提取底层线程的句柄。
///
/// # Examples
///
/// ```
/// use std::thread;
///
/// thread::scope(|s| {
/// let t = s.spawn(|| {
/// println!("hello");
/// });
/// println!("thread id: {:?}", t.thread().id());
/// });
/// ```
#[must_use]
#[stable(feature = "scoped_threads", since = "1.63.0")]
pub fn thread(&self) -> &Thread {
&self.0.thread
}
/// 等待关联的线程完成。
///
/// 如果关联的线程已经完成,这个函数将立即返回。
///
/// 在 [原子内存排序][atomic memory orderings] 方面,相关线程的完成与这个函数的返回同步。
///
/// 换句话说,该线程执行的所有操作
/// [happen before](https://doc.rust-lang.org/nomicon/atomics.html#data-accesses)
/// 所有在`join`返回后发生的操作。
///
/// 如果关联的线程出现 panic,将返回一个带有 panic 载荷的 [`Err`]。
///
/// [atomic memory orderings]: crate::sync::atomic
///
/// # Examples
///
/// ```
/// use std::thread;
///
/// thread::scope(|s| {
/// let t = s.spawn(|| {
/// panic!("oh no");
/// });
/// assert!(t.join().is_err());
/// });
/// ```
#[stable(feature = "scoped_threads", since = "1.63.0")]
pub fn join(self) -> Result<T> {
self.0.join()
}
/// 检查关联线程是否已完成其 main 函数的运行。
///
/// `is_finished` 支持实现非阻塞连接操作,通过检查 `is_finished`,如果返回 `false` 则调用 `join`。
/// 该函数不会阻止。
/// 要在等待线程完成时阻塞,请使用 [`join`][Self::join]。
///
/// 这可能在线程的 main 函数返回之后,但在线程本身停止运行之前短暂返回 `true`。
/// 但是,一旦返回 `true`,可以预期 [`join`][Self::join] 会快速返回,而不会被阻塞很长时间。
///
///
#[stable(feature = "scoped_threads", since = "1.63.0")]
pub fn is_finished(&self) -> bool {
Arc::strong_count(&self.0.packet) == 1
}
}
#[stable(feature = "scoped_threads", since = "1.63.0")]
impl fmt::Debug for Scope<'_, '_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Scope")
.field("num_running_threads", &self.data.num_running_threads.load(Ordering::Relaxed))
.field("a_thread_panicked", &self.data.a_thread_panicked.load(Ordering::Relaxed))
.field("main_thread", &self.data.main_thread)
.finish_non_exhaustive()
}
}
#[stable(feature = "scoped_threads", since = "1.63.0")]
impl<'scope, T> fmt::Debug for ScopedJoinHandle<'scope, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ScopedJoinHandle").finish_non_exhaustive()
}
}