初学rust,多线程中读写全局变量
Tag 全局变量, 读写, rust, on by view 6

参考上一篇《初学rust,如何实现在运行时对全局变量设置和读取》,文章中说,可以用OnceCell来定义全局变量,然后就可以对全局变量进行读写,实际上,我发现OnceCellset()方法只能调用一次,如果你试图第二次调用set()来修改已经设定好的值,将会报错,设置值就失败了。

由于这一切都是为了读写的并发安全。所以OnceCell是无法实现读写全局变量的。要实现读写全局变量,应该使用RwLock。如下

定义全局变量

static WORKER_PID: RwLock<Option<u32>> = RwLock::new(None);

读写全局变量

// 写入
fn store_worker_pid(pid: u32) {
    let mut data = WORKER_PID.write().unwrap();
    *data = Some(pid);
}

// 读取
let data = WORKER_PID.read().unwrap();
let cpid = data.unwrap();

这里的读写就可以多次进行了。可以看到,这里使用了读写锁,写的时候会先通过write()方法获取写入锁实例,然后对其赋值写入。读取的时候会先通过read()方法获取读取锁实例,然后取里面的值。这样一来就实现了全局变量的读写。


初学rust,不允许遍历过程中修改HashMap
Tag rust, HashMap, 遍历, 修改, on by view 31

先看一段代码

#[test]
fn test_hash_map() {
    let mut mp: HashMap<&str, &str> = HashMap::new();
    mp.insert("k", "v");
    mp.insert("k1", "v1");
    let x = mp.keys().clone();
    for k in x {
        mp.insert("k2", "v1");
        mp.insert("k3", "v1");
    }
    mp.insert("k2", "v1");
    mp.insert("k3", "v1");
}

看,它报错

error[E0502]: cannot borrow `mp` as mutable because it is also borrowed as immutable
   --> src/process.rs:153:9
    |
151 |     let x = mp.keys().clone();
    |             -- immutable borrow occurs here
