rust-atomics-and-locks-p2~> 基础概念
<~~ 发表日期:2023-06-23 | 访问量:  | 本文词数:13354 | 预计阅读时间:67分钟 ~~>

本节我们将学习线程、互斥锁、条件变量、线程安全、共享和独占引用、内部可变性等内容

同系列传送门: rust-atomics-and-locks

前言

早在多核处理器普及之前, 操作系统就允许一台计算机同时运行许多程序
这是通过在进程间快速切换来实现的, 允许每个进程一个接一个地重复取得一点进展
如今, 几乎所有的计算机, 甚至我们的手机和手表都有多核处理器, 可以真正并行执行多个进程

操作系统尽可能将进程彼此隔离, 允许程序在完全不知道其他进程在做什么的情况下, 做自己的事情
例如, 一个进程在不询问操作系统的内核之前, 通常无法访问另一个进程的内存, 也无法以任何方式与之通信

但是, 程序可以生成额外的名为线程之物, 作为同一进程下的一部分
同一进程中的线程彼此之间不隔离, 线程共享内存, 并且可以通过该内存相互交互

本章将解释线程在 Rust 中是如何生成的, 以及围绕它们的所有基本概念, 例如如何在多个线程之间安全地共享数据
本章中解释的概念是本系列中其余部分的基础


Rust 中的线程

每个程序都只能从主线程开始, 即 main 函数所在的线程
该线程将执行您的 main 函数,并可以生成更多子线程

当主线程结束之后, 由其派生的子线程也将直接提前结束

在 Rust 中, 新线程是通过 std::thread::spawn 函数生成的, 它需要一个参数, 代表将执行的函数
此函数返回/结束后, 该线程将停止

来看个例子:

use std::thread;

fn main() {
    thread::spawn(f);
    thread::spawn(f);

    println!("Hello from the main thread.");
}

fn f() {
    println!("Hello from another thread!");

    let id = thread::current().id();
    println!("This is my thread id: {id:?}");
}

我们生成(spawn)了两个线程, 它们都将执行 f 作为其主要函数
这两个线程都将打印一条消息, 并显示其 线程ID, 而主线程也将打印自己的消息

Thread ID:
Rust 标准库为每个线程分配一个唯一的标识符, 此标识符可通过 Thread::id() 访问, 类型为 ThreadId
除了复制它们并检查相等性之外, 您对 ThreadId 无能为力, 不能保证这些 ID 将连续分配, 只是每个线程的 ID 会有所不同

如果您多次运行上面的示例程序, 您可能会注意到两次运行之间的输出不同, 这是我在一次特定运行期间在机器上获得的输出:

Hello from the main thread.
Hello from another thread!
This is my thread id:

令人惊讶的是, 部分输出似乎丢失了
这是因为, 在新生成的线程完成其传入的函数之前, 主线程就已经完成了 main 函数
从 main 返回将退出整个程序, 即使其他线程仍在运行

在该示例中, 在程序被主线程关闭前, 其中一个新生成的线程只发生了一半的消息
如果我们想确保, 线程在从 main 返回之前完成, 我们可以通过 join 来等待它们
为此, 我们要使用 spawn 函数返回的 JoinHandle:

fn main() {
    let t1 = thread::spawn(f);
    let t2 = thread::spawn(f);

    println!("Hello from the main thread.");

    t1.join().unwrap();
    t2.join().unwrap();
}

.join() 方法将等待线程完成执行, 并返回 std::thread::Result
若线程由于死机而未成功完成其功能, 这将包含死机消息, 我们可以尝试处理这种情况, 或者在加入恐慌线程时只调用 .unwrap() 以恐慌

运行此版本的代码将不再导致输出的截断:

Hello from the main thread.
Hello from another thread!
This is my thread id: ThreadId(3)
Hello from another thread!
This is my thread id: ThreadId(2)

运行之间仍然更改的是消息们的打印顺序, 因为是并发, 所以无法保证先后顺序

Output Locking:
println! 使用 std::io::Stdout::lock(), 来确保其输出不会因线程间的并发而交替
println!() 将等到任何并发运行的表达式完成, 然后再写入任何输出
如果不是这种情况, 我们可能会得到更多交错的混乱的输出:

Hello fromHello from another thread!
 another This is my threthreadHello fromthread id: ThreadId!
( the main thread.
2)This is my thread
id: ThreadId(3)

与其将函数传递给 std::thread::spawn, 不如将闭包传递给它, 这允许我们捕获值, 并将其移动到新线程中:

let numbers = vec![1, 2, 3];

thread::spawn(move || {
    for n in &numbers {
        println!("{n}");
    }
}).join().unwrap();

此处, numbers 的所有权转移到了新生成的线程中, 因为我们使用了 move 闭包
若我们未使用 move 关键字, 闭包将通过引用捕获 numbers, 这将导致编译错误, 因为新线程的寿命可能超过该变量

由于线程可能一直运行到程序执行结束, 因此 spawn 函数的参数类型具有 'static 约束
换句话说, 它只接受永远存在的函数, 而通过引用捕获局部变量的闭包, 闭包可能不会永远有效
因为, 当局部变量不复存在时,对该变量的引用自然变得无效, 闭包也因此变得无效

从线程中获取的返回值, 实际上就是传入的闭包的返回值, 它可以从 join 方法返回的 Result 中获取:

let numbers = Vec::from_iter(0..=1000);

let t = thread::spawn(move || {
    let len = numbers.len();
    let sum = numbers.iter().sum::<usize>();
    sum / len  // 1
});

let average = t.join().unwrap(); // 2

println!("average: {average}");

在这里, 闭包 (1) 返回的值通过 join 方法 (2) 的返回值获取
若 numbers 为空,则新线程会尝试除以零 (1), 这将导致 panic, 而 join 则会返回 Err, 导致主线程也因为 unwrap 而 panic

Thread Builder:*
std::thread::spawn() 实际上只是 std::thread::Builder::new().spawn().unwrap() 的一个简写
std::thread::Builder 允许您在生成新线程之前, 为其进行一些设置
您可以使用它来配置新线程的堆栈大小, 为新线程命名
通过 std::thread::current().name() 可以获取当前线程的名称, 这将在 panic 的消息中使用, 并将在大多数平台上的 debug 工具中可见

此外, Builderspawn 函数返回 std::io::Result, 允许您处理生成新线程失败的情况
比如操作系统内存不足,或者对您应用的资源限制(resource limit), 则可能会发生该情况
(若 spawn 函数无法生成新线程, 它只会产生 panic)


作用域线程

有一个问题就是, 默认的 spawn 会要求一个具有 'static 约束的闭包, 在编译器就确保了不会发生生命周期上的问题
但如果我们真的确定, 生成的线程肯定不会超过某个作用域, 那么该线程理论上就应该可以安全地借用局部变量, 只要它们比该作用域活得久

Rust 标准库提供了 std::thread::scope 函数, 来生成这样的作用域线程
它允许我们生成 "不超过我们传递给该函数的闭包范围" 的线程, 从而安全地借用局部变量

一个例子:

