初学rust,多线程中读写全局变量
Tag 全局变量, 读写, rust, on by view 21

参考上一篇《初学rust,如何实现在运行时对全局变量设置和读取》,文章中说,可以用OnceCell来定义全局变量,然后就可以对全局变量进行读写,实际上,我发现OnceCellset()方法只能调用一次,如果你试图第二次调用set()来修改已经设定好的值,将会报错,设置值就失败了。

由于这一切都是为了读写的并发安全。所以OnceCell是无法实现读写全局变量的。要实现读写全局变量,应该使用RwLock。如下

定义全局变量

static WORKER_PID: RwLock<Option<u32>> = RwLock::new(None);

读写全局变量

// 写入
fn store_worker_pid(pid: u32) {
    let mut data = WORKER_PID.write().unwrap();
    *data = Some(pid);
}

// 读取
let data = WORKER_PID.read().unwrap();
let cpid = data.unwrap();

这里的读写就可以多次进行了。可以看到,这里使用了读写锁,写的时候会先通过write()方法获取写入锁实例,然后对其赋值写入。读取的时候会先通过read()方法获取读取锁实例,然后取里面的值。这样一来就实现了全局变量的读写。


初学rust,如何实现在运行时对全局变量设置和读取
Tag 运行时, 全局变量, 读写, on by view 45

在rust中,如何实现在运行时对全局变量设置和读取,这个问题困扰了我一段时间。因为在rust中,全局变量是不能在运行时修改的。rust的全局变量是属于全局静态变量,使用关键词static来定义,如下:

static mut count: usize = 0;

pub fn init_config(file_path: &str) {
    count = count + 1;
}

编译器会告诉你修改count不安全(unsafe)。

➜  las git:(feature/self-update-with-watcher-fork) ✗ cargo build
   Compiling las v0.1.1 (/root/code/las)
error[E0133]: use of mutable static is unsafe and requires unsafe function or block
  --> src/config/cfg.rs:35:13
   |
35 |     count = count + 1;
   |             ^^^^^ use of mutable static
   |
   = note: mutable statics can be mutated by multiple threads: aliasing violations or data races will cause undefined behavior

error[E0133]: use of mutable static is unsafe and requires unsafe function or block
  --> src/config/cfg.rs:35:5
   |
35 |     count = count + 1;
   |     ^^^^^^^^^^^^^^^^^ use of mutable static
   |
   = note: mutable statics can be mutated by multiple threads: aliasing violations or data races will cause undefined behavior

For more information about this error, try `rustc --explain E0133`.
error: could not compile `las` (bin "las") due to 2 previous errors

一定要修改的话,需要加unsafe块包裹住:

static mut count: usize = 0;

pub fn init_config(file_path: &str) {
    unsafe { count = count + 1 };
}

但是,在rust中使用unsafe是一件不好的事情,我们应该尽量避免使用unsafe,这里有另一个方案可以实现在运行时设置全局变量,就是使用once_cell这个包,虽然它的底层也是基于unsafe来实现的,但是它对“不安全”代码进行了一定的包装。具体用法如下:

static CFG: OnceCell<Conf> = OnceCell::new();

// 初始化和设置
pub fn init_config(file_path: &str) {
    CFG.get_or_init(|| load_config(file_path));
}

// 读取
pub fn get_config() -> &'static Conf {
    CFG.get().unwrap()
}

可以看到,我们可以在运行时使用once_cellget_or_init方法对全局变量进行初始化设置,它还有set方法也可以在运行时对全局变量进行设置,get方法在运行时对全局变量进行读取。从而实现不直接使用unsafe块,来操作全局变量。而且once_cell使用的全局变量完全不必申明为可变(mut)变量。

我们可以通过追踪once_cell::get_or_init方法,可以看到在initialize方法中unsafe块中,将value设置给了*slot

qby9tqm9

可以看到,once_cell对全局变量的操作进行了安全封装,因此,建议使用once_cell进行全局变量的操作,而不是到处使用unsafe块进行不安全的操作。

^注意:实践证明,OnceCell只能调用set()对全局变量设置一次,后续再调用set()就会报错。要反复读写甚至多线程中读写全局变量,应该使用RwLock,详情参考新的文章《初学rust,多线程中读写全局变量》


初学rust,无法在不同的线程中并行读写websocket
Tag rust, 线程, websocket, 读写分离, on by view 219

最近发现一个问题,rust的线程安全机制导致无法实现socket读写分离到两个不同的线程。

先说一下程序的背景,程序是将本地终端pty(cli)拉起,并且将pty的输入输出通过channel对接,并将cli输出的数据经过channel写入到服务端socket,将从服务端socket收取到的数据经另一个channel写入到cli的输入。从而实现远程连接pty。