152 |     for k in x {
    |              - immutable borrow later used here
153 |         mp.insert("k2", "v1");
    |         ^^^^^^^^^^^^^^^^^^^^^ mutable borrow occurs here

报错告诉我们,不允许将mp作为mutable,因为它已经用于immutable了。难道,HashMap插入数据完毕,开始读取数据之后,不能再次插入数据了?我一开始这么怀疑,不应该啊,于是代码改成这样

#[test]
fn test_hash_map() {
    let mut mp: HashMap<&str, &str> = HashMap::new();
    mp.insert("k", "v");
    mp.insert("k1", "v1");
    mp.keys().clone();
    mp.insert("k2", "v1");
    mp.insert("k3", "v1");
}

正常了,没报错了。看样子问题出在这个for ..in..中。仔细分析代码,我在for中遍历了它的key,或者说我正在将这个HashMap中的数据拿出来,这时候,我在for中尝试往这个HashMap中写入数据,写入数据会让这个HashMap发生变更,这里第一感让我觉得可能有问题。比如我HashMap中有10个元素,我在遍历它,然后在for中间插入新元素,那么是不是有下列问题:1,我的HashMap会不会越遍历越多,会不会永远无法遍历完;2,HashMap是无序的,我将新元素插入HashMap中,会不会导致我已经遍历过的数据由于插入新数据,导致再次被读取出来,因为它可能位置发生变化了嘛。

其实这就是数据竞争和它带来的不确定性问题,rust作为一个内存安全第一的编程语言,编译器会教你做人。

于是,我再改

#[test]
fn test_hash_map() {
    let mut mp: HashMap<&str, &str> = HashMap::new();
    mp.insert("k", "v");
    mp.insert("k1", "v1");
    let x: Vec<&str> = mp.keys().map(|k| *k).collect();
    for k in x {
        mp.insert("k2", "v1");
        mp.insert("k3", "v1");
    }
    mp.insert("k2", "v1");
    mp.insert("k3", "v1");
}

这回正常了。可以看到我的操作let x: Vec<&str> = mp.keys().map(|k| *k).collect();是将keys()拿到的Keys迭代器(仍旧从前面的HashMap里迭代)通过.collect()方法将迭代器里的元素“倒”入到Vec<&str>,这样这个Vec就是一个独立与HashMap内存空间之外的变量,再基于这个Vec进行遍历,就可以避免“边遍历边修改的”的情况了。

那么大家可以思考一下,其他语言,比如golang,遇到这种情况是怎么处理的呢。


初学rust,踩坑私有依赖
Tag rust, 依赖, git, on by view 48

最近尝试在一个项目中引用一个私有crate,这个crate是一个lib,然后存储在私有git仓库中,并且这个git平台不支持rust的crate-index.

  • 第一坑,引用git依赖

于是我在我的项目中这样引用外部依赖包

[package]
name = "las"
version = "0.1.1"
authors = ["...."]
edition = "2018"

[dependencies]
elasticsearch = { version = "8.5.0-alpha.1", default-features = false, features = [
    "rustls-tls",
] }
logwatcher = "0.1.1"
json = "*"
reqwest = { version = "0.11.12", default-features = false, features = [
    "blocking",
    "rustls-tls",
] }
toml = "0.8.6"
serde_derive = "1.0.177"
serde = "1.0.177"
serde_json = "1.0.104"
once_cell = "1.18.0"
futures = "0.3"
tokio = { version = "*", features = ["full"] }
uuid = { version = "1.4", features = ["v4", "fast-rng", "macro-diagnostics"] }
chrono = "0.4"
core_affinity = "0.8.0"
geoip = { git = "ssh://git@git.xxxx.com/xxxx/tgeoip.git", branch = "master", package = "geoip" }

nix = "0.26"
libc = "0.2.146"
clap = { version = "4.4.2", features = ["derive"] }
log4rs = { version = "1.2.0" }
log = { version = "0.4.20" }
[build-dependencies]
regex = "1.6.0"

需要注意的是,我git@git.xxxx.com:xxxx/tgeoip.git仓库中是一个workspace,里面有2个子项,一个是可执行bin项目,叫做regen,一个是外部可依赖包,叫做geoip(后续实践表明同一个workspace里面的子项引用就应该这么干,git路径是同一个,使用package指定子项的包名),我这样引入之后,发现死活拉取不了私有git依赖。报错没权限拉取,git鉴权失败

error: failed to get `geoip` as a dependency of package `las v0.1.1 (/data/code/rust/las)`

Caused by:
  failed to load source for dependency `geoip`

Caused by:
  Unable to update ssh://git@git.xxxx.com/xxxx/tgeoip.git?branch=master#e41c5279

Caused by:
  failed to clone into: /root/.cargo/git/db/tgeoip-9094aceea5940357

Caused by:
  failed to authenticate when downloading repository

  * attempted ssh-agent authentication, but no usernames succeeded: `git`

  if the git CLI succeeds then `net.git-fetch-with-cli` may help here
  https://doc.rust-lang.org/cargo/reference/config.html#netgit-fetch-with-cli

Caused by:
  no authentication methods succeeded

各方查询后,都是建议添加这个选项,我开始还没搞清楚加在哪儿,但是发现使用环境变量好使。各方尝试后:

添加到~/.cargo/config文件中,如下

[net]
git-fetch-with-cli = true

然后再次执行cargo build无需环境变量即可。(前提是你本地已经配置好了访问ssh://git@git.xxxx.com/xxxx/tgeoip.git的ssh key)

  • 第二坑,外部依赖包要申明为rlib

我在这个bin项目中引用geoip,发现一直报错

failed to resolve: use of undeclared crate or module `geoip`

百思不得其解,最后发现是我依赖包申明类型的问题,我Cargo.toml申明的是

[package]
....

[lib]
name = "geoip"
crate-type = ["staticlib", "cdylib"]

[dependencies]
bincode = "1.3.3"
serde = { version = "1.0.193", features = ["derive"] }

实际lib应该申明rlib类型,才能被正常引用,改为如下,更新依赖,就没报错了

[package]
....

[lib]
name = "geoip"
crate-type = ["staticlib", "cdylib", "rlib"]

[dependencies]
bincode = "1.3.3"
serde = { version = "1.0.193", features = ["derive"] }

初学rust,错误处理
Tag 错误处理, rust, on by view 11

rust中如何比较优雅的进行错误处理,这是一直以来困扰我的一个问题。最近写了一个ip地址库查询包,于是在其中实践了一下自定义错误、错误抛出等处理。

rust中,如果一个函数需要返回错误,那么应该用Result包裹返回值,Result定义如下

enum Result<T, E> {
    Ok(T),
    Err(E),
}

通常,Result中的第二个参数就是错误。比如Result<String, String>,那么它的错误就是一个String类型的值,如下

// 返回了正常值hi
fn find() -> Result<String, String> {
    Ok("hi".to_string())
}

// 返回了错误信息error
fn find() -> Result<String, String> {
    Err("error".to_string())
}

那么有另外一个问题,就是,我们如何处理其他库或者第三方函数抛出给我们的错误呢?我们可以用expect(),unwrap()来解决可以解决的错误,但是有时候我们不希望处理错误,希望能够将错误抛出给上层,让上层调用者去处理。这时候,我们应该怎么定义错误类型?你当然可以在你的函数中进行错误处理之后,抛出String类型的错误,以便继续使用String类型作为错误类型。

#[test]
fn test_find() {
    let x = foo();
    println!("{:?}", x.unwrap())
}

fn foo() -> Result<File, String> {
    let x = bar();
    match x {
        Err(e) => Err(e.to_string()),
        Ok(f) => Ok(f),
    }
    ...
}

fn bar() -> Result<File, io::Error> {
    File::open("regions.txt")
}

你可以看到,我在foo()中调用bar()bar()中返回了一个包含io::Error的错误,而我的foo()中要求返回的是String,我在foo中用match处理了错误的情况,并且foo中可能调用其他第三方库函数,返回的错误类型不尽相同,我每种错误类型都可以使用match解开,然后返回String类型的错误。但是这样处理起来,代码看起来就非常乱,到处都是match错误。

其实我们有另一种方法,可以自己定义一个Error类型,然后对Error类型进行扩展,让它兼容其他类型,如下

use std::{fmt, io};

#[derive(Debug)]
pub enum Error {
    ParseError,
    ReadError,
    InvalidIPError,
}

impl std::error::Error for Error {}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Error::ParseError => write!(f, "Parse Error"),
            Error::ReadError => write!(f, "Read Error"),
            Error::InvalidIPError => write!(f, "Invalid IP Error"),
        }
    }
}

