1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
//! 基于 Mutex 和 Condvar 的 Parker 实现。

use crate::sync::atomic::AtomicUsize;
use crate::sync::atomic::Ordering::SeqCst;
use crate::sync::{Condvar, Mutex};
use crate::time::Duration;

const EMPTY: usize = 0;
const PARKED: usize = 1;
const NOTIFIED: usize = 2;

pub struct Parker {
    state: AtomicUsize,
    lock: Mutex<()>,
    cvar: Condvar,
}

impl Parker {
    pub fn new() -> Self {
        Parker { state: AtomicUsize::new(EMPTY), lock: Mutex::new(()), cvar: Condvar::new() }
    }

    // 此实现不需要 `unsafe`,但其他实现可能假定这仅由拥有 Parker 的线程调用。
    //
    pub unsafe fn park(&self) {
        // 如果我们之前收到通知,那么我们将消耗此通知并迅速返回。
        //
        if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
            return;
        }

        // 否则我们需要协调睡眠
        let mut m = self.lock.lock().unwrap();
        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
            Ok(_) => {}
            Err(NOTIFIED) => {
                // 即使我们知道它将是 `NOTIFIED`,也必须在这里读取。
                // 这是因为自从我们在上面的 `compare_exchange` 中读取 `NOTIFIED` 以来,可能已经再次调用 `unpark`。
                // 我们必须执行与该 `unpark` 同步的获取操作,以观察它在取消驻留之前进行的所有写操作。
                //
                // 为此,我们必须从对 `state` 的写入中读取。
                //
                let old = self.state.swap(EMPTY, SeqCst);
                assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
                return;
            } // 应该消费这个通知,因此禁止在下一个 park 中进行虚假唤醒。
            Err(_) => panic!("inconsistent park state"),
        }
        loop {
            m = self.cvar.wait(m).unwrap();
            match self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) {
                Ok(_) => return, // 收到通知
                Err(_) => {}     // 虚假唤醒,回去睡觉
            }
        }
    }

    // 此实现不需要 `unsafe`,但其他实现可能假定这仅由拥有 Parker 的线程调用。
    //
    pub unsafe fn park_timeout(&self, dur: Duration) {
        // 像上面的 `park` 一样,我们为已通知的线程提供了一条快速路径,然后我们开始协调睡眠。
        //
        // 快速返回。
        if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
            return;
        }
        let m = self.lock.lock().unwrap();
        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
            Ok(_) => {}
            Err(NOTIFIED) => {
                // 我们必须在这里再次读取,请参见 `park`。
                let old = self.state.swap(EMPTY, SeqCst);
                assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
                return;
            } // 应该消费这个通知,因此禁止在下一个 park 中进行虚假唤醒。
            Err(_) => panic!("inconsistent park_timeout state"),
        }

        // 等待超时,如果我们虚假地唤醒或以其他方式从通知中醒来,我们只是想无条件地将状态设置为空,既可以使用通知,也可以不标记自己为停止状态。
        //
        //
        //
        let (_m, _result) = self.cvar.wait_timeout(m, dur).unwrap();
        match self.state.swap(EMPTY, SeqCst) {
            NOTIFIED => {} // 收到通知,欢呼!
            PARKED => {}   // 没有通知,alas
            n => panic!("inconsistent park_timeout state: {}", n),
        }
    }

    pub fn unpark(&self) {
        // 为了确保未驻留线程可以观察到我们在此调用之前所做的任何写操作,我们必须执行 `park` 可以与之同步的释放操作。
        // 为此,即使 `state` 已经是 `NOTIFIED`,我们也必须编写 `NOTIFIED`。
        // 这就是为什么这必须是交换操作,而不是如果失败读取 `NOTIFIED` 时返回的比较交换。
        //
        //
        //
        match self.state.swap(NOTIFIED, SeqCst) {
            EMPTY => return,    // 没有人在等
            NOTIFIED => return, // 已经停止
            PARKED => {}        // 要去叫醒某人
            _ => panic!("inconsistent state in unpark"),
        }

        // 从暂停的线程将 `state` 设置为 `PARKED` (或在虚假唤醒的情况下最后检查的 `state`) 到它实际等待 `cvar` 之间存在一段时间。
        // 如果我们在此期间进行通知,它将被忽略,然后当驻留线程进入睡眠状态时,它将永远不会醒来。
        // 幸运的是,它在此阶段已锁定 `lock`,因此我们可以获取 `lock` 以等待它准备好接收通知。
        //
        // 在调用 `notify_one` 之前释放 `lock` 意味着,当驻留线程唤醒时,不必等待我们释放 `lock` 即可唤醒它。
        //
        //
        //
        //
        //
        //
        drop(self.lock.lock().unwrap());
        self.cvar.notify_one()
    }
}