深入理解Rust的Atomic及Ordering

Rust
83
0
0
2024-02-29

之前提到的MutexCondvarRust中比较偏高层的共享数据型并发控制,更底层的并发控制也有,比如Atomic(原子操作)。

今天结合代码来深入聊聊Atomic及其相关的Ordering

文章目录

  • Mutex vs Atomic
  • Atomic 初探
  • 指令重排
  • Ordering
  • 验证 Ordering 的可见性
  • fence
  • 延迟加载

首先为什么要有 Atomic,用 Mutex 不就可以了吗,我们来对比下

Mutex vs Atomic

1.从数据操作上对比:

Mutex是并发下对数据的互斥访问控制,多个线程尝试写入,同时必须只能有一个线程争得锁,进而写入成功,其他线程只能等待其释放锁后再争夺锁。

Atomic本身就是并发下对数据的原子操作,其操作是构建在操作系统的原子指令上,比如读取(Load),写入(Store),比较写入(CAS,compare and swap),自增(fetch_add)等,操作要么成功要么失败,不可能被其他线程打断,出现中间状态,避免操作中数据竞争状态的发生。

也就是说Atomic是依赖底层系统的指令不可拆分达到无需锁(lock free)就能直接对数据地址共享操作。

2.从临界区构建上对比:

Mutex是在加锁和释放锁之间构建了并发访问的临界区,进而进行数据操作。

Atomic本身对于数据地址操作就是原子的,如果临界区想操作就是数据本身,那就不需要额外的保证

但如果还有别的数据需要在临界区操作,则需要通过load/store/cas等组合wait loop才能实现,也就是常说的自旋(spin),这也是更底层的方式。一般在对性能要求更细致场景会需要。

下边先用伪代码举例常见临界区的样子(后边会结合 Ordering 用代码详细展开)

thread 1:
    // 条件满足设置flag
    store/CAS flag: false->true
thread 2:
    // wait flag满足条件,模拟类似锁的阻塞, spin
    while load flag == false {};
    // 执行临界区操作
    ...

Atomic 初探

了解了Atomic的作用,下边先从一个例子了解下如何使用

use std::{
    sync::atomic::{AtomicBool, Ordering},
    thread,
};
// 初始化原子bool类型
static FLAG: AtomicBool = AtomicBool::new(false);

fn main() {
    let a = thread::spawn(|| {
        // 原子操作修改
        FLAG.store(true, Ordering::Relaxed);
    });
    let b = thread::spawn(|| {
        // 原子操作读取
        if FLAG.load(Ordering::Relaxed) {
            println!("Relaxed: Flag is set!");
        }
    });
    a.join().unwrap();
    b.join().unwrap();
}

代码很简单,两个线程分别对 Flag 修改和读取,读取时会尝试判断是否满足打印条件。

不过运行代码,打印不一定会发生。你可能觉得多线程下,两线程执行顺序不能保证,执行顺序可能是先 load 后 store,这样的结果也很正常。

这是一种可能,然而远没有那么简单。

这里要提下指令重排和AtomicOrdering排序

指令重排

fn f(a: &mut i32, b: &mut i32) {
    *a += 1;
    *b += 1;
    *a += 1;
}

当你写下这段代码,交给操作系统编译执行,但很可能你得到的是这样的

fn f(a: &mut i32, b: &mut i32) {
    *a += 2;
    *b += 1;
}

为什么?

操作系统处理器和编译器悄悄的帮你优化了代码来让他运行更快,这里规则是:

只要不影响程序语义,指令可以重排执行以优化,即不按代码顺序执行

单线程下这样问题可能还不大,但如果多线程下,同一线程下多条原子指令,也是会有指令重排的可能,数据竞争很有可能发生,就是说加了原子操作也无法确定数据操作顺序。

如以下代码:

b 线程的 1 和 2 不互相依赖,可以指令重排成 2 和 1

use std::{
    sync::atomic::{AtomicI32, Ordering},
    thread,
};

static X: AtomicI32 = AtomicI32::new(0);
static Y: AtomicI32 = AtomicI32::new(0);

fn main() {
    let a = thread::spawn(|| {
        let x = X.load(Ordering::Relaxed);
        Y.store(x, Ordering::Relaxed);
    });
    let b = thread::spawn(|| {
        // 1
        let y = Y.load(Ordering::Relaxed);
        // 2
        X.store(42, Ordering::Relaxed);
    });
    a.join().unwrap();
    b.join().unwrap();
    assert_eq!(X.load(Ordering::Relaxed), 42);
    // 有可能, b线程的1和2不互相依赖,可以指令重排成2和1
    assert_eq!(Y.load(Ordering::Relaxed), 42);
}

底层的原子操作当然不能坐视不理,提出了Ordering来约束当前原子操作修改在其他多线程下的可见性,希望能约束当前线程发生原子操作如何同步到其他线程,能在并发中并发数据操作能有更好的确定性