impl From<io::Error> for Error {
    fn from(_: io::Error) -> Self {
        Error::ReadError
    }
}

impl From<bincode::Error> for Error {
    fn from(_: bincode::Error) -> Self {
        Error::ParseError
    }
}

impl From<std::net::AddrParseError> for Error {
    fn from(_: std::net::AddrParseError) -> Self {
        Error::InvalidIPError
    }
}

我们的Error类型,实现了fmt::Display,让它能够处理我的包项目中可能出现的第三方错误,并且针对这些错误实现了各自的From<T>from方法,这样,这些第三方错误就可以直接以我的Error返回,使用?简写之后,代码就改成这样了

#[test]
fn test_find() {
    let x = foo();
    println!("{:?}", x.unwrap())
}

fn foo() -> Result<File, Error> {
    let x = bar()?;
    Ok(x)
}

fn bar() -> Result<File, io::Error> {
    File::open("regions.txt")
}

对比可以看到,我在foo中处理bar抛出的错误变得简单了,直接一个问号就将可能得错误抛出到Error了,然后再将Ok(x)返回。这样一来,错误的抛出和处理就变得优雅多了。


初学rust,没有途径修改argv[0]
Tag argv0, rust, 进程标题, on by view 27

我们知道在C语言程序中,可以通过修改argv[0]的值,来实现改变一个进程在ps命令中显示的标题,先给一个C语言的demo如下:

