初学rust,多线程编程共享变量访问
Tag rust, 多线程, 共享变量, , 解锁, on by view 283

今天开始尝试使用rust的多线程编程,在不同的线程中访问共享变量。一个小项目,项目背景是这样的,我想用rust写一个高性能的nginx日志解析聚合上报监控的agent;其中,主线程中watch日志文件,并读取日志内容,有点类似tailf,然后有一个子线程,负责每隔10s钟统计主线程收集到的日志数据,聚合并上报。

所以,这里除了主线程之外,需要创建一个线程。并且主线程不断的往一个Vec<LineStat>容器变量中写入数据,另一个子线程没隔10s,从这个容器变量中读取所有数据,并累加统计上报,然后清空该容器中的数据,10s一个往复。这个容器变量就是关键的共享变量了。常规情况下,是存在数据竞争的,在golang里倒是很容易写出来,但是golang可不管这些,锁你爱加不加,不加锁也能跑,但是会出错,若是map还会panic。但是,今天发现在rust里,根本写不出来……

那么第一个问题就是,rust线程中如何使用共享变量。 我们先看下直接使用共享变量会怎么样。

fn testx() {
    let mut data = "hi".to_string();
    thread::spawn(move || loop {
        println!("{:?}", data);

        thread::sleep(Duration::from_secs(10));
    });

    println!("{:?}", data);
}

可以看到spawn生成的线程内部访问data,会被要求加上move关键词,它会强制获取被访问变量的所有权,也就是说在线程中访问了datadata的所有权就变为这个线程的了,只能在这个线程内访问。后续的println访问data就会报错。如下

error[E0382]: borrow of moved value: `data`
  --> src/main.rs:38:22
   |
31 |     let mut data = "hi".to_string();
   |         -------- move occurs because `data` has type `std::string::String`, which does not implement the `Copy` trait
32 |     thread::spawn(move || loop {
   |                   ------- value moved into closure here
33 |         println!("{:?}", data);
   |                          ---- variable moved due to use in closure
...
38 |     println!("{:?}", data);
   |                      ^^^^ value borrowed here after move
   |
   = note: this error originates in the macro `$crate::format_args_nl` (in Nightly builds, run with -Z macro-backtrace for more info)

可以看到报错value borrowed here after move,也就是访问了所有权被移动过的变量。

那么我们如何才能使data能够同时在主线程和子线程中被访问呢。用Arc<Mutex<...>>。代码改成如下

fn main() {
    let mut raw_data = "hi".to_string();
    let mut data: Arc<Mutex<String>> = Arc::new(Mutex::new(raw_data));
    let mut clone_data = data.clone();
    thread::spawn(move || loop {
        thread::sleep(Duration::from_secs(1));
        println!("th -> {:?}", clone_data.lock().unwrap());
        clone_data.lock().unwrap().push_str(", hello");
    });

    thread::sleep(Duration::from_secs(10));
    println!("final -> {:?}", data.lock().unwrap());
}

可以看到要访问的数据套上Arc<Mutex<...>>之后,在线程中访问该数据时,先在线程外clone一个副本,然后在线程中访问该数据,访问时先加锁lock(),然后读取到的数据就是raw_data,而在主线程中访问的数据也是先lock()加锁,读取到的也是raw_data。相当于在raw_data外部加了个套,虽然dataclone_data是两个加了套的数据,但是在套里面的数据是同一个,也就是raw_datalock()锁定“解套”后访问的是同一个数据。我们来看下运行结果

th -> "hi"
th -> "hi, hello"
th -> "hi, hello, hello"
th -> "hi, hello, hello, hello"
th -> "hi, hello, hello, hello, hello"
th -> "hi, hello, hello, hello, hello, hello"
th -> "hi, hello, hello, hello, hello, hello, hello"
th -> "hi, hello, hello, hello, hello, hello, hello, hello"
th -> "hi, hello, hello, hello, hello, hello, hello, hello, hello"
final -> "hi, hello, hello, hello, hello, hello, hello, hello, hello, hello"

可以看到线程内部字符串拼接了若干次,最后在主线程中打印出来的是子线程拼接后的数据。

那么rust是怎么解锁的呢。 我们把代码改成这样

fn main() {
    let mut raw_data = "hi".to_string();
    let mut data: Arc<Mutex<String>> = Arc::new(Mutex::new(raw_data));
    let mut clone_data = data.clone();
    thread::spawn(move || loop {
        thread::sleep(Duration::from_secs(1));
        let mut v = clone_data.lock().unwrap(); // v 不会立即释放,v 所在的锁也不会立即释放
        println!("th -> {:?}", v);

        appendCnt(clone_data.clone());
        // 线程结束后,v 被回收,v所在的锁才会释放
    });

    thread::sleep(Duration::from_secs(10));
    println!("final -> {:?}", data.lock().unwrap());
}

fn appendCnt(mut data: Arc<Mutex<String>>) {
    data.lock().unwrap().push_str(", hello"); // 这里再次尝试锁 data 里的值,发现前面已经锁定过,无法加锁,死锁
}

执行结果显示打印了第一个th -> "hi"之后,程序直接卡死。这里是因为发生了死锁。rust中目前没有显式的锁释放操作(实际上unlock方法在nightly版本中,参考,对应的issue),锁释放(unlock)发生在前面所说的“加了套的数据”内存释放的时候,也就是说clone_data.lock()v解锁发生在v释放的时候,也就是在loop所在块结束的时候。但是在这之前appenCnt调用了clone_data.clone()衍生的“加了套的数据”,并尝试在函数中加锁,这时候上一个锁还没释放呢,所以就死锁了。

那么如果这里我必需要提前解锁怎么办,用drop即可。如下

fn main() {
    let mut raw_data = "hi".to_string();
    let mut data: Arc<Mutex<String>> = Arc::new(Mutex::new(raw_data));
    let mut clone_data = data.clone();
    thread::spawn(move || loop {
        thread::sleep(Duration::from_secs(1));
        let mut v = clone_data.lock().unwrap(); // v 不会立即释放,v 所在的锁也不会立即释放
        println!("th -> {:?}", v);
        drop(v); // 提前释放 v,提前解锁

        appendCnt(clone_data.clone());
    });

    thread::sleep(Duration::from_secs(10));
    println!("final -> {:?}", data.lock().unwrap());
}

fn appendCnt(mut data: Arc<Mutex<String>>) {
    data.lock().unwrap().push_str(", hello"); // 这里再次尝试锁 data 里的值,没有被锁定,可以加锁
}

运行,发现现在能正常解锁了。