let numbers = vec![1, 2, 3];

thread::scope(|s| { // 1
    s.spawn(|| { // 2
        println!("length: {}", numbers.len());
    });
    s.spawn(|| { // 2
        for n in &numbers {
            println!("{n}");
        }
    });
}); // 3
  • 在 (1) 处: 我们调用 std::thread::scope 函数, 我们的闭包将在传入后立刻执行, 参数 s,代表 Scope
  • 在 (2) 处: 我们通过 s 参数, 生成了 作用域线程(Scoped Threads), 其传入的闭包可以借用局部变量, 如 numbers
  • 在 (3) 处: 我们生成的所有 Scoped-Thread, 若还没有 join, 将自动 join

它保证了作用域内生成的任何线程, 都不会超过作用域
正因如此, 这个 spawn 方法的参数类型上没有 'static 约束,允许我们引用任何内容,只要它比作用域长寿,例如 numbers

在上面的示例中, 两个新线程同时访问 numbers, 这很好, 因为所有都不会修改它
若我们要将第一个线程更改为修改 numbers, 编译器将不允许我们生成另一个也修改 numbers 的线程:

let mut numbers = vec![1, 2, 3];

thread::scope(|s| {
    s.spawn(|| {
        numbers.push(1);
    });
    s.spawn(|| {
        numbers.push(2); // Error!
    });
});

确切的错误消息取决于 Rust 编译器的版本, 因为它通常会得到改进, 以产生更好的报错,目前的报错信息如下:

error[E0499]: cannot borrow `numbers` as mutable more than once at a time
 --> example.rs:7:13