#include <stdio.h>
#include <string.h>
  
extern char **environ;
  
int main(int argc , char *argv[]) {
    int i;
  
    printf("argc:%d\n" , argc);
    for (i = 0; i < argc; i++) {
        printf("argv[%d]:%s\t0x%x\n" , i , argv[i], argv[i]);
    }
  
    for (i = 0; i < argc && environ[i]; i++) {
        printf("evriron[%d]:%s\t0x%x\n" , i , environ[i], environ[i]);
    }

    strcpy(argv[0], "nginx: process is shuting down");

    sleep(1000);
    return 0;
}

进程原本的名称是demo,但是我们通过strcpy修改了argv[0]之后,ps命令显示进程的名称变为我们指定的nginx: process is shuting down,如下

➜  build git:(master) ✗ ps -ef | grep nginx
root     1263942 1252322  0 15:29 pts/2    00:00:00 nginx: process is shuting down
root     1263973 1253542  0 15:29 pts/3    00:00:00 grep --color=auto --exclude-dir=.bzr --exclude-dir=CVS --exclude-dir=.git --exclude-dir=.hg --exclude-dir=.svn --exclude-dir=.idea --exclude-dir=.tox nginx

这也是nginx程序实现在修改进程标题为nginx: master, nginx: worker 以及 nginx: process is shuting down 的原理,从而实现在ps的标题中标识不同类别的进程。

我们在rust语言中如何达到相同的效果呢,查阅资料了解到,通过下面方法可以修改进程名

use nix::libc::{PR_SET_NAME, prctl};
use std::ffi::CString;

fn main() {
    let new_name = CString::new("NewProcessName").expect("CString::new failed");
    unsafe {
        prctl(PR_SET_NAME, new_name.as_ptr() as usize, 0, 0, 0);
    }
}

但是实际使用后,发现,这种方法修改的进程名并不是ps命令显示的进程标题,ps命令显示的进程标题还是不变,而是修改了pstree命令显示的进程树种的进程名称,所以,这种方法并不能达到我们想要的效果。

我们尝试修改argv[0],在rust中是通过env::args()来获取程序的传参,也即argv,追踪到env::args()调用的是env::args_os(),于是我们有这么一段代码尝试修改argv[0]:

use std::env;
use std::ffi::{CStr, CString, OsString};
use std::os::unix::ffi::OsStringExt;

fn set_process_title(title: &str) {
    let args: Vec<OsString> = env::args_os().collect();
    let mut argv: Vec<*mut i8> = args.iter()
        .map(|arg| {
            let arg_cstring = CString::new(arg.as_bytes()).expect("Failed to create CString");
            arg_cstring.into_raw()
        })
        .collect();
    argv.push(std::ptr::null_mut());

    let title_cstring = CString::new(title).expect("Failed to create CString");

    unsafe {
        strcpy(argv[0] as *mut c_char, title_cstring.as_ptr());
    }
}

fn main() {
    set_process_title("MyWorker");

    // 继续执行其他操作...
}

但是很遗憾,并没有修改argv[0]的效果,继续追踪args_os()发现,其实在多个地方存在clone()操作,我们获取到的argv早就不是原始的argvargs_os()的实现如下

#[stable(feature = "env", since = "1.0.0")]
pub fn args_os() -> ArgsOs {
    ArgsOs { inner: sys::args::args() }
}

sys::args::args()的实现如下

/// Returns the command line arguments
pub fn args() -> Args {
    imp::args()
}

imp::args()的实现如下

pub fn args() -> Args {
    Args { iter: clone().into_iter() }
}

这里clone()实现如下

