
#[cfg(test)]
mod tests;
use crate::fmt;
use crate::sync::{Condvar, Mutex};
/// 屏障使多个线程能够同步某些计算的开始。
///
///
/// # Examples
///
/// ```
/// use std::sync::{Arc, Barrier};
/// use std::thread;
///
/// let mut handles = Vec::with_capacity(10);
/// let barrier = Arc::new(Barrier::new(10));
/// for _ in 0..10 {
/// let c = Arc::clone(&barrier);
/// // 相同的消息将一起打印。
/// // 您将看不到任何交错。
/// handles.push(thread::spawn(move|| {
/// println!("before wait");
/// c.wait();
/// println!("after wait");
/// }));
/// }
/// // 等待其他线程完成。
/// for handle in handles {
/// handle.join().unwrap();
/// }
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub struct Barrier {
lock: Mutex<BarrierState>,
cvar: Condvar,
num_threads: usize,
}
// 双重屏障的内部状态
struct BarrierState {
count: usize,
generation_id: usize,
}
/// 当 [`Barrier`] 中的所有线程都汇合时,[`Barrier::wait()`] 将返回 `BarrierWaitResult`。
///
///
/// # Examples
///
/// ```
/// use std::sync::Barrier;
///
/// let barrier = Barrier::new(1);
/// let barrier_wait_result = barrier.wait();
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub struct BarrierWaitResult(bool);
#[stable(feature = "std_debug", since = "1.16.0")]
impl fmt::Debug for Barrier {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Barrier").finish_non_exhaustive()
}
}
impl Barrier {
/// 创建一个新的屏障,该屏障可以阻止给定数量的线程。
///
/// 屏障将阻塞调用 [`wait()`] 的 `n - 1` 个线程,然后在第 n 个线程调用 [`wait()`] 时立即唤醒所有线程。
///
///
/// [`wait()`]: Barrier::wait
///
/// # Examples
///
/// ```
/// use std::sync::Barrier;
///
/// let barrier = Barrier::new(10);
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
#[must_use]
pub fn new(n: usize) -> Barrier {
Barrier {
lock: Mutex::new(BarrierState { count: 0, generation_id: 0 }),
cvar: Condvar::new(),
num_threads: n,
}
}
/// 阻塞当前线程,直到所有线程都在此处集合为止。
///
/// 所有线程集合一次后,屏障可以重新使用,并且可以连续使用。
///
/// 从该函数返回时,单个 (arbitrary) 线程将接收从 [`BarrierWaitResult::is_leader()`] 返回 `true` 的 [`BarrierWaitResult`],而所有其他线程将接收从 [`BarrierWaitResult::is_leader()`] 返回 `false` 的结果。
///
///
/// # Examples
///
/// ```
/// use std::sync::{Arc, Barrier};
/// use std::thread;
///
/// let mut handles = Vec::with_capacity(10);
/// let barrier = Arc::new(Barrier::new(10));
/// for _ in 0..10 {
/// let c = Arc::clone(&barrier);
/// // 相同的消息将一起打印。
/// // 您将看不到任何交错。
/// handles.push(thread::spawn(move|| {
/// println!("before wait");
/// c.wait();
/// println!("after wait");
/// }));
/// }
/// // 等待其他线程完成。
/// for handle in handles {
/// handle.join().unwrap();
/// }
/// ```
///
///
///
#[stable(feature = "rust1", since = "1.0.0")]
pub fn wait(&self) -> BarrierWaitResult {
let mut lock = self.lock.lock().unwrap();
let local_gen = lock.generation_id;
lock.count += 1;
if lock.count < self.num_threads {
// 我们需要一个 while 循环来防止虚假唤醒。
// https://en.wikipedia.org/wiki/Spurious_wakeup
while local_gen == lock.generation_id {
lock = self.cvar.wait(lock).unwrap();
}
BarrierWaitResult(false)
} else {
lock.count = 0;
lock.generation_id = lock.generation_id.wrapping_add(1);
self.cvar.notify_all();
BarrierWaitResult(true)
}
}
}
#[stable(feature = "std_debug", since = "1.16.0")]
impl fmt::Debug for BarrierWaitResult {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("BarrierWaitResult").field("is_leader", &self.is_leader()).finish()
}
}
impl BarrierWaitResult {
/// 如果此线程是调用 [`Barrier::wait()`] 的 "leader thread",则返回 `true`。
///
/// 只有一个线程从其结果返回 `true`,所有其他线程将返回 `false`。
///
///
/// # Examples
///
/// ```
/// use std::sync::Barrier;
///
/// let barrier = Barrier::new(1);
/// let barrier_wait_result = barrier.wait();
/// println!("{:?}", barrier_wait_result.is_leader());
/// ```
///
#[stable(feature = "rust1", since = "1.0.0")]
#[must_use]
pub fn is_leader(&self) -> bool {
self.0
}
}