Ordering

Rust用于的内存访问顺序(memory order)的Ordering基本和`C++ 20`的内存排序[1]的保持一致, 下边先挨个过一遍

  • Relaxed

最基础的内存排序要求,只要求当前原子操作是要么完全执行,要么还未执行,其操作结果的可见性同步在其他线程没有任何顺序的保证(如指令重排代码所示)

  • Acquire

适用于读取数据操作,要求:

当前线程不能有其他的读或写被 reorder 在 load 之前其他线程的同一数据已发生的 Release 写入操作都是对其可见的。

  • Release

适用于写数据操作,要求:

当前线程不能有其他的读或写被 reorder 在 store 之后当前写入后的结果对其他线程的同一数据 Acquire 读取操作是可见的。

也就是说,这里线程间可见性要求,acquire load总是可以同步到其他线程已发生的release store

结合代码来看就是(指令重排部分见注释):

use std::{
    sync::atomic::{AtomicBool, Ordering},
    thread,
};
fn main() {
    // 更严谨的测试可以用loom
    for _ in 0..100000 {
        acquire_release()
    }
}

fn acquire_release() {
    static FLAG: AtomicBool = AtomicBool::new(false);
    static mut DATA: u64 = 0;
    let a = thread::spawn(|| {
        unsafe { DATA = 42 };
        FLAG.store(true, Ordering::Release);
        // 不允许有读写重排到store之后
    });
    let b = thread::spawn(|| {
        // 不允许有读写重排到load之前
        while !FLAG.load(Ordering::Acquire) {}
        assert_eq!(unsafe { DATA }, 42);
    });

    a.join().unwrap();
    b.join().unwrap();
}
  • AcqRel

适用于同时读写操作(Read and write),读操作用 Acquire,写操作用 Release

  • SeqCst

在保证读写一定是 Acquire 和 Release 的约束外,还保证其他线程看到的原子操作顺序一致,即全局只有一种内存结果可见顺序(a single total order)

也就是说多线程下,即使执行顺序不能保证,但执行完后全局只能有一种原子操作的结果顺序,可以每次是不一样的(因为执行的先后不同),但一旦执行顺序确定后,就不可能有第二种原子操作结果的可能性存在。如同将不同线程的原子操作执行给串行化了一样。

所以内存顺序的严格程度就是从Relax->Acquire+Relase->SeqCst。越严格当然也会带来越多的性能开销。

来个代码帮助理解下用Ordering组合构建临界区:

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
fn main() {
    let lock = Arc::new(AtomicBool::new(false));
    let lock_clone_read = lock.clone();
    let lock_clone_store = lock.clone();
    thread::spawn(move || {
        // 持有锁
        lock.store(true, Ordering::SeqCst);
        // 执行临界区操作
    });

    // 等待锁被获取
    while !lock_clone_read.load(Ordering::Acquire) {}

    // 进入临界区,可以放心的执行临界区操作了
    println!("Critical section!");

    // 释放锁
    lock_clone_store.store(false, Ordering::Release);
}

验证 Ordering 的可见性

原子操作结果可见性同步严格程度不同影响大么?

眼见为实,下边在通过一段Relaxed的代码先来验证下

use std::{
    sync::{
        atomic::{AtomicBool, AtomicU64, Ordering},
        Arc,
    },
    thread,
};

static S: AtomicU64 = AtomicU64::new(0);
fn relaxed() {
    let a = Arc::new(AtomicBool::new(false));
    let b = Arc::new(AtomicBool::new(false));
    let a_clone = a.clone();
    let b_clone = b.clone();

    let t1 = thread::spawn(move || {
        a.store(true, Ordering::Relaxed);
        if !b.load(Ordering::Relaxed) {
            S.fetch_add(1, Ordering::Relaxed);
        }
    });

    let t2 = thread::spawn(move || {
        b_clone.store(true, Ordering::Relaxed);
        if !a_clone.load(Ordering::Relaxed) {
            S.fetch_add(1, Ordering::Relaxed);
        }
    });

    t1.join().unwrap();
    t2.join().unwrap();
}

fn main() {
    let cnt = 100000;
    for _ in 0..cnt {
        relaxed();
    }
    // 结果可能大于10000
    let s = S.load(Ordering::SeqCst);
    println!("s: {}", s);
}

结果是 S 基本大于 10000,为什么?我们展开分析下

两个AtomicBool A 和 B 初始是 false,两个线程用Relaxed先修改一个为 true 和再去读取另一个的值,如果判断读取到的值还是 false 就增加结果长度一次。

两个线程在判断是否增加结果长度时,会有以下几种可能:

  1. 其他线程指令重排
  • 修改排到 load 之后还没执行修改,S+1 (Relaxed 时)
  1. 其他线程指令不重排
  • 还没开始修改,S+1
  • 其他线程修改了,但对当前 load 不可见,S+1(Relaxed 时)
  • 其他线程修改了,对当前 load 可见,S 不变