fn clone() -> Vec<OsString> {
    unsafe {
        // Load ARGC and ARGV, which hold the unmodified system-provided
        // argc/argv, so we can read the pointed-to memory without atomics
        // or synchronization.
        //
        // If either ARGC or ARGV is still zero or null, then either there
        // really are no arguments, or someone is asking for `args()`
        // before initialization has completed, and we return an empty
        // list.
        let argv = ARGV.load(Ordering::Relaxed);
        let argc = if argv.is_null() { 0 } else { ARGC.load(Ordering::Relaxed) };
        let mut args = Vec::with_capacity(argc as usize);
        for i in 0..argc {
            let ptr = *argv.offset(i) as *const libc::c_char;

            // Some C commandline parsers (e.g. GLib and Qt) are replacing already
            // handled arguments in `argv` with `NULL` and move them to the end. That
            // means that `argc` might be bigger than the actual number of non-`NULL`
            // pointers in `argv` at this point.
            //
            // To handle this we simply stop iterating at the first `NULL` argument.
            //
            // `argv` is also guaranteed to be `NULL`-terminated so any non-`NULL` arguments
            // after the first `NULL` can safely be ignored.
            if ptr.is_null() {
                break;
            }

            let cstr = CStr::from_ptr(ptr);
            args.push(OsStringExt::from_vec(cstr.to_bytes().to_vec()));
        }

        args
    }
}

可以看到argv是从ARGV.load(Ordering::Relaxed);中加载出来的。可以看到argv经历了多次拷贝,最终才到args,然后通过args_os()再呈现在我们面前,实际早就不再是最初的那个argv。


初学rust,初始化一个C语言结构体
Tag rust, clang, 结构体, on by view 57

最近用到了libc这个包,调用其中的statfs函数,用于查询指定路径挂载的磁盘占用。可以看到

pub fn statfs(path: *const ::c_char, buf: *mut statfs) -> ::c_int;

statfs第二个参数是一个C语言结构体statfs,其定义如下

pub struct statfs {
    pub f_type: ::__fsword_t,
    pub f_bsize: ::__fsword_t,
    pub f_blocks: ::fsblkcnt_t,
    pub f_bfree: ::fsblkcnt_t,
    pub f_bavail: ::fsblkcnt_t,

    pub f_files: ::fsfilcnt_t,
    pub f_ffree: ::fsfilcnt_t,
    pub f_fsid: ::fsid_t,

    pub f_namelen: ::__fsword_t,
    pub f_frsize: ::__fsword_t,
    f_spare: [::__fsword_t; 5],
}

如果用常规的rust初始化方法,必须填充结构体的各字段

let x = statfs{
    ...,
    f_spare: ...,
}

可以看到其中f_spare字段是一个数组,属于复杂类型,用rust的初始化方式,需要一个个字段的填充,而且需要填充为初始值0,将会非常复杂。其实有一种简单方法,使用std::mem::zeroed函数即可。举例如下

fn statfs(&mut self, mount_path: String) -> String {
    let x = CString::new(mount_path.as_bytes()).expect("covert cstring failed");

    let mut info = String::from("");
    let mut statfs_buf = unsafe { std::mem::zeroed() };
    let ret = unsafe { libc::statfs(x.as_ptr(), &mut statfs_buf) };
    if ret == 0 {
        info = format!(
            "{} {} {} {}",
            statfs_buf.f_bsize, statfs_buf.f_blocks, statfs_buf.f_bfree, statfs_buf.f_bavail
        );
    }
    return info;
}

可以看到std::mem::zeroed()方法,直接能够初始化一段0值的内存空间,这段空间具体大小,直接由下一行libc::statfs(...)调用的第二个参数类型决定,由这一行直接能推导出该申请多大的0空间。


初学rust,tokio无法在spawn中使用MutexGuard
Tag rust, tokio, MutexGuard, spawn, on by view 88

最近再次在rust中尝试用tokio::spawn实现类似go语言中goroutine的用法,但是报错了。

#[tokio::main]
async fn main() {
    let cfg = cfg::get_config();
    let client_id = cfg::get_client_id();
    let ws_addr = cfg.agent.as_ref().unwrap().remote_ws_addr.as_ref().unwrap();

    let mut wsc = ws::WebSocketClient::new(ws_addr.clone(), client_id);
    tokio::spawn(wsc.start());

    let monitor_addr = cfg.agent.as_ref().unwrap().remote_ws_addr.as_ref().unwrap();
    let mut monitor_client = monitor::Monitor::new(monitor_addr.to_string());
    monitor_client.start().await;
}