4 |     s.spawn(|| {
  |             -- first mutable borrow occurs here
5 |         numbers.push(1);
  |         ------- first borrow occurs due to use of `numbers` in closure

7 |     s.spawn(|| {
  |             ^^ second mutable borrow occurs here
8 |         numbers.push(2);
  |         ------- second borrow occurs due to use of `numbers` in closure

译者注:
为了更好地理解, 你可以观察下 std::thread::scope, 会发现其 lifetime 的关系如下:

pub fn scope<'env, F, T>(f: F) -> T
where
    F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> T,

根据文档所述, 'scope 表示作用域本身的 lfetime, 'env 表示作用域内线程借用的任何变量的 lifetime

'scope 的 lifetime 在 scope 函数开始之后开始,位于 f (传入的闭包参数) 之前, 其 lifetime 在 f 结束之后结束
该 lifetime 结束后, 所有生成的作用域线程都将被自动 join

关系如下:

┌──fn_scope(f: F)────────────────────┐
│                                    │
│  'scope                            │
│     |                              │
│     |  'f  // is your closure      │
│     |   |                          │
│     |   |                          │
│     |   |                          │
│     |   |                          │
│     |  'f                          │
│     |                              │
│     | // join scoped threads here  │
│     |                              │
│  'scope                            │
│                                    │
└────────────────────────────────────┘

而 'env 的约束是 'env: 'scope, 表示了作用域线程借用的任何变量的 lifetime 一定得小于作用域本身的 lifetime

多亏了作用域线程, 我们现在能在参数为 &self 的函数中, 轻松地将其传入线程
而不再需要导入诸如 rayon 这样的外部库, 或使用丑陋的 Arc 进行包装的同时损失了性能

The Leakpocalypse:
在 Rust 1.0 之前, 标准库有个名为 std::thread::scoped 的函数, 它会直接生成一个线程, 如同 std::thread::spawn
它允许非 'static 的闭包, 因为它返回的不是 JoinHandle, 而是个在 drop 时会 join 线程的 JoinGuard
任何借用的数据只需要比这 JoinGuard 存在更长的时间, 这似乎是安全的, 只要 JoinGuard 在某个时候被丢弃

但就在 Rust 1.0 发布之前, 很多事情慢慢变得清楚, 你不能保证某些东西一定会被丢弃, 这有很多办法
例如创建引用计数节点的循环, 或者 leak 它, 这都不会 drop 它

最终, 在一些人所说的 "泄漏启示录(Leakpocalypse)" 中, 得出的结论是:
安全接口的设计, 不能依赖于对象在其生命周期结束时总是会被丢弃的假设
泄漏对象可能会合理地导致泄漏更多对象(例如,泄漏 Vec 也会泄漏其元素), 但可能不会导致未定义的行为

由于这个结论, std::thread::scoped 不再被认为是安全的, 并从标准库中删除
此外, std::mem::forget 从 unsafe 变成了 safec, 以强调 forget/leak 始终是可能的

老生常谈的话: Rust 并不会阻止 内存泄漏, 内存泄漏 不归到 内存安全 的范畴里

直到很久以后, 在 Rust 1.63 中, 添加了一个新的 std::thread::scope 函数, 其中包含一个不依赖 Drop 来表示正确性的新设计


共享所有权

到目前为止, 我们已经研究了使用 move 闭包, 将值的所有权转移到线程, 并从寿命更长的父线程借用数据
当在两个线程之间共享数据时, 当两个线程都不能保证比另一个线程活得久, 则它们都不能成为该数据的所有者
它们之间共享的任何数据的 lifetime, 都需要与活的最长的线程一样长或更长

Static

有几种方法可以创建不属于单个线程的内容, 其中最简单的是使用 static 关键字
它由整个程序 “拥有”, 而不是单个线程

在以下示例中, 两个线程都可以访问 X , 但它们都不拥有它:

static X: [i32; 3] = [1, 2, 3];

thread::spawn(|| dbg!(&X));
thread::spawn(|| dbg!(&X));

一个 static 项, 其以 const 字面量作为初始值, 永远不会 drop
且在 main 函数启动之前, 它就已经存在, 每个线程都能借用它, 毕竟它保证永远存在

Leak

共享所有权的另一种方法是leak, 使用 Box::leak
它可以释放 Box 的所有权, 承诺永远不会 drop 它, 从那时起, Box 将永远存在, 没有所有者, 只要程序运行, 它就可以被任何线程借用

use std::thread;

fn main() {
    let x: &'static [i32; 3] = Box::leak(Box::new([1, 2, 3]));

    thread::spawn(move || dbg!(x));
    thread::spawn(move || dbg!(x));
}

可能看上去, 在 move 闭包, 我们将所有权转移到了线程中
但仔细观察 x 的类型, 你会发现, 我们只是为线程提供了对数据的引用, 而不是数据本身的所有权

注意:

  • 独享引用(&T) 实现了 Copy 这个 trait, 这意味着当您 move 它时, 原始值仍然存在, 就像整数或布尔值一样
  • 'static 生命周期并不意味着 "该值自程序开始以来一直存在", 而只是 "它一直存在到程序结束", 过去的经历并不在乎

泄漏 Box 的缺点是, 我们正在泄漏内存, 我们分配一些东西, 但从不丢弃和解除分配它
如果这只发生有限的少量次数, 可能会达成很棒的效果, 但如果我们继续, 程序将慢慢耗尽内存

引用计数

为了确保共享的数据被 drop , 使得其内存被释放, 我们不能完全放弃其所有权
相反, 我们可以分享所有权, 通过跟踪所有者的数量, 我们可以确保仅在没有所有者时才丢弃该值

Rust 标准库通过 std::rc::Rc 类型提供此功能, 这是 "引用计数(References Counting)" 的缩写
它与 Box 非常相似, 除了 clone 它时并不会真的 clone 其包含的值, 而只是增加计数器的值
原来的 Rc 与克隆的 Rc, 都将引用相同的分配, 因此, 它们可以共享所有权

use std::rc::Rc;

let a = Rc::new([1, 2, 3]);
let b = a.clone();

assert_eq!(a.as_ptr(), b.as_ptr()); // Same allocation!

drop 一个 Rc 将减少计数器, 当最后一个 Rc 被 drop 时(计数器将降至零), 将 drop 其包含的值并释放内存

但是, 当我们尝试将 Rc 发送到另一个线程, 我们将遇到以下编译器错误:

error[E0277]: `Rc` cannot be sent between threads safely
    |
8   |     thread::spawn(move || dbg!(b));
    |                   ^^^^^^^^^^^^^^^

事实证明, Rc 不是线程安全的, 因为它没有实现 Send 这个 tait
未实现 Send 的类型, 表示其实现可能并没有考虑多线程, 导致不能将其 move 进其他线程
因此如果将 Rc 分配给多个线程, 则它们可能会尝试同时修改引用计数器, 这可能会产生不可预知的结果

但编译器阻止了我们

相反, 我们可以使用 std::sync::Arc, 它代表“原子引用计数(Atomic References Counting)”
它与 Rc 相同, 只是它保证对引用计数器的修改是不可分割的原子操作, 因此可以安全地将其用于多个线程

use std::sync::Arc;

let a = Arc::new([1, 2, 3]); // 1
let b = a.clone(); // 2

thread::spawn(move || dbg!(a)); // 3
thread::spawn(move || dbg!(b)); // 3

在上述代码中:

  • 在 (1) 处: 我们将数组放进了一个 Arc 中进行包装, 其引用计数器从 1 开始
  • 在 (2) 处: 克隆了 Arc, 使得引用计数增加到 2, 并为我们提供另一个 Arc, 指向相同的资源
  • 在 (3) 处: 两个线程都获得自己的 Arc, 可以访问共享数组, 两者都在 drop 使得计数器减 1
  • 在 (3) 处: 最后一个 dropArc 的线程, 使得计数器减小到 0, 此时将会 drop 被包装的数组, 释放资源

Naming Clones:
必须给 Arc 的每个克隆一个不同的名称, 会很快使代码变得非常混乱且难以理解
虽然 Arc 的每个克隆都是单独的对象, 但每个克隆都表示相同的共享值, 我们可以通过 Shadow 语法来处理

Rust 允许(并鼓励)你通过定义一个同名的新变量来隐藏变量, 如果在同一作用域内执行此操作, 则无法再使用原始变量
但是通过打开一个新作用域, 可以使用像 let a = a.clone(); 这样的语句, 在该作用域内重用名称, 同时将原始变量保留在作用域之外

通过将闭包包装在新作用域(使用 {})中, 我们可以在将变量移动到闭包之前克隆变量, 而无需重命名它们:

let a = Arc::new([1, 2, 3]);
let b = a.clone();

thread::spawn(move || {
    dbg!(b);
});

dbg!(a);
let a = Arc::new([1, 2, 3]);

thread::spawn({
    let a = a.clone();
    move || {
        dbg!(a);
    }
});

dbg!(a);

由于所有权是共享的, 因此引用计数指针 (Rc<T>Arc<T>) 与共享引用 (&T) 具有相同限制
它们不允许您对其包含的值进行可变访问, 因为该值可能同时被其他代码借用
例如, 当我们尝试对 Arc<[i32]> 中的整数切片进行排序, 编译器会阻止我们, 告诉我们不允许改变数据:

error[E0596]: cannot borrow data in an `Arc` as mutable
  |
6 |     a.sort();
  |     ^^^^^^^^

借用与数据竞争

在 Rust 中, 可以通过两种方式借用值:

  • 不可变借用(Immutable borrowing):
    通过 & 借用, 这得到了一个不可变引用(immutable reference), 可以复制这样的引用
    对被引用数据的访问, 在此类引用的所有副本之间共享, 编译器通常不允许通过这样的引用修改内容, 因为会影响借用了相同数据的其他代码

  • 可变借用(Mutable borrowing):
    通过 &mut 借用, 这得到了一个可变引用(mutable reference), 可变借用保证它是该数据唯一的有效借用(active borrow)
    因此, 这可以确保, 改变数据的内容不会影响到其他代码

这两个概念在一起, 完全防止了数据竞争(data races), 即一个线程正在改变数据, 而另一个线程同时访问数据的情况
数据竞争通常是未定义的行为(undefined behavior), 这意味着编译器不需考虑这些情况, 因为它只会假设这些情况不会发生

为阐明这到底意味着什么, 让我们看一个编译器可以使用借用规则做出有用假设的示例:

fn f(a: &i32, b: &mut i32) {
    let before = *a;
    *b += 1;
    let after = *a;
    if before != after {
        x(); // never happens
    }
}

在这里, 我们得到一个对整数的不可变引用, 并在递增 b 所引用的整数前后, 存储了整数的值
编译器可以自由地假设有关 "借用和数据竞争的基本规则" 得到维护, 这意味着 b 不可能像 a 一样引用相同的整数
事实上, 只要 a 还存在, 整个程序中没有任何东西可以可变地借用 a 所指的整数
因此, 编译器可以很容易地得出结论, *a 不会改变, before 永远等于 after
因此 if 语句永远无法成立, 可以从程序中完全删除对 x 的调用, 进行优化

编写一个打破编译器假设的 Rust 程序是不可能的, 除非使用 unsafe 块来禁用编译器的某些安全检查

Undefined Behavior(未定义的行为):
像 C, C++, Rust 这样的语言有一套规则, 需要遵循这些规则, 以避免所谓的未定义行为(ub)
例如, Rust 的规则之一是: 对任何对象都不能有多个可变引用

在 Rust 中, 只有在使用 unsafe 代码时才能违反这些规则中的任何一个
unsafe 在 Rust 中并不意味着代码不正确或不安全, 而是编译器没为您验证代码是否安全
如果代码确实违反了这些规则,则称为 unsound(不健全), 因此, 你可以将 unsafe, 等价于 trust_me

编译器会假设这些规则永远不会被破坏, 当被破坏时, 会导致一种被称为未定义行为的情况发生, 我们需要不惜一切代价避免这种情况
因为, 若我们允许编译器做出一个实际上不正确的假设, 它很容易导致代码不同部分的更多错误结论, 从而影响整个程序

作为一个例子, 让我们看一下在切片上使用 get_unchecked 方法的代码实例:

let a = [123, 456, 789];
let b = unsafe { a.get_unchecked(index) };

get_unchecked 方法, 让我们得到了指定索引位置的切片元素, 如同 a[index] 一样
但它允许编译器假设索引始终在边界内, 而无需进行任何边界检查

这意味着, 在此代码段中, 由于 a 的长度为 3, 因此编译器可能会假定 index 小于 3, 我们有责任确保其假设成立
如果我们打破这个假设, 例如, 如果我们在 index 等于 3 的情况下运行这个假设, 任何事情都可能发生
这可能会导致, 从内存中读取到了存储在 a 之后的字节中的任何内容, 程序可能会因此崩溃
比如, 它最终可能会执行你程序中一些完全不相关的部分, 造成各种破坏

令人惊讶的是, 未定义行为甚至可以 “影响过去”, 导致它之前的代码出现问题
为了理解这是如何发生的, 假设在先前代码的前面有个 match 语句, 如下所示:

match index {
   0 => x(),
   1 => y(),
   _ => z(index),
}

let a = [123, 456, 789];
let b = unsafe { a.get_unchecked(index) };

由于 unsafe 块, 编译器可以假定 index 只为 0、1 或 2
从逻辑上讲, 我们可以得出结论, 我们的 match 语句的最后一个分支只会与 2 匹配
因此 z(index) 只可能是 z(2), 该结论不仅可以用于优化 match, 还可优化 z 本身, 这可能包括删除代码中未使用的部分

如果我们让 index 为 3 时执行此操作, 我们的程序可能会尝试执行已优化的部分
这从而导致了完全不可预测的行为, 早于在我们到达最后一行的 unsafe 块之前
就像这样, 未定义行为可能以一种非常意想不到的方式, 向后和向前传播与污染整个程序

调用任何 unsafe 函数前, 请仔细阅读文档, 并确保完全了解其安全要求
作为调用者, 您需要秉承 "避免未定义行为" 的准则


内部可变性

上一节中介绍的借用规则很简单, 但可能非常有限, 尤其是在涉及多个线程时
遵循这些规则, 将使得线程间的通信被严重限制, 甚至会变得几乎不可能, 因为多线程间可访问的数据不能突变
幸运的是, 有个逃生舱口: 内部可变性(interior mutability)
具有内部可变性的数据类型, 会稍微改变其借用规则, 在某些情况下, 这些类型可通过 "不可变引用" 而允许突变

在先前讲述引用计数时, 我们已看到了个涉及内部可变性的微妙例子:
RcArc 被 clone 与 drop 时, 会改变引用计数器的数值, 即使多个克隆都使用着相同的引用计数器

一旦涉及内部可变类型, 将引用称为 "不可变" 或 "可变" 会令人困惑和不准确, 因为两者都能够突变数据
更准确的术语是 "共享" 和 "独占":
共享引用(&T)可以复制并与他人共享, 而独占引用(&mut T)保证了它是该 T 的唯一独占借用
对于大多数类型, 共享引用不允许突变, 但也有例外
由于在本系列中, 我们将主要处理这些例外, 因此我们将在本系列的其余部分使用更准确的术语

注意:
请记住, 内部可变性只会改变共享借用的规则, 以允许共享时的突变, 而不会改变任何关于独占借用的行为
独占借用仍然保证没有其他独享的有效借用(active borrow)
unsafe 中对某物的多个有效的独占引用, 将始终导致未定义行为, 不会考虑内部可变性

Cell

std::cell::Cell<T> 只是个包裹了 T 类型数据的类型, 但允许通过共享引用进行突变
为避免未定义行为, 它只允许您复制值(T 必须实现 Copy), 或用另一个值替换已被包裹的值
此外, 它只能在单个线程中使用

让我们看个类似上一节中的示例, 但这次使用 Cell<i32> 而不是 i32:

use std::cell::Cell;

fn f(a: &Cell<i32>, b: &Cell<i32>) {
    let before = a.get();
    b.set(b.get() + 1);
    let after = a.get();
    if before != after {
        x(); // might happen
    }
}

与上次不同, 现在 if 条件可能为真, 因为 Cell<i32> 具有内部可变性, 所以只要我们有对它的共享引用, 编译器就不能再假设它的值不会改变
a 和 b 可能都引用相同的值,因此通过 b 对数据突变, 也可能影响到 a
但是, 它仍假定没有其他线程同时访问 Cell 类型的 a 与 b

对 Cell 的限制并不总是那么容易处理, 因为它不能直接让我们借用它所持有的值
我们要将其值移出, 修改它后再放回去, 以改变它的内容:

fn f(v: &Cell<Vec<i32>>) {
    let mut v2 = v.take(); // Replaces the contents of the Cell with an empty Vec
    v2.push(1);
    v.set(v2); // Put the modified Vec back
}

RefCell

与常规 Cell 不同, std::cell::RefCell 允许您以较小的运行时成本借用其值
RefCell<T> 不仅持有 T, 还持有引用计数器, 用于在运行时, 跟踪对被包裹数据的借用的情况
当你试图以不可变地借用它, 而它已经被可变借用了(反之亦然), 它会 panic, 从而避免未定义的行为
Cell 一样, RefCell 只能在单个线程中使用

借用 RefCell 包裹的值, 是通过调用 borrowborrow_mut 来完成的:

use std::cell::RefCell;

fn f(v: &RefCell<Vec<i32>>) {
    v.borrow_mut().push(1); // We can modify the `Vec` directly.
}

译者注:
Cell/RefCell, 简单来说, 属于在编译时欺骗了编译器, 将借用规则从编译器挪动到运行期, 以便于放宽要求
但借用规则还是满足的, 只是晚了一个阶段而已, 如果违背了借用规则, 依旧会报错

虽然 CellRefCell 非常有用, 但当我们处于多线程并发的环境时, 它们变得没啥用
因此, 让我们转到与并发相关的类型

Mutex/RwLock

RwLock (reader-writer lock), 是 RefCell 的并发版本
一个 RwLock<T>, 持有 T 并跟踪借用的情况
但与 RefCell 不同的是, 它不会在借用冲突时 panic, 而是阻塞当前线程(使其进入睡眠状态), 同时等待冲突的借用消失
我们只需耐心地等待其他线程完成, 然后处理数据即可

借用被 RwLock 包裹的值, 称为锁定(lcok), 通过锁定它, 我们可以暂时阻止并发的冲突借用, 以允许我们借用它而不会引起数据竞争
Mutex 非常相似, 但在概念上稍微简单些, 它不像 RwLock 那样跟踪共享借用和独占借用的数量, 而只允许独占借用

我们将在本节之后, 更加详细地介绍这些类型

Atomic

Atomic(原子), 该类型是 Cell 的并发版本, 是之后第 2 章和第 3 章的主题
Cell 一样, 它们通过将值作为一个整体, 复制进来和传出来, 以避免未定义行为, 而不让我们直接借用内容

但与 Cell 不同, 它们不能是任意大小
因此没有通用的 Atomic<T> 类型, 而只有特定的原子类型, 例如 AtomicU32AtomicPtr<T>
哪些可用取决于平台, 因为它们需要处理器的支持以避免数据竞争(我们将在之后的第 7 章中深入探讨)

由于它们的大小非常有限, 因此原子通常不直接包含需要在线程之间共享的信息
相反, 它们通常被用作一种工具, 以便在线程之间共享其他(通常是更大的)数据, 此时事情可能会变得非常复杂

UnsafeCell

UnsafeCell 是内部可变性的原始的构建块
UnsafeCell<T> 包装 T, 但不附带任何条件或限制以避免未定义行为
相反, 它的 get() 方法只是给出一个包装值的原始指针, 该值只能在 unsafe 块中使用
它让用户以不会导致任何未定义行为的方式使用它

最常见的是, UnsafeCell 不直接使用, 而是包装在另一类型中, 该类型通过有限的接口(例如 CellMutex)提供安全性
所有具有内部可变性的类型(包括上面讨论的所有类型)都建立在 UnsafeCell 之上


线程安全

在本章中, 我们看到了几种非线程安全的类型, 这些类型只能在单个线程上使用, 例如 Rc. Cell
由于需要限制以避免未定义行为, 因此编译器需要去理解并检查, 以便于知晓使用这些类型时无需使用 unsafe 块

Rust 语言使用两个特殊的 Trait 来跟踪哪些类型可以跨线程安全使用:

  • Send:
    如果一个类型可以被发送到另一个线程, 则该类型为 Send, 即该类型的值的所有权, 可以转移到另一个线程
    例如, Arc<i32> 实现了 Send, 但 Rc<i32> 没有

  • Sync:
    如果一个类型可以与其他线程共享, 则类型为 Sync, 即当且仅当 &T 是 Send 时, T 实现了 Sync
    例如, i32 是 Sync, 因为 &i32 是 Send, 但 Cell<i32> 不是 (不过 Cell<i32> 是 Send)

所有基础类型, 如 i32, bool, str, 都是 Send 和 Sync

这两个 Trait 都是 auto 的, 这意味着它们会根据类型的字段而自动实现
比如, 当字段均为 Send 和 Sync 时, 该 struct 本身也是 Send 和 Sync

选择主动避免实现其中任何一个 Trait 的方法, 是向类型添加一个未实现相关 Trait 的字段
为此, 特殊的 std::marker::PhantomData<T> 类型通常会派上用场, 编译器将该类型视为 T. 但它在运行时实际上并不存在
这是一个零大小的类型, 不占用空间

我们来看看下面的 struct:

use std::marker::PhantomData;

struct X {
    handle: i32,
    _not_sync: PhantomData<Cell<()>>,
}

在此示例中, 若 handle 是其唯一字段, 则 X 将是 Send 和 Sync
但是, 我们添加了一个大小为零的 PhantomData<Cell<()>> 字段, 该字段被视为 Cell<()>
因为 Cell<()> 不是 Sync, 所以 X 也不是, 但它仍然是 Send, 因为它的所有字段都实现了 Send

原始指针 (*const T 和 *mut T) 既不是 Send 也不是 Sync, 因为编译器对它们表示的数据知之甚少

选择手动实现其中任何一个 Trait , 都需要用 unsafe 进行标明:

struct X {
    p: *mut i32,
}

unsafe impl Send for X {}
unsafe impl Sync for X {}

实现这些特征之所以需要 unsafe 关键字, 是因为编译器无法为您检查它是否正确, 这是你对编译器的承诺, 它只需信任你

如果您尝试将某些内容移动到另一个非 Send 的线程中, 编译器会礼貌地阻止您这样做, 下面是个小示例:

fn main() {
    let a = Rc::new(123);
    thread::spawn(move || { // Error!
        dbg!(a);
    });
}

在这里, 我们尝试将 Rc<i32> 发送到新线程, 但 Rc<i32>Arc<i32> 不同, 它没实现 Send
如果我们尝试编译上面的示例,我们将面临如下所示的错误:

error[E0277]: `Rc<i32>` cannot be sent between threads safely
   --> src/main.rs:3:5
    |
3   |     thread::spawn(move || {
    |     ^^^^^^^^^^^^^ `Rc<i32>` cannot be sent between threads safely
    |
    = help: within `[closure]`, the trait `Send` is not implemented for `Rc<i32>`
note: required because it's used within this closure
   --> src/main.rs:3:19
    |
3   |     thread::spawn(move || {
    |                   ^^^^^^^
note: required by a bound in `spawn`

thread::spawn 函数要求其参数为 Send, 如果闭包的所有捕获都为 Send, 则闭包也为 Send
如果我们试图捕捉不是 Send 的东西, 那么我们的错误就会被抓住, 编译器会保护我们免受未定义行为的影响


互斥锁

在线程间共享不可变或可变的数据, 最常用的工具是 Mutex, 它是 互斥(mutual exclusion) 的缩写
互斥锁的工作是, 通过暂时阻塞尝试同时访问某些数据的其他线程, 来确保线程对某些数据具有独占访问权限

从概念上讲, 互斥锁只有两种状态: 锁定和解锁(lock and unlock)

当线程锁定了未锁定的互斥锁时, 互斥锁将标记为锁定, 线程可以立即继续
当线程尝试锁定已锁定的互斥锁时, 该操作将阻塞, 线程在等待互斥锁解锁时进入睡眠状态

解锁只能在锁定的互斥锁上进行, 并且应该由锁定它的同一线程完成
如果其他线程正在等待锁定互斥锁, 则解锁互斥锁将导致其中一个线程被唤醒, 被唤醒的线程可以尝试再次锁定互斥锁
该过程可以一直重复

使用互斥锁保护数据, 只是所有线程间的协议, 即它们仅在锁定互斥锁时访问数据
这样, 没有两个线程可以同时访问该数据, 避免了数据竞争

Rust's Mutex

Rust 标准库通过 std::sync::Mutex<T> 提供此功能
类型 T 是互斥锁保护的数据类型, 通过将 T 类型的数据作为互斥锁的一部分, 数据将只能通过互斥锁访问
这允许了一个安全的接口, 可以保证所有线程都将遵守协议

为了确保锁定的互斥锁只能由锁定它的线程解锁, 它没有 unlock() 方法
相反, 它的 lock() 方法返回一个名为 MutexGuard 的特殊类型, 表示我们已锁定互斥锁的保证
它的行为类似于一个通过 DerefMut 获取的独占引用, 使我们能独占地访问互斥锁保护的数据
解锁互斥锁是通过 drop 这个 MutexGuard 类型的变量来完成的, 当我们放下 guard 时, 我们放弃了访问数据的能力, guard 的 Drop 实现将解锁互斥锁

让我们看一个例子, 看看互斥锁在实践中的运用:

use std::sync::Mutex;

fn main() {
    let n = Mutex::new(0);
    thread::scope(|s| {
        for _ in 0..10 {
            s.spawn(|| {
                let mut guard = n.lock().unwrap();
                for _ in 0..100 {
                    *guard += 1;
                }
            });
        }
    });
    assert_eq!(n.into_inner().unwrap(), 1000);
}

在这里, 我们有一个 Mutex<i32>, 一个保护整数的互斥锁, 我们生成十个线程, 每个线程将整数递增一百次
每个线程将首先锁定互斥锁, 以获取 MutexGuard, 然后使用该 guard 访问整数并对其进行修改
当 guard 超出作用域后, 其将立即被隐式地 drop

线程完成后, 我们可以安全地从通过 into_inner() 方法以删除互斥保护
into_inner 方法获取了互斥锁的所有权, 这保证了没有其他任何东西可以再引用互斥锁, 因此不需要锁定

即使增量以 1 为步长发生, 但观察整数的线程也只能看到 100 的倍数, 因为它只能在互斥锁解锁时查看整数
实际上, 由于互斥锁, 一百个增量现在是一个不可分割的操作, 这样不可分割的操作也称为原子操作

为了清楚地看到互斥锁的效果, 我们可以让每个线程在解锁互斥锁之前等待一秒钟:

use std::time::Duration;

fn main() {
    let n = Mutex::new(0);
    thread::scope(|s| {
        for _ in 0..10 {
            s.spawn(|| {
                let mut guard = n.lock().unwrap();
                for _ in 0..100 {
                    *guard += 1;
                }
                thread::sleep(Duration::from_secs(1)); // New!
            });
        }
    });
    assert_eq!(n.into_inner().unwrap(), 1000);
}

当您现在运行该程序时, 您将看到大约需要 10 秒才能完成, 每个线程只等待一秒钟, 但互斥锁会确保一次只有一个线程可以这样做

如果我们在 sleep 前, 就 drop 掉 guard, 从而解锁互斥锁, 我们将看到它并行发生:

fn main() {
    let n = Mutex::new(0);
    thread::scope(|s| {
        for _ in 0..10 {
            s.spawn(|| {
                let mut guard = n.lock().unwrap();
                for _ in 0..100 {
                    *guard += 1;
                }
                drop(guard); // New: drop the guard before sleeping!
                thread::sleep(Duration::from_secs(1));
            });
        }
    });
    assert_eq!(n.into_inner().unwrap(), 1000);
}

通过此更改, 该程序只需大约一秒钟, 因为现在 10 个线程可以同时执行一秒钟的睡眠, 这表明尽可能缩短互斥锁锁定时间的重要性
当互斥锁锁定的时间超过必要的时间, 可能会完全抵消并行性的任何好处, 从而有效地强制所有内容按顺序发生

锁中毒

上面示例中的 unwrap() 调用与锁中毒(Lock Poisoning)有关

当线程在 lock 锁时发生 panic, Mutex 会被标记为中毒(Poisoned)
发生这种情况时, Mutex 将不再被锁定, 但调用其 lock 方法将导致得到一个 Err, 表示它已经中毒

这是一种 "防止受互斥锁保护的数据处于不一致状态" 的机制
在上面的示例中, 若线程在将整数递增不到 100 次后出现 panic, 则互斥锁将解锁
此时整数将处于意外状态, 不再是 100 的倍数, 这可能会破坏其他线程的假设
在这种情况下, 自动将互斥锁标记为中毒, 会强制用户处理这种可能性

在中毒的互斥锁上调用 lock() 仍会锁定互斥锁
lock() 返回的 Err 包含 MutexGuard, 允许我们在必要时更正不一致的状态

虽然锁中毒似乎是种强大的机制, 但在实践中并不经常从潜在的不一致状态中恢复
大多数代码要么忽略锁中毒, 要么在锁中毒时使用 unwrap() 进行 panic, 从而有效地将 panic 传播给互斥锁的所有用户

Lifetime of the MutexGuard:
虽然隐式地 drop 掉 guard 变量, 可以方便地解锁互斥锁, 但有时会导致微妙的意外
如果我们用 let 语句给 guard 分配一个名称(如上面的例子), 将会很直接地知晓它何时 drop, 因为局部变量在其作用域的末尾会自动被 drop
尽管如此, 不显式 drop 掉 guard, 可能会导致将互斥锁锁定的时间不必要的延长, 如上面的示例所示

使用无名称的 guard 也是可能的, 有时甚至非常方便
由于 MutexGuard 的行为, 类似于对受保护数据的独占引用, 因此我们可以直接使用它, 而无需先为 guard 分配名称
例如, 如果您有 Mutex<Vec<i32>>, 则可在单个语句中, 锁定互斥锁, 进行 push, 最后再次解锁互斥锁(drop):

list.lock().unwrap().push(1);

这是因为, 在表达式中生成的任何临时变量(例如通过 lock() 返回的守卫)都将在语句末尾删除
虽然这看起来是显而易见且合理的, 但它会导致一个常见的陷阱, 通常涉及 match, if let, while let 语句
下面是遇到此陷阱的示例:

if let Some(item) = list.lock().unwrap().pop() {
    process_item(item);
}

这段代码的意图, 是锁定 list, 随后 pop 一个元素并解锁 list, 最后处理该元素, 但我们在这犯了个微妙但重要的错误
临时生成 guard, 将会直到整个 if let 语句结束才会 drop, 这意味着我们在处理项目时, 保持了不必要的锁定

令人惊讶的是, 对于类似的 if 语句, 这种情况不会发生:

if list.lock().unwrap().pop() == Some(1) {
    do_something();
}

在这里, 不像先前的 if let 语句, 临时的 guard 确实在执行 if 语句的主体前就被 drop 了
原因是: 常规 if 语句的条件始终是普通 bool 值, 不会借用任何东西
因此, 没有理由将临时变量的 lifetime 从条件延长到语句结束

但是, 对于 if let 语句, 情况可能并非如此
例如, 如果我们使用 front() 而不是 pop(), 则 item 将从列表中借用, 因此有必要保持警惕

因为使用 if let 语句时可能会产生借用
于是在此情况下, 被创建的临时变量的生命周期, 被规定将会被延长到 if let 的主体结束
因此就算当我们使用 pop() 而非 front() 时, 也会发生同样的情况, 即使这不是必需的
故 guard 不会在第一时间被 drop 掉

我们可以通过将 pop 操作移动到单独的 let 语句, 来避免这种情况
因此, 在该语句的末尾, 在 if let 之前, 被临时创建的 guard 将会被 drop:

let item = list.lock().unwrap().pop();
if let Some(item) = item {
    process_item(item);
}

RwLock

互斥锁只涉及独占访问, MutexGuard 将为我们提供对受保护数据的独占引用(&mut T)
即使我们只想查看数据, 并使用已经足够满足需求的共享引用(&T)

读写锁(RwLock)是互斥锁的一个稍微复杂点的版本, 它了解独占访问和共享访问之间的区别, 并且可以提供两者之一
它有三种状态: 解锁, 由单个写入器(writer)锁定(用于独占访问), 被任意数量的读取器(reader)锁定(用于共享访问)
它通常用于经常由多个线程读取但仅偶尔更新一次的数据

Rust 标准库通过 std::sync::RwLock<T> 类型提供此锁
它的工作方式与 Mutex 类似, 只是它的接口主要分为两部分
它不是单个 lock() 方法, 而是具有用于锁定为 writer 或 reader 的 read()write() 方法
它有两种 guard 类型, 一种用于 reader, 一种用于writer: RwLockReadGuardRwLockWriteGuard
两者都实现了 Deref 以表现为对受保护数据的引用, 前者的表现类似于共享引用, 而后者的表现类似于独占引用

它实际上是 RefCell 的多线程版本, 动态跟踪引用的数量, 以确保遵守借用规则

Mutex<T>RwLock<T> 都要求 T 为 Send, 因为它们可用于将 T 发送到另一个线程
RwLock<T> 还需要 T 来实现 Sync, 因为它允许多个线程保存对受保护数据的共享引用(&T)
严格来说, 您可以为不满足这些要求的 T 创建一个锁, 但您将无法在线程之间共享它, 因为锁本身不会实现 Sync

Rust 标准库只提供一种通用的 RwLock 类型, 但其实现取决于操作系统, 其实现之间存在许多细微差异
当有 writer 等待时, 大多数实现都会选择阻塞新的 reader, 即使锁已经 read-locked 也是如此
这是为了防止 writer starvation(写饿死/写入器匮乏), 即一种 "readers 过多导致锁一直无法被解锁, 导致 writer 无法更新数据" 的情况
(毕竟当一个 writer 等待时, 如果一直有新的 reader 加入, 则 writer 可能将因此永远没有机会写入了)

Mutexes in Other Languages:
Rust 标准库中的 MutexRwLock 类型, 看起来与 C 或 C++ 等其他语言中的实现略有不同
最大的区别是 Rust 的 Mutex<T> 包含它正在保护的数据

例如, 在 C++ 中, std::mutex 不包含它保护的数据, 甚至不知道它在保护什么
这意味着, 用户有责任记住哪些数据受到保护, 以及由哪个互斥锁保护, 并确保每次访问 "受保护数据" 时, 都锁定正确的互斥锁
在阅读涉及其他语言互斥体的代码时, 或者与不熟悉 Rust 的程序员交流时, 记住这一点很有用
一个 Rust 程序员可能会谈论 "互斥锁中的数据", 或者 "将其包装在互斥锁中" 之类的话, 这可能会让那些只熟悉其他语言互斥锁的人感到困惑

如果您确实需要一个不包含任何数据的独立互斥锁, 例如保护某些外部硬件, 则可使用 Mutex<()>
但即使在这种情况下, 您最好定义一个(可能是零大小的)类型来与该硬件接口, 并将其包装在 Mutex
这样, 您仍然被迫锁定互斥锁, 然后才能与硬件交互


线程等待

当数据被多个线程改变时, 它们可能需要进行等待, 等到某些条件变为真再继续执行
例如, 如果我们有一个保护 Vec 的互斥锁, 我们可能需要等到它非空(被放入东西后再继续)

互斥锁确实允许线程等待它被解锁, 但它不提供这种 "等待到某个条件成立" 的功能
如果我们只有一个互斥锁, 我们将不得不继续锁定互斥锁, 以反复检查 Vec 中是否有任何东西

Park

等待来自另一个线程的通知的一种方法, 称为 thread parking(线程停放)
线程可以自行 park, 使其进入睡眠状态, 从而防止 CPU 空耗, 然后,另一个线程可以将线程唤醒/解除停放(unpark), 使其继续

译者注:
park 的意思就是把车停放下来, 对应让线程进入睡眠
unpark 的意思就是把车开走, 对应让线程继续

park 的行为, 可通过 std::thread::park() 函数表示
unpark 的行为, 您可以在要被唤醒的线程的 Thread 类型的对象上, 调用 unpark() 方法表示
Thread 对象可以从 spawn 返回的 JoinHandle 获取, 也可以通过 std::thread::current() 从线程本身获取

让我们深入了解一个使用互斥锁, 在两个线程之间共享队列的示例
在以下示例中, 新生成的线程将消耗队列中的元素(consumer), 而主线程将每秒将一个新元素插入到队列(producter)
park 被用于让 comsumer 线程在队列为空时进行等待

use std::collections::VecDeque;

fn main() {
    let queue = Mutex::new(VecDeque::new());

    thread::scope(|s| {
        // Consuming thread
        let t = s.spawn(|| loop {
            let item = queue.lock().unwrap().pop_front();
            if let Some(item) = item {
                dbg!(item);
            } else {
                thread::park();
            }
        });

        // Producing thread
        for i in 0.. {
            queue.lock().unwrap().push_back(i);
            t.thread().unpark();
            thread::sleep(Duration::from_secs(1));
        }
    });
}

这是一个无限循环进行的例子, 在该循环中, 当元素不为空时, 它将元素从队列中弹出, 使用 dbg! 显示它们
当队列为空时, 它将调用 park(), 使线程进入睡眠状态, 防止 CPU 空耗
如果它被 unpark, 则先前调用的 park() 方法将会 return, 随后循环继续, 再次从队列中弹出元素, 直到它为空, 重复

consumer 线程每秒生成一个新数字, 然后将其 push 到队列中
每次添加时, 都会在代表着 consumer 线程的 Thread 对象上, 调用 unpark() 方法来 unpark 它
这样, consumer 线程就会被唤醒, 以处理新的元素

这里要提出的一个重要观察是, 如果我们删除对 park() 的调用, 这个程序在理论上仍然正确, 但效率低下
这非常重要, 因为 park() 不能保证它会只因为相匹配的 unpark() 而返回
虽然罕见, 但它可能会被虚假地唤醒(spurious wake-ups)

我们的示例很好地处理了这个问题, 因为 consumer 线程将锁定保护着队列的互斥锁, producter 线程将因此阻塞
随后, consumer 线程将检测队列是否为空, 当队列为空时, 就再次调用 park() 让自己进入睡眠状态

Thread Parking 的一个重要特性是, 对 unpark() 的调用, 在线程 park 自身之前, 不会丢失
对 unpark 的请求仍然会被记录, 下次线程尝试 park 时, 它会认识到该请求, 随后清除该请求后直接继续, 而不会实际进入睡眠
为了了解为何这对正确性至关重要, 让我们来看看两个线程的执行步骤的可能顺序(从一开始观察):

  1. consumer 线程(简写为C)锁定了队列
  2. C 尝试从队列中 pop 一个元素, 但它是空的, 表达式为 None
  3. C 随后解锁了队列
  4. producter 线程(简写为P)锁定了队列
  5. P 将新元素 push 到队列中
  6. P 再次解锁了队列
  7. P 调用 unpark() 通知 C 有新元素
  8. C 调用 park() 进入睡眠, 等待更多元素

首先假设该特性暂时不存在, 看看会发生什么:
在步骤 3 中的解锁队列, 到步骤 8 中的 unpark 间, 可能只有非常短暂的时间, 但步骤 4 ~ 7 可能会在线程 park 之前的那一刻发生
如果 unpark 发生在线程未 park 时, 则它不执行任何操作, 通知将会丢失, C 线程仍将等待, 即使队列中有元素也是如此
多谢了 unpark requests 将会被保存, 以供将来调用 park(), 我们将不必担心

但是, unpark requests 不会叠加
调用 unpark() 两次,然后调用 park() 两次, 仍会导致线程进入 sleep 状态
第一个 park() 清除请求, 并直接返回, 但第二个像往常一样使线程进入 sleep

这意味着, 在上面的示例中重要的是, 只有当队列为空时才 park, 而非在获取 item 之后 park 线程
虽然睡眠时间巨大(一秒), 此示例中极不可能发生, 但多个 unpark() 调用, 可能只唤醒一个 park() 调用

这意味着, 当在 park() 返回之后, 但在队列被锁定和清空之前, 期间调用 unpark() 是不必要的, 会导致下个 park() 立即 return
这会导致(空)队列被 lock/unlock, 消耗更多时间, 虽然这不会影响正确性, 但确实会影响效率与性能

此机制适用于像我们示例中这样的简单情况, 但当事情变得更加复杂时, 一切很快就会崩溃
例如, 当有多个 consumer 从同一队列中获取元素, 则 producter 将无法知道哪个 consumer 实际上正在等待, 且应该被唤醒
producter 必须确切地知道 consumer 何时在等待, 以及它正在等待什么条件

Condvar

Condvar(条件变量)是更常用的选项, 用于等待某些事情发生在受互斥锁保护的数据身上

它有两个基本操作: wait(等待) 与 notify(通知)
thread 可以 wait 一个 Condvar, 之后当另一个 thread 对该相同的 Condvar 进行 notify 时, 等待的 thread 将被唤醒
多个 Thread 可以 wait 同一个 Condvar, notifications(通知)能被发送到另一个 waiting-thread, 也可以发送到所有 threads

这意味着, 我们可以为特定事件/条件, 创建一个 Condvar, 例如队列为非空, 并等待该条件
然后, 导致该事件/条件发生的任何线程, 都会通知 Condvar, 而不必知道哪些线程, 或有多少线程对该通知感兴趣

为了避免在 "解锁互斥锁和等待条件变量之间的短暂时刻内" 丢失通知的问题, 条件变量提供了一种以原子方式解锁互斥锁并开始等待的方法
这意味着通知根本不可能丢失

Rust 标准库提供了一个条件变量的类型, 即 std::sync::Condvar, 它的 wait 方法需要 MutexGuard, 以证明我们已经锁定了互斥锁
wait 将首先解锁互斥锁, 并进入睡眠状态, 稍后当唤醒时, 它会重新锁定互斥锁并返回新的 MutexGuard (证明互斥锁再次被锁定)

它有两个通知函数: notify_one 与 notify_all
前者只唤醒一个等待线程(如果有), notify_all 则唤醒所有

让我们修改先前使用 Parking 的示例, 改用 Condvar:

use std::sync::Condvar;

let queue = Mutex::new(VecDeque::new());
let not_empty = Condvar::new();

thread::scope(|s| {
    s.spawn(|| {
        loop {
            let mut q = queue.lock().unwrap();
            let item = loop {
                if let Some(item) = q.pop_front() {
                    break item;
                } else {
                    q = not_empty.wait(q).unwrap();
                }
            };
            drop(q);
            dbg!(item);
        }
    });

    for i in 0.. {
        queue.lock().unwrap().push_back(i);
        not_empty.notify_one();
        thread::sleep(Duration::from_secs(1));
    }
});

你会看见, 代码中对 Condvar 的合理命名, 代表了 "队列非空" 这一条件

我们首先得进行一些更改:

  • 现在不仅有一个包含队列的 Mutex, 还有一个 Condvar 来表达 "非空" 这一条件
  • 我们不再需要知道要唤醒哪个线程, 因此我们不再存储从 spawn 返回的线程句柄
    相反, 我们通过条件变量的 notify_one 方法, 通知了 consumer thread, 使其被唤醒
  • unlock, wait, 与再一次的 unlock, 都由 wait 方法完成
    我们必须稍微重构控制流, 以便能够将 guard 传递给 wait 方法, 同时在处理元素前, 仍将 guard 给 drop 掉

下面是先前的代码, 你可以进行对比:

use std::collections::VecDeque;

fn main() {
    let queue = Mutex::new(VecDeque::new());

    thread::scope(|s| {
        // Consuming thread
        let t = s.spawn(|| loop {
            let item = queue.lock().unwrap().pop_front();
            if let Some(item) = item {
                dbg!(item);
            } else {
                thread::park();
            }
        });

        // Producing thread
        for i in 0.. {
            queue.lock().unwrap().push_back(i);
            t.thread().unpark();
            thread::sleep(Duration::from_secs(1));
        }
    });
}

现在, 我们可以根据需要, 生成任意数量的 consumer thread, 而无需更改任何内容
条件变量负责将通知传递到你感兴趣的线程

如果我们有一个更复杂的系统, 其中线程对不同的条件感兴趣, 我们可以为每个条件定义一个 Condvar
例如, 我们可以定义一个来指示队列为非空, 另一个来指示它是空的, 然后每个线程将等待与它们正在做的事相关的任何条件

通常, Condvar 只能与单个 Mutex 一起使用
如果两个线程尝试使用两个不同的互斥锁, 在条件变量上并发 wait, 则可能会导致崩溃

Condvar 的缺点是它只有在与 Mutex 一起使用时才有效, 但对于大多数用例来说, 这完全没问题, 因为这正是用于保护数据的方法

thread::park()Condvar::wait() 都有一个有时间限制的变体: thread::park_timeout()Condvar::wait_timeout()
这些将 Duration 作为额外参数, 它将在指定时间之后放弃等待通知并无条件唤醒


总结

  • 多个线程可以在同一程序中同时运行, 并且可以随时生成
  • 当主线程结束时, 整个程序结束
  • 数据竞争是未定义行为(ub), Rust 的类型系统完全防止了这种行为(在 safe-rust 中)
  • 实现了 Send 的数据可以发送到其他线程, 实现了 Sync 的数据可以在线程间共享
  • 常规线程可能会与程序运行相同时长, 因此只能借用 lifetime 为 'static 的变量, 例如静态和泄漏的数据
  • 引用计数(Arc)可用于共享所有权, 以确保只要至少有一个线程正在使用数据, 数据就一直存在
  • 作用域线程可用于限制线程的 lifetime, 以允许借用非 'static 的数据, 例如局部变量
  • &T 是共享引用, &mut T 为独占引用, 常规类型不允许其共享引用的突变
  • 多谢了 UnsafeCell, 某些类型具有内部可变性, 这允许通过共享引用进行突变
  • Cell 和 RefCell 是单线程内部可变性的标准类型, Atomic, Mutex, RwLock 是它们的多线程等价物
  • Cell 和 Atomic 只允许将值作为一个整体替换, 而 RefCell, Mutex, RwLock 允许您通过动态实行访问规则, 来直接改变值
  • Park 可能是等待某些条件的便捷方式
  • 当条件是关于受 Mutex 保护的数据时, 使用 Condvar 比 Park 更方便, 效率更高

(翻译得累死我了, 玩明日方舟去了, 新活动出的提丰好可爱啊啊啊啊啊啊啊啊)


上一篇: p1~> 系列说明
下一篇: p3~> 原子