rust-atomics-and-locks-p3~> 原子
<~~ 发表日期:2023-07-06 | 访问量:  | 本文词数:1488 | 预计阅读时间:8分钟 ~~>

原子一词源自希腊语 ἄτομος, 意为不可分割的之物, 在计算机科学中被用于描述不可分割的操作: 要么完全完成, 要么没有发生

前言

如上一节中的 "借用与数据竞赛" 中所述, 多个线程同时读取和修改同一变量, 通常会导致未定义行为(ub)
但是, 原子操作确实允许不同的线程安全地读取和修改同一变量

由于此类操作是不可分割的, 因此它要么完全发生在另一个操作之前, 要么完全发生在另一个操作之后, 从而避免了未定义行为
在之后几节, 我们将看到这在硬件级别是如何工作的

原子操作是涉及多个线程的任何内容的主要构建块, 所有其他并发原语(如 Mutex 和 Condvar), 都是使用原子操作实现的
在 Rust 中,原子操作可以作为 std::sync::atomic 中的标准原子类型的方法使用
它们的名称都以 Atomic 开头, 例如 AtomicI32 或 AtomicUsize

哪些可用取决于硬件体系结构, 有时也取决于操作系统, 但几乎所有平台都至少提供 size 最大到指针的所有原子类型

与大多数类型不同, 它们允许通过共享引用进行修改(例如 &AtomicU8), 这要归功于内部可变性, 如上一节中的 "内部可变性" 中所述

每种可用的原子类型都具有相同的接口, 其中包含用于存储和加载的方法
比如 "fetch-and-modify" 操作, 以及一些更高级的 "compare-and-exchange"
我们将在本章的其余部分, 详细讨论它们

但是, 在我们深入研究不同的原子操作之前, 我们需要简要地触及一个称为 Memory-Order(内存顺序) 的概念:
每个原子操作都将 std::sync::atomic::Ordering 作为参数, 决定我们得到的关于操作的相对顺序的保证
保证最少的最简单变体是 Relaxed, 其仍然保证单个原子变量的一致性, 但不承诺不同变量之间的相对操作顺序

这意味着, 两个线程可能会看到对不同变量的操作以不同的顺序发生
例如, 如果一个线程先写入一个变量, 然后很快写入第二个变量, 则另一个线程可能会看到这种情况以相反的顺序发生

在本章中, 我们所研究的, 都是些上述问题不算问题的用例, 只需使用 Relaxed, 而无需详细了解细节
我们将在之后某节, 讨论 Memory-Order 的所有细节, 以及其他可用的 Ordering


Load/Store

我们将要看的两个原子操作是最基本的操作: load 和 store, 它们的函数签名如下, 以 AtomicI32 为例:

impl AtomicI32 {
    pub fn load(&self, ordering: Ordering) -> i32;
    pub fn store(&self, value: i32, ordering: Ordering);
}
  • load 方法以原子方式, 加载存储在原子变量中的值
  • store 方法以原子方式, 在其中存储新值

请注意 store 方法接收共享引用(&T)而非独占引用(&mut ), 即使它修改了值

让我们看一下这两种方法的一些实际用例:

例-停止标志

第一个示例, 使用 AtomicBool 表示 stop flag, 此类 flag 用于通知其他线程, 让它们停止某些手头上的工作

use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;

fn main() {
    static STOP: AtomicBool = AtomicBool::new(false);

    // Spawn a thread to do the work.
    let background_thread = thread::spawn(|| {
        while !STOP.load(Relaxed) {
            some_work();
        }
    });

    // Use the main thread to listen for user input.
    for line in std::io::stdin().lines() {
        match line.unwrap().as_str() {
            "help" => println!("commands: help, stop"),
            "stop" => break,
            cmd => println!("unknown command: {cmd:?}"),
        }
    }

    // Inform the background thread it needs to stop.
    STOP.store(true, Relaxed);

    // Wait until the background thread finishes.
    background_thread.join().unwrap();
}

此示例中, 后台线程重复运行 some_work(), 而主线程允许用户输入一些文本作为命令, 以此与程序交互
在这个简单的示例中, 唯一有用的命令是 "stop" 以使程序停止

若要使后台线程停止, 使用 AtomicBool 作为 flag, 将此条件传达给后台线程
当前台线程读取 stop 命令时, 它将 flag 设置为 true, 后台线程在每次新迭代之前都会检查该标志
主线程等待后台线程, 使用 join 方法完成其当前迭代

只要后台线程定期检查 flag, 这个简单的解决方案就可以很好地工作
如果它长时间卡在 some_work() 中, 则可能导致 stop 命令和程序退出之间出现不可接受的延迟

例-进度报告

在这个示例中, 我们在后台线程上逐个处理 100 个 item, 而主线程会定期向用户提供有关进度的更新:

use std::sync::atomic::AtomicUsize;

fn main() {
    let num_done = AtomicUsize::new(0);

    thread::scope(|s| {
        // A background thread to process all 100 items.
        s.spawn(|| {
            for i in 0..100 {
                process_item(i); // Assuming this takes some time.
                num_done.store(i + 1, Relaxed);
            }
        });

        // The main thread shows status updates, every second.
        loop {
            let n = num_done.load(Relaxed);
            if n == 100 { break; }
            println!("Working.. {n}/100 done");
            thread::sleep(Duration::from_secs(1));
        }
    });

    println!("Done!");
}

这一次, 我们使用了作用域线程, 它会自动为我们处理线程的join, 且允许我们借用局部变量
每次后台线程完成对 item 的处理时, 它都会将处理的item数量存在 AtomicUsize 中
同时, 主线程向用户显示该数字, 以通知他们进度, 大约每秒一次
一旦主线程看到所有 100 个 item 都已处理, 它就会退出 scope, 该 scope 隐式 join 了后台线程, 并通知用户一切都已完成

Synchronization(同步):
处理最后一项后, 主线程可能需要一整秒钟才能知道, 从而在末尾引入不必要的延迟
为解决该问题, 我们可以使用 Park(上一节中的内容), 在有主线程可能可能感兴趣的新消息时, 将主线程从 sleep 中唤醒

下面是相同的示例, 但使用 thread::park_timeout 替代了 thread::sleep:

fn main() {
    let num_done = AtomicUsize::new(0);

    let main_thread = thread::current();

    thread::scope(|s| {
        // A background thread to process all 100 items.
        s.spawn(|| {
            for i in 0..100 {
                process_item(i); // Assuming this takes some time.
                num_done.store(i + 1, Relaxed);
                main_thread.unpark(); // Wake up the main thread.
            }
        });

        // The main thread shows status updates.
        loop {
            let n = num_done.load(Relaxed);
            if n == 100 { break; }
            println!("Working.. {n}/100 done");
            thread::park_timeout(Duration::from_secs(1));
        }
    });

    println!("Done!");
}

变化不大, 我们通过 thread::current() 获得了主线程的句柄, 现在后台线程使用它在每次状态更新后, unpark 主线程
主线程现在使用 park_timeout 而不是 sleep, 因此可以中断

现在, 任何状态更新都会立即报告给用户, 同时仍然每秒重复一次更新, 以显示程序仍在运行

例-延迟初始化

在我们继续更高级的原子操作之前, 最后一个示例是关于 Lazy-Initialization(延迟初始化) 的