报错位置在这一行 tokio::spawn(wsc.start());,报错内容如下

error: future cannot be sent between threads safely
   --> src/main.rs:19:18
    |
19  |     tokio::spawn(wsc.start());
    |                  ^^^^^^^^^^^ future returned by `start` is not `Send`
    |
    = help: the trait `Sync` is not implemented for `std::sync::mpsc::Receiver<Message>`
note: future is not `Send` as this value is used across an await
   --> src/ws.rs:117:47
    |
115 |                 if let Ok(msg) = rx.recv() {
    |                                  -- has type `&std::sync::mpsc::Receiver<Message>` which is not `Send`
116 |                     println!("Socket Send    : {:?}", msg);
117 |                     let rst = writer.send(msg).await;
    |                                               ^^^^^^ await occurs here, with `rx` maybe used later
...
126 |             }
    |             - `rx` is later dropped here
help: consider moving this into a `let` binding to create a shorter lived borrow

可以看到原因是,我在WebSocketClient中用到了MutexGuard,而MutexGuard不支持被tokio::spawn调用。解释参考这篇文档。实际上我是在另一个结构体Pty中使用了Arc<Mutex<Receiver<Message>>>类型,然后WebSocketClient中又实用了Pty结构体,所以在调用tokio::spawn时报错。

如何解决?别用标准库的Mutex了,用tokio的。而且,我同样发现了Sender,Receiver都在报类似的错误the trait std::marker::Send is not implemented for ...

use std::sync::mpsc::{channel, Receiver, Sender}

// 改为
use tokio::sync::mpsc::channel;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::Mutex;

channel,Receiver,Sender全部改为tokio版本的,就可以兼容tokio了。


初学rust,多态JSON对象的序列化与反序列化
Tag rust, 序列化, 反序列化, on by view 90

在rust中序列化反序列化JSON我们常用的是serde这个包,但是json中有一种情况就是存在不确定类型的字段,或者说这个字段可能是多种类型;在golang中,我们用interface{}类型来表达,在java中,我们用Object表达,那么在rust应该如何表达呢。直接说结果,用enum来组合类型

比如我定义了一种JSON数据结构,如下

// 第一种
{
    "msg_type": "resize",
    "content": {
        "width": 12,
        "height": 3
    }
}

// 第二种
{
    "msg_type": "ping",
    "content": {
        "ts": 123
    }
}

很明显,两个数据中,content字段表现的类型不一样,在msg_typeresizecontent字段是一种结构体,msg_typepingcontent字段是另一种结构体。我们定义枚举和结构体如下

#[derive(Clone, Debug, Deserialize)]
struct ControlMessage {
    msg_type: String,
    #[serde(default)]
    content: ControlContent,
}

#[derive(Clone, Debug, Deserialize, PartialEq)]
#[serde(untagged)]
enum ControlContent {
    Empty,
    Resize(ResizeControl),
    Time(TimeControl),
}

impl Default for ControlContent {
    fn default() -> Self {
        Self::Empty
    }
}

#[derive(Clone, Debug, Deserialize, PartialEq)]
struct ResizeControl {
    width: u32,
    height: u32,
}

#[derive(Clone, Debug, Deserialize, PartialEq)]
struct TimeControl {
    ts: u32,
}

可以看到我们定义了两种类型ResizeControlTimeControl,需要注意以下几点:

  1. 要支持反序列化,所有用到的结构体都需要添加Deserialize注解
  2. 枚举中,Empty,Resize,Time这3个都属于枚举的字段名,圆括号()里定义类型,没有定义的表示空
  3. 枚举上,需要添加untagged注解
  4. 要为枚举实现Default::default方法,否则无法被Deserialize注解
  5. 被枚举引用的结构体ResizeControl,TimeControl,以及枚举ControlContent都需要加上PartialEq注解,否则无法在枚举圆括号()中引用