所以指令重排和读取对修改不可见都会让 S+1, Relaxed会有更多可能让 S+1

所以遍历一次,两线程并发的情况下,如果 S 增加了 2,则说明对于修改的可见性同步要求较弱,即使另一个线程修改了值,也没能及时同步到当前值的 load

对于Relaxed约束,那么执行 100000 次,S 很容易大于 100000。

如果换成SeqCst,不允许上边代码中指令重排,又全局串行化了不同的原子操作。

如果其他线程修改发生在当前线程 load 之前,其一定是对当前线程 load 可见的,不会同时都不可见的可能性。所以 S 最多只能是 100000。

let t1 = thread::spawn(move || {
    a.store(true, Ordering::SeqCst);
    if !b.load(Ordering::SeqCst) {
        S.fetch_add(1, Ordering::Relaxed);
    }
});

let t2 = thread::spawn(move || {
    b_clone.store(true, Ordering::SeqCst);
    if !a_clone.load(Ordering::SeqCst) {
        S.fetch_add(1, Ordering::Relaxed);
    }
});

fence

Ordering除了可以对绑定到单个原子数据类型的操作上,也可以用在fence约束多条原子操作上,防止编译器和处理器对内存操作的重排,添加内存屏障(memory barrier),这也是构建临界区的一种方式

a.store(1, Release);
// 可以替换为
fence(Release);
a.store(1, Relaxed);
a.load(Acquire);
// 可以被替换为
a.load(Relaxed);
fence(Acquire);

这样拆分后,可以被扩展用作多个数据操作组合在线程间可见性的保证。也可以可选的选择什么时候用acquire/release/seqcst

比如下边用原子操作和 fence 模拟锁的实现

use std::sync::atomic::AtomicBool;
use std::sync::atomic::fence;
use std::sync::atomic::Ordering;

pub struct Mutex {
    flag: AtomicBool,
}

impl Mutex {
    pub fn new() -> Mutex {
        Mutex {
            flag: AtomicBool::new(false),
        }
    }

    pub fn lock(&self) {
        // relaxed+acquire fence 来及时同步已发生的unlock (release)
        // cas时没有直接使用严格的acquire
        while self
            .flag
            .compare_exchange_weak(false, true, Ordering::Relaxed, Ordering::Relaxed)
            .is_err()
        {}
        fence(Ordering::Acquire);
    }

    pub fn unlock(&self) {
        self.flag.store(false, Ordering::Release);
    }
}

延迟加载

Atomic也常用来做资源的延迟初始化,让多线程共享一份资源。

比如:

use std::{
    sync::atomic::{AtomicPtr, Ordering},
    thread,
};

struct Data {}
fn generate_data() -> Data {
    Data {}
}
fn get_data() -> &'static Data {
    static PTR: AtomicPtr<Data> = AtomicPtr::new(std::ptr::null_mut());

    let mut p = PTR.load(Ordering::Acquire);

    if p.is_null() {
        p = Box::into_raw(Box::new(generate_data()));
        if let Err(e) = PTR.compare_exchange(
            std::ptr::null_mut(),
            p,
            Ordering::Release,
            Ordering::Acquire,
        ) {
            // Safety: p comes from Box::into_raw right above,
            // and wasn't shared with any other thread.
            drop(unsafe { Box::from_raw(p) });
            p = e;
        }
    }

    // Safety: p is not null and points to a properly initialized value.
    unsafe { &*p }
}
fn main() {
    let t1 = thread::spawn(|| get_data());
    let t2 = thread::spawn(|| get_data());
    let (ret1, ret2) = (t1.join().unwrap(), t2.join().unwrap());
    assert_eq!(ret1 as *const _, ret2 as *const _);
}

不过一般情况下我们都没有需要自己去实现,很多 crate 都能实现类似操作,比如OneCell[2]

综上,Atomic是更底层的原子操作,为了同步原子操作的结果在其他线程的可见性以及约束编译器和操作系统的指令重排,也支持Ordering来提供不同程度的可见性保证。

深入了解Atomic并不意味着我们一定会用他来做一些lock free的开发,毕竟轮子已经有好多了,但至少能更好理解一些并发控制代码中原子操作的实现,也不会对各种Ordering傻傻分不清了。

最后推荐两个不错的Atomic资料,非常有助于理解,感兴趣的可以去看看

  • Rust Atomics and Locks: memory ordering[3]
  • Crust of Rust: Atomics and Memory Ordering[4]

参考资料

[1] C++ 20的内存排序: https://en.cppreference.com/w/cpp/atomic/memory_order

[2] OneCell: https://docs.rs/once_cell/latest/once_cell/#lazy-initialized-global-data

[3] Rust Atomics and Locks: memory ordering: https://marabos.nl/atomics/memory-ordering.html

[4] Crust of Rust: Atomics and Memory Ordering: https://www.youtube.com/watch?v=rMGWeSjctlY