按照rust的写法,读线程中,在读socket之前需要先锁socket,然后读取,再释放锁;同样,在写线程中,也需要先锁socket,然后写入,再释放锁。这样一来代码应该如下:

连接与初始化代码如下

let (ws_stream, response) =
    connect(Url::parse("wss://ws.postman-echo.com/raw").unwrap()).expect("msg");

println!("Connected to the server");
println!("Response HTTP code: {}", response.status());
println!("Response contains the following headers:");
for (ref header, _value) in response.headers() {
    println!("* {}", header);
}

let socket = Arc::new(Mutex::new(ws_stream));

// init cli
self.pty_start();

let mut me = self.clone();
let skt = socket.clone();
thread::spawn(move || {
    me.watch_socket_read_in(skt);
});
println!("---> watch_socket_read_in");

self.watch_socket_write_out(socket.clone());
println!("---> watch_socket_write_out");

读取方法在一个新起的线程中watch_socket_read_in,如下

fn watch_socket_read_in(&mut self, socket: Arc<Mutex<WebSocket<MaybeTlsStream<TcpStream>>>>) {
    loop {
        let mut skt = socket.lock().unwrap();
        let msg = skt.read_message().unwrap();
        println!("Socket Received: {:?}", msg);
        drop(skt);
        self.tx_in
            .send(msg.clone())
            .expect("send msg into in channel failed");
        println!("send pipe in succ: {:?}", msg);
    }
}

可以看到,不停的从socket读取数据,读取前锁,读取后drop锁。

写入方法在初始化代码所在的主线程中watch_socket_write_out,如下

fn watch_socket_write_out(&mut self, socket: Arc<Mutex<WebSocket<MaybeTlsStream<TcpStream>>>>) {
    let rx = self.rx_out.lock().expect("lock rx out failed");

    for msg in rx.iter() {
        println!("msg from cli -> {:?}", msg);
        let mut skt = socket.lock().unwrap();
        println!("Socket Send    : {:?}", msg);
        skt.write_message(msg).unwrap();
        drop(skt);
    }

    println!("out of socket write out block....")
}

可是,运行的结果却出乎我的意料,运行结果现象是这样的,先是只能够从socket读取到服务端的PING数据,而cli发出的数据经过channel读取出来之后,锁socket,准备发送,但是发现锁socket卡主死锁了,导致无法经socket发送,然后就卡了很久;但是过了一段时间,写socket获取的锁成功了,发了一大堆的数据,然后又轮到读socket卡主,稍后随机的时间后,读socket锁成功,又只能读到PING,如此反复。这种状态的读写,完全不能用,根本实现不了cli与服务端的实时通讯。

分析了一下,应该是socket网络读写是网络通讯,因此读写的锁定socket时长是不确定的且相对耗时算是比较长的,所以导致无法预料是读获取到锁还是写获取到锁,而且这种锁强行将读写串行化了,完全不符合并发读写的要求了。

几经查找,于是采用tokio-tungstenite这个crate替换了tungstenite,因为它可以将WebSocketStream通过split方法分隔为readerwriter,这样一来,读与写就分离开了,在不同的线程中无需对socket加锁。

let (ws_stream, response) =
    connect_async(Url::parse("wss://ws.postman-echo.com/raw").unwrap())
        .await
        .expect("msg");
let (ws_writer, ws_reader) = ws_stream.split();

这样一来,读socketws_reader,写socketws_writer

// socket read
let me = self.clone();
tokio::spawn(async move {
    let mut incoming = ws_reader.map(Result::unwrap);
    while let Some(msg) = incoming.next().await {
        if msg.is_text() {
            println!("Socket Received: {:?}", msg);
            me.tx_in.send(msg).expect("send msg into in channel failed");
        }
    }
});

// socket write
self.watch_socket_write_out(ws_writer).await;
async fn watch_socket_write_out(
    &mut self,
    mut writer: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
) {
    let rx = self.rx_out.lock().expect("lock rx out failed");

    for msg in rx.iter() {
        println!("Socket Send    : {:?}", msg.to_text().unwrap());
        writer.send(msg).await.unwrap();
    }

    println!("out of socket write out block....")
}

可以看到,新线程中读socket,主线程中写socketws_readermap方法后,可以在死循环中阻塞调用next()不断的读取socket中的信息。写socket则从channel中读取到数据,按常规的方法send即可。

r56j2avc

接入tokio-tungstenite解决了这个问题,不过它是基于tokio的,tokio是一个协程库,有自己的运行时,用了tokio的程序起协程后,程序会自动启动若干个线程,类比goroutine,它也是有初始的资源消耗的,比如这个程序只需要4个线程,但是使用了tokio的程序,会有10个线程(如上图),内存占用会明显增多。