调用反序列化

#[test]
pub fn test_field_deserialize() -> serde_json::Result<()> {
    let a = r#"{"msg_type": "resize", "content": {"width": 12, "height": 3}}"#;
    let v: ControlMessage = serde_json::from_str(a)?;
    println!("v: {:?}", v);

    let b = r#"{"msg_type": "resize", "content": {"ts": 123}}"#;
    let v: ControlMessage = serde_json::from_str(b)?;
    println!("v: {:?}", v);

    Ok(())
}

结果调用成功

running 1 test
v: ControlMessage { msg_type: "resize", content: Resize(ResizeControl { width: 12, height: 3 }) }
v: ControlMessage { msg_type: "resize", content: Time(TimeControl { ts: 123 }) }
test control::test_field_deserialize ... ok

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

初学rust,return与无分号结尾区别
Tag rust, return, on by view 120

在rust中,我们在一开始学习rust时,就被告诉,在函数结尾可以不用写return,只需要结尾的那个语句别写分号(;),那么结尾的这个语句计算的值将会作为这个函数的返回值返回。

#[test]
fn test_return_2() {
    println!("none semi: {}", none_semicolon());
}

fn none_semicolon() -> String {
    String::from("hello world")
}

正常输出

running 1 test
none semi: hello world
test test_return_2 ... ok

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

这时,我形成一个印象,“结尾语句不加分号,可以当return用”。还有如下的例子

fn test_return_1() -> bool {
    let x = Option::Some(true);

    let y = match x {
        Some(val) => {
            println!("{:?}", val);
            val
        }
        None => false,
    };

    println!("will finish function, y: {:?}", y);

    false
}

我可以看到y变量是bool类型,这时候我以为val这句没有加分号,所以它将值返回给语句块,从而赋值给y,但是,这时候,我改为下面这样的

#[test]
fn test_return_2() {
    println!("none semi: {}", test_return_1());
}

fn test_return_1() -> bool {
    let x = Option::Some(true);

    let y = match x {
        Some(val) => {
            println!("{:?}", val);
            return val;
        }
        None => false,
    };

    println!("will finish function, y: {:?}", y);

    false
}

我发现,return 处就结束了函数并返回。后续的println并没有执行。

running 1 test
true
none semi: true
test test_return_2 ... ok

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

总结,位于语句块或者函数体结尾无分号结尾的语句,并不等同于return,它处于语句块结尾时,代表将它的值赋值给语句块所代表的变量,处于函数体结尾时,代表将它的值赋值给函数体所代表的变量(函数返回变量);而return这个函数终止返回的指令,从来都跟无分号结尾的语句无关,而是函数体的反花括号 (}) 的出现所带来的行为。


初学rust,无法在不同的线程中并行读写websocket
Tag rust, 线程, websocket, 读写分离, on by view 188

最近发现一个问题,rust的线程安全机制导致无法实现socket读写分离到两个不同的线程。

先说一下程序的背景,程序是将本地终端pty(cli)拉起,并且将pty的输入输出通过channel对接,并将cli输出的数据经过channel写入到服务端socket,将从服务端socket收取到的数据经另一个channel写入到cli的输入。从而实现远程连接pty。

按照rust的写法,读线程中,在读socket之前需要先锁socket,然后读取,再释放锁;同样,在写线程中,也需要先锁socket,然后写入,再释放锁。这样一来代码应该如下:

连接与初始化代码如下

let (ws_stream, response) =
    connect(Url::parse("wss://ws.postman-echo.com/raw").unwrap()).expect("msg");

println!("Connected to the server");
println!("Response HTTP code: {}", response.status());
println!("Response contains the following headers:");
for (ref header, _value) in response.headers() {
    println!("* {}", header);
}

let socket = Arc::new(Mutex::new(ws_stream));

// init cli
self.pty_start();

let mut me = self.clone();
let skt = socket.clone();
thread::spawn(move || {
    me.watch_socket_read_in(skt);
});
println!("---> watch_socket_read_in");

self.watch_socket_write_out(socket.clone());
println!("---> watch_socket_write_out");

读取方法在一个新起的线程中watch_socket_read_in,如下

fn watch_socket_read_in(&mut self, socket: Arc<Mutex<WebSocket<MaybeTlsStream<TcpStream>>>>) {
    loop {
        let mut skt = socket.lock().unwrap();
        let msg = skt.read_message().unwrap();
        println!("Socket Received: {:?}", msg);
        drop(skt);
        self.tx_in
            .send(msg.clone())
            .expect("send msg into in channel failed");
        println!("send pipe in succ: {:?}", msg);
    }
}

可以看到,不停的从socket读取数据,读取前锁,读取后drop锁。

写入方法在初始化代码所在的主线程中watch_socket_write_out,如下

fn watch_socket_write_out(&mut self, socket: Arc<Mutex<WebSocket<MaybeTlsStream<TcpStream>>>>) {
    let rx = self.rx_out.lock().expect("lock rx out failed");

    for msg in rx.iter() {
        println!("msg from cli -> {:?}", msg);
        let mut skt = socket.lock().unwrap();
        println!("Socket Send    : {:?}", msg);
        skt.write_message(msg).unwrap();
        drop(skt);
    }

    println!("out of socket write out block....")
}

可是,运行的结果却出乎我的意料,运行结果现象是这样的,先是只能够从socket读取到服务端的PING数据,而cli发出的数据经过channel读取出来之后,锁socket,准备发送,但是发现锁socket卡主死锁了,导致无法经socket发送,然后就卡了很久;但是过了一段时间,写socket获取的锁成功了,发了一大堆的数据,然后又轮到读socket卡主,稍后随机的时间后,读socket锁成功,又只能读到PING,如此反复。这种状态的读写,完全不能用,根本实现不了cli与服务端的实时通讯。

分析了一下,应该是socket网络读写是网络通讯,因此读写的锁定socket时长是不确定的且相对耗时算是比较长的,所以导致无法预料是读获取到锁还是写获取到锁,而且这种锁强行将读写串行化了,完全不符合并发读写的要求了。

几经查找,于是采用tokio-tungstenite这个crate替换了tungstenite,因为它可以将WebSocketStream通过split方法分隔为readerwriter,这样一来,读与写就分离开了,在不同的线程中无需对socket加锁。

let (ws_stream, response) =
    connect_async(Url::parse("wss://ws.postman-echo.com/raw").unwrap())
        .await
        .expect("msg");
let (ws_writer, ws_reader) = ws_stream.split();

这样一来,读socketws_reader,写socketws_writer

// socket read
let me = self.clone();
tokio::spawn(async move {
    let mut incoming = ws_reader.map(Result::unwrap);
    while let Some(msg) = incoming.next().await {
        if msg.is_text() {
            println!("Socket Received: {:?}", msg);
            me.tx_in.send(msg).expect("send msg into in channel failed");
        }
    }
});

// socket write
self.watch_socket_write_out(ws_writer).await;
async fn watch_socket_write_out(
    &mut self,
    mut writer: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
) {
    let rx = self.rx_out.lock().expect("lock rx out failed");

    for msg in rx.iter() {
        println!("Socket Send    : {:?}", msg.to_text().unwrap());
        writer.send(msg).await.unwrap();
    }

    println!("out of socket write out block....")
}

可以看到,新线程中读socket,主线程中写socketws_readermap方法后,可以在死循环中阻塞调用next()不断的读取socket中的信息。写socket则从channel中读取到数据,按常规的方法send即可。

r56j2avc

接入tokio-tungstenite解决了这个问题,不过它是基于tokio的,tokio是一个协程库,有自己的运行时,用了tokio的程序起协程后,程序会自动启动若干个线程,类比goroutine,它也是有初始的资源消耗的,比如这个程序只需要4个线程,但是使用了tokio的程序,会有10个线程(如上图),内存占用会明显增多。