初学rust,没有途径修改argv[0]
Tag argv0, rust, 进程标题, on by view 10

我们知道在C语言程序中,可以通过修改argv[0]的值,来实现改变一个进程在ps命令中显示的标题,先给一个C语言的demo如下:

#include <stdio.h>
#include <string.h>
  
extern char **environ;
  
int main(int argc , char *argv[]) {
    int i;
  
    printf("argc:%d\n" , argc);
    for (i = 0; i < argc; i++) {
        printf("argv[%d]:%s\t0x%x\n" , i , argv[i], argv[i]);
    }
  
    for (i = 0; i < argc && environ[i]; i++) {
        printf("evriron[%d]:%s\t0x%x\n" , i , environ[i], environ[i]);
    }

    strcpy(argv[0], "nginx: process is shuting down");

    sleep(1000);
    return 0;
}

进程原本的名称是demo,但是我们通过strcpy修改了argv[0]之后,ps命令显示进程的名称变为我们指定的nginx: process is shuting down,如下

➜  build git:(master) ✗ ps -ef | grep nginx
root     1263942 1252322  0 15:29 pts/2    00:00:00 nginx: process is shuting down
root     1263973 1253542  0 15:29 pts/3    00:00:00 grep --color=auto --exclude-dir=.bzr --exclude-dir=CVS --exclude-dir=.git --exclude-dir=.hg --exclude-dir=.svn --exclude-dir=.idea --exclude-dir=.tox nginx

这也是nginx程序实现在修改进程标题为nginx: master, nginx: worker 以及 nginx: process is shuting down 的原理,从而实现在ps的标题中标识不同类别的进程。

我们在rust语言中如何达到相同的效果呢,查阅资料了解到,通过下面方法可以修改进程名

use nix::libc::{PR_SET_NAME, prctl};
use std::ffi::CString;

fn main() {
    let new_name = CString::new("NewProcessName").expect("CString::new failed");
    unsafe {
        prctl(PR_SET_NAME, new_name.as_ptr() as usize, 0, 0, 0);
    }
}

但是实际使用后,发现,这种方法修改的进程名并不是ps命令显示的进程标题,ps命令显示的进程标题还是不变,而是修改了pstree命令显示的进程树种的进程名称,所以,这种方法并不能达到我们想要的效果。

我们尝试修改argv[0],在rust中是通过env::args()来获取程序的传参,也即argv,追踪到env::args()调用的是env::args_os(),于是我们有这么一段代码尝试修改argv[0]:

use std::env;
use std::ffi::{CStr, CString, OsString};
use std::os::unix::ffi::OsStringExt;

fn set_process_title(title: &str) {
    let args: Vec<OsString> = env::args_os().collect();
    let mut argv: Vec<*mut i8> = args.iter()
        .map(|arg| {
            let arg_cstring = CString::new(arg.as_bytes()).expect("Failed to create CString");
            arg_cstring.into_raw()
        })
        .collect();
    argv.push(std::ptr::null_mut());

    let title_cstring = CString::new(title).expect("Failed to create CString");

    unsafe {
        strcpy(argv[0] as *mut c_char, title_cstring.as_ptr());
    }
}

fn main() {
    set_process_title("MyWorker");

    // 继续执行其他操作...
}

但是很遗憾,并没有修改argv[0]的效果,继续追踪args_os()发现,其实在多个地方存在clone()操作,我们获取到的argv早就不是原始的argvargs_os()的实现如下

#[stable(feature = "env", since = "1.0.0")]
pub fn args_os() -> ArgsOs {
    ArgsOs { inner: sys::args::args() }
}

sys::args::args()的实现如下

/// Returns the command line arguments
pub fn args() -> Args {
    imp::args()
}

imp::args()的实现如下

pub fn args() -> Args {
    Args { iter: clone().into_iter() }
}

这里clone()实现如下

fn clone() -> Vec<OsString> {
    unsafe {
        // Load ARGC and ARGV, which hold the unmodified system-provided
        // argc/argv, so we can read the pointed-to memory without atomics
        // or synchronization.
        //
        // If either ARGC or ARGV is still zero or null, then either there
        // really are no arguments, or someone is asking for `args()`
        // before initialization has completed, and we return an empty
        // list.
        let argv = ARGV.load(Ordering::Relaxed);
        let argc = if argv.is_null() { 0 } else { ARGC.load(Ordering::Relaxed) };
        let mut args = Vec::with_capacity(argc as usize);
        for i in 0..argc {
            let ptr = *argv.offset(i) as *const libc::c_char;

            // Some C commandline parsers (e.g. GLib and Qt) are replacing already
            // handled arguments in `argv` with `NULL` and move them to the end. That
            // means that `argc` might be bigger than the actual number of non-`NULL`
            // pointers in `argv` at this point.
            //
            // To handle this we simply stop iterating at the first `NULL` argument.
            //
            // `argv` is also guaranteed to be `NULL`-terminated so any non-`NULL` arguments
            // after the first `NULL` can safely be ignored.
            if ptr.is_null() {
                break;
            }

            let cstr = CStr::from_ptr(ptr);
            args.push(OsStringExt::from_vec(cstr.to_bytes().to_vec()));
        }

        args
    }
}

可以看到argv是从ARGV.load(Ordering::Relaxed);中加载出来的。可以看到argv经历了多次拷贝,最终才到args,然后通过args_os()再呈现在我们面前,实际早就不再是最初的那个argv。


初学rust,初始化一个C语言结构体
Tag rust, clang, 结构体, on by view 41

最近用到了libc这个包,调用其中的statfs函数,用于查询指定路径挂载的磁盘占用。可以看到

pub fn statfs(path: *const ::c_char, buf: *mut statfs) -> ::c_int;

statfs第二个参数是一个C语言结构体statfs,其定义如下

pub struct statfs {
    pub f_type: ::__fsword_t,
    pub f_bsize: ::__fsword_t,
    pub f_blocks: ::fsblkcnt_t,
    pub f_bfree: ::fsblkcnt_t,
    pub f_bavail: ::fsblkcnt_t,

    pub f_files: ::fsfilcnt_t,
    pub f_ffree: ::fsfilcnt_t,
    pub f_fsid: ::fsid_t,

    pub f_namelen: ::__fsword_t,
    pub f_frsize: ::__fsword_t,
    f_spare: [::__fsword_t; 5],
}

如果用常规的rust初始化方法,必须填充结构体的各字段

let x = statfs{
    ...,
    f_spare: ...,
}

可以看到其中f_spare字段是一个数组,属于复杂类型,用rust的初始化方式,需要一个个字段的填充,而且需要填充为初始值0,将会非常复杂。其实有一种简单方法,使用std::mem::zeroed函数即可。举例如下

fn statfs(&mut self, mount_path: String) -> String {
    let x = CString::new(mount_path.as_bytes()).expect("covert cstring failed");

    let mut info = String::from("");
    let mut statfs_buf = unsafe { std::mem::zeroed() };
    let ret = unsafe { libc::statfs(x.as_ptr(), &mut statfs_buf) };
    if ret == 0 {
        info = format!(
            "{} {} {} {}",
            statfs_buf.f_bsize, statfs_buf.f_blocks, statfs_buf.f_bfree, statfs_buf.f_bavail
        );
    }
    return info;
}

可以看到std::mem::zeroed()方法,直接能够初始化一段0值的内存空间,这段空间具体大小,直接由下一行libc::statfs(...)调用的第二个参数类型决定,由这一行直接能推导出该申请多大的0空间。


初学rust,tokio无法在spawn中使用MutexGuard
Tag rust, tokio, MutexGuard, spawn, on by view 54

最近再次在rust中尝试用tokio::spawn实现类似go语言中goroutine的用法,但是报错了。

#[tokio::main]
async fn main() {
    let cfg = cfg::get_config();
    let client_id = cfg::get_client_id();
    let ws_addr = cfg.agent.as_ref().unwrap().remote_ws_addr.as_ref().unwrap();

    let mut wsc = ws::WebSocketClient::new(ws_addr.clone(), client_id);
    tokio::spawn(wsc.start());

    let monitor_addr = cfg.agent.as_ref().unwrap().remote_ws_addr.as_ref().unwrap();
    let mut monitor_client = monitor::Monitor::new(monitor_addr.to_string());
    monitor_client.start().await;
}

报错位置在这一行 tokio::spawn(wsc.start());,报错内容如下

error: future cannot be sent between threads safely
   --> src/main.rs:19:18
    |
19  |     tokio::spawn(wsc.start());
    |                  ^^^^^^^^^^^ future returned by `start` is not `Send`
    |
    = help: the trait `Sync` is not implemented for `std::sync::mpsc::Receiver<Message>`
note: future is not `Send` as this value is used across an await
   --> src/ws.rs:117:47
    |
115 |                 if let Ok(msg) = rx.recv() {
    |                                  -- has type `&std::sync::mpsc::Receiver<Message>` which is not `Send`
116 |                     println!("Socket Send    : {:?}", msg);
117 |                     let rst = writer.send(msg).await;
    |                                               ^^^^^^ await occurs here, with `rx` maybe used later
...
126 |             }
    |             - `rx` is later dropped here
help: consider moving this into a `let` binding to create a shorter lived borrow

可以看到原因是,我在WebSocketClient中用到了MutexGuard,而MutexGuard不支持被tokio::spawn调用。解释参考这篇文档。实际上我是在另一个结构体Pty中使用了Arc<Mutex<Receiver<Message>>>类型,然后WebSocketClient中又实用了Pty结构体,所以在调用tokio::spawn时报错。

如何解决?别用标准库的Mutex了,用tokio的。而且,我同样发现了Sender,Receiver都在报类似的错误the trait std::marker::Send is not implemented for ...

use std::sync::mpsc::{channel, Receiver, Sender}

// 改为
use tokio::sync::mpsc::channel;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::Mutex;

channel,Receiver,Sender全部改为tokio版本的,就可以兼容tokio了。


初学rust,多态JSON对象的序列化与反序列化
Tag rust, 序列化, 反序列化, on by view 60

在rust中序列化反序列化JSON我们常用的是serde这个包,但是json中有一种情况就是存在不确定类型的字段,或者说这个字段可能是多种类型;在golang中,我们用interface{}类型来表达,在java中,我们用Object表达,那么在rust应该如何表达呢。直接说结果,用enum来组合类型

比如我定义了一种JSON数据结构,如下

// 第一种
{
    "msg_type": "resize",
    "content": {
        "width": 12,
        "height": 3
    }
}

// 第二种
{
    "msg_type": "ping",
    "content": {
        "ts": 123
    }
}

很明显,两个数据中,content字段表现的类型不一样,在msg_typeresizecontent字段是一种结构体,msg_typepingcontent字段是另一种结构体。我们定义枚举和结构体如下

#[derive(Clone, Debug, Deserialize)]
struct ControlMessage {
    msg_type: String,
    #[serde(default)]
    content: ControlContent,
}

#[derive(Clone, Debug, Deserialize, PartialEq)]
#[serde(untagged)]
enum ControlContent {
    Empty,
    Resize(ResizeControl),
    Time(TimeControl),
}

impl Default for ControlContent {
    fn default() -> Self {
        Self::Empty
    }
}

#[derive(Clone, Debug, Deserialize, PartialEq)]
struct ResizeControl {
    width: u32,
    height: u32,
}

#[derive(Clone, Debug, Deserialize, PartialEq)]
struct TimeControl {
    ts: u32,
}

可以看到我们定义了两种类型ResizeControlTimeControl,需要注意以下几点:

  1. 要支持反序列化,所有用到的结构体都需要添加Deserialize注解
  2. 枚举中,Empty,Resize,Time这3个都属于枚举的字段名,圆括号()里定义类型,没有定义的表示空
  3. 枚举上,需要添加untagged注解
  4. 要为枚举实现Default::default方法,否则无法被Deserialize注解
  5. 被枚举引用的结构体ResizeControl,TimeControl,以及枚举ControlContent都需要加上PartialEq注解,否则无法在枚举圆括号()中引用

调用反序列化

#[test]
pub fn test_field_deserialize() -> serde_json::Result<()> {
    let a = r#"{"msg_type": "resize", "content": {"width": 12, "height": 3}}"#;
    let v: ControlMessage = serde_json::from_str(a)?;
    println!("v: {:?}", v);

    let b = r#"{"msg_type": "resize", "content": {"ts": 123}}"#;
    let v: ControlMessage = serde_json::from_str(b)?;
    println!("v: {:?}", v);

    Ok(())
}

结果调用成功

running 1 test
v: ControlMessage { msg_type: "resize", content: Resize(ResizeControl { width: 12, height: 3 }) }
v: ControlMessage { msg_type: "resize", content: Time(TimeControl { ts: 123 }) }
test control::test_field_deserialize ... ok

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

初学rust,return与无分号结尾区别
Tag rust, return, on by view 40

在rust中,我们在一开始学习rust时,就被告诉,在函数结尾可以不用写return,只需要结尾的那个语句别写分号(;),那么结尾的这个语句计算的值将会作为这个函数的返回值返回。

#[test]
fn test_return_2() {
    println!("none semi: {}", none_semicolon());
}

fn none_semicolon() -> String {
    String::from("hello world")
}

正常输出

running 1 test
none semi: hello world
test test_return_2 ... ok

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

这时,我形成一个印象,“结尾语句不加分号,可以当return用”。还有如下的例子

fn test_return_1() -> bool {
    let x = Option::Some(true);

    let y = match x {
        Some(val) => {
            println!("{:?}", val);
            val
        }
        None => false,
    };

    println!("will finish function, y: {:?}", y);

    false
}

我可以看到y变量是bool类型,这时候我以为val这句没有加分号,所以它将值返回给语句块,从而赋值给y,但是,这时候,我改为下面这样的

#[test]
fn test_return_2() {
    println!("none semi: {}", test_return_1());
}

fn test_return_1() -> bool {
    let x = Option::Some(true);

    let y = match x {
        Some(val) => {
            println!("{:?}", val);
            return val;
        }
        None => false,
    };

    println!("will finish function, y: {:?}", y);

    false
}

我发现,return 处就结束了函数并返回。后续的println并没有执行。

running 1 test
true
none semi: true
test test_return_2 ... ok

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

总结,位于语句块或者函数体结尾无分号结尾的语句,并不等同于return,它处于语句块结尾时,代表将它的值赋值给语句块所代表的变量,处于函数体结尾时,代表将它的值赋值给函数体所代表的变量(函数返回变量);而return这个函数终止返回的指令,从来都跟无分号结尾的语句无关,而是函数体的反花括号 (}) 的出现所带来的行为。


初学rust,无法在不同的线程中并行读写websocket
Tag rust, 线程, websocket, 读写分离, on by view 86

最近发现一个问题,rust的线程安全机制导致无法实现socket读写分离到两个不同的线程。

先说一下程序的背景,程序是将本地终端pty(cli)拉起,并且将pty的输入输出通过channel对接,并将cli输出的数据经过channel写入到服务端socket,将从服务端socket收取到的数据经另一个channel写入到cli的输入。从而实现远程连接pty。

按照rust的写法,读线程中,在读socket之前需要先锁socket,然后读取,再释放锁;同样,在写线程中,也需要先锁socket,然后写入,再释放锁。这样一来代码应该如下:

连接与初始化代码如下

let (ws_stream, response) =
    connect(Url::parse("wss://ws.postman-echo.com/raw").unwrap()).expect("msg");

println!("Connected to the server");
println!("Response HTTP code: {}", response.status());
println!("Response contains the following headers:");
for (ref header, _value) in response.headers() {
    println!("* {}", header);
}

let socket = Arc::new(Mutex::new(ws_stream));

// init cli
self.pty_start();

let mut me = self.clone();
let skt = socket.clone();
thread::spawn(move || {
    me.watch_socket_read_in(skt);
});
println!("---> watch_socket_read_in");

self.watch_socket_write_out(socket.clone());
println!("---> watch_socket_write_out");

读取方法在一个新起的线程中watch_socket_read_in,如下

fn watch_socket_read_in(&mut self, socket: Arc<Mutex<WebSocket<MaybeTlsStream<TcpStream>>>>) {
    loop {
        let mut skt = socket.lock().unwrap();
        let msg = skt.read_message().unwrap();
        println!("Socket Received: {:?}", msg);
        drop(skt);
        self.tx_in
            .send(msg.clone())
            .expect("send msg into in channel failed");
        println!("send pipe in succ: {:?}", msg);
    }
}

可以看到,不停的从socket读取数据,读取前锁,读取后drop锁。

写入方法在初始化代码所在的主线程中watch_socket_write_out,如下

fn watch_socket_write_out(&mut self, socket: Arc<Mutex<WebSocket<MaybeTlsStream<TcpStream>>>>) {
    let rx = self.rx_out.lock().expect("lock rx out failed");

    for msg in rx.iter() {
        println!("msg from cli -> {:?}", msg);
        let mut skt = socket.lock().unwrap();
        println!("Socket Send    : {:?}", msg);
        skt.write_message(msg).unwrap();
        drop(skt);
    }

    println!("out of socket write out block....")
}

可是,运行的结果却出乎我的意料,运行结果现象是这样的,先是只能够从socket读取到服务端的PING数据,而cli发出的数据经过channel读取出来之后,锁socket,准备发送,但是发现锁socket卡主死锁了,导致无法经socket发送,然后就卡了很久;但是过了一段时间,写socket获取的锁成功了,发了一大堆的数据,然后又轮到读socket卡主,稍后随机的时间后,读socket锁成功,又只能读到PING,如此反复。这种状态的读写,完全不能用,根本实现不了cli与服务端的实时通讯。

分析了一下,应该是socket网络读写是网络通讯,因此读写的锁定socket时长是不确定的且相对耗时算是比较长的,所以导致无法预料是读获取到锁还是写获取到锁,而且这种锁强行将读写串行化了,完全不符合并发读写的要求了。

几经查找,于是采用tokio-tungstenite这个crate替换了tungstenite,因为它可以将WebSocketStream通过split方法分隔为readerwriter,这样一来,读与写就分离开了,在不同的线程中无需对socket加锁。

let (ws_stream, response) =
    connect_async(Url::parse("wss://ws.postman-echo.com/raw").unwrap())
        .await
        .expect("msg");
let (ws_writer, ws_reader) = ws_stream.split();

这样一来,读socketws_reader,写socketws_writer

// socket read
let me = self.clone();
tokio::spawn(async move {
    let mut incoming = ws_reader.map(Result::unwrap);
    while let Some(msg) = incoming.next().await {
        if msg.is_text() {
            println!("Socket Received: {:?}", msg);
            me.tx_in.send(msg).expect("send msg into in channel failed");
        }
    }
});

// socket write
self.watch_socket_write_out(ws_writer).await;
async fn watch_socket_write_out(
    &mut self,
    mut writer: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
) {
    let rx = self.rx_out.lock().expect("lock rx out failed");

    for msg in rx.iter() {
        println!("Socket Send    : {:?}", msg.to_text().unwrap());
        writer.send(msg).await.unwrap();
    }

    println!("out of socket write out block....")
}

可以看到,新线程中读socket,主线程中写socketws_readermap方法后,可以在死循环中阻塞调用next()不断的读取socket中的信息。写socket则从channel中读取到数据,按常规的方法send即可。

r56j2avc

接入tokio-tungstenite解决了这个问题,不过它是基于tokio的,tokio是一个协程库,有自己的运行时,用了tokio的程序起协程后,程序会自动启动若干个线程,类比goroutine,它也是有初始的资源消耗的,比如这个程序只需要4个线程,但是使用了tokio的程序,会有10个线程(如上图),内存占用会明显增多。


初学rust,HashMap的clone
Tag rust, clone, hashmap, on by view 86

在rust中有个常用个方法clone,按字面意思就是克隆。这个函数的作用是对对象进行深度拷贝,生成的新对象与原对象相互独立。

很多常用的类型或者容器类型都支持clone,例如rust中的HashMap也支持clone,我们用一段代码实验一下。

#[test]
fn test_hash_map_clone() {
    let xx: Arc<Mutex<HashMap<String, String>>> = Arc::new(Mutex::new(HashMap::new()));
    let mut mp = xx.lock().unwrap();
    mp.insert("hi".to_string(), "hello".to_string());
    println!("origin: {:?}", mp);
    let mut cp = mp.clone();
    cp.insert("k".to_string(), "v".to_string());
    println!("origin: {:?}", mp);
    println!("cp    : {:?}", cp);
}

输出

running 1 test
origin: {"hi": "hello"}
origin: {"hi": "hello"}
cp    : {"hi": "hello", "k": "v"}
test test_hash_map_clone ... ok

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 2 filtered out; finished in 0.00s

上面的测试代码运行结果表示,修改克隆后的对象cp,源对象mp不会发生变化。

那么我们自己定义的类型如何才能支持clone呢?使用#[derive(Clone)]这个指令修饰自定义类型,就会自动支持clone,但是要注意,如果自定义类型结构体里,如果有字段类型不支持clone,将无法通过#[derive(Clone)]指令快速支持clone

自定义类型clone测试如下

#[derive(Debug, Clone)]
struct User {
    name: String,
    age: i32,
}

#[test]
fn test_struct_clone() {
    let mut u1 = User {
        name: "rex".to_string(),
        age: 1,
    };
    println!("origin: {:?}", u1);
    let mut ucp = u1.clone();
    ucp.name = "agnes".to_string();
    ucp.age = 2;
    println!("origin: {:?}", u1);
    println!("cp    : {:?}", ucp);
    u1.age = 3;
    println!("origin: {:?}", u1);
    println!("cp    : {:?}", ucp);
}

运行结果

running 1 test
origin: User { name: "rex", age: 1 }
origin: User { name: "rex", age: 1 }
cp    : User { name: "agnes", age: 2 }
origin: User { name: "rex", age: 3 }
cp    : User { name: "agnes", age: 2 }
test test_struct_clone ... ok

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 2 filtered out; finished in 0.00s

同样可以看到,修改clone后的对象,源对象不变,修改源对象,clone后的对象也不变。


初学rust,如何在线程中调用成员方法
Tag rust, 线程, 方法, on by view 74

如何在线程中调用成员方法?

普通调用习惯写法

fn watch_receiver(mut self, rx: Receiver<String>) {
    thread::spawn(move || {
        for line in rx.iter() {
            self.push(line);
        }
    });
}

会报错

p.watch_receiver(rx);
    |           ------------------ `p` moved due to this method call
70  |         p.watch_poly();
    |         ^^^^^^^^^^^^^^ value borrowed here after move

这里需要把形参self改为指针&self,然后在方法体中克隆这个指针,就可以在方法中的线程里直接通过这个指针的克隆成员方法。

改为

fn watch_receiver(&self, rx: Receiver<String>) {
    let mut me = self.clone();
    thread::spawn(move || {
        for line in rx.iter() {
            me.push(line);
        }
    });
}

即可通过。

但是要注意,这里的clone,真的是克隆。所以clone前后的变量,即selfme是2个不同的变量,里面的成员也是在不同的内存空间上,修改self中的成员属性,me中对应的成员属性并不会跟着变。所以,如果里面的成员属性需要跟随变化,必须把成员属性定义为指针,这样修改指正所指的值,selfme中成员属性所指的值是相同的。


初学rust,使用协程
Tag rust, 协程, on by view 94

最近想使用rust写个tcp透明代理转发服务,这中间涉及到socket监听以及连接处理逻辑中连接后端服务并处理连接后的相关逻辑。

监听端口

fn main() {
    let skt = Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP)).unwrap();
    let address: SocketAddr = "0.0.0.0:3333".parse().unwrap();
    skt.set_ip_transparent(true).unwrap();
    skt.set_reuse_address(true).unwrap();
    skt.set_reuse_port(true).unwrap();
    skt.bind(&address.into()).unwrap();
    skt.listen(128).unwrap();

    let listener: TcpListener = skt.into();
    println!("Server listening on port 3333");

    for stream in listener.incoming() {
        match stream {
            Ok(stream) => {
                println!("peer addr: {}", stream.peer_addr().unwrap());
                thread::spawn(move || {
                    // connection succeeded
                    handle_client(stream)
                });
            }
            Err(e) => {
                println!("Error: {}", e);
                /* connection failed */
            }
        }
    }
    // close the socket server
    drop(listener);
}

处理客户端连接

fn handle_client(mut stream: TcpStream) {
    let x = get_original_destination_addr(&stream).unwrap();
    println!("target addr: {:?}", x);

    let mut client_stream = get_rs_connect_stream_by_ipport(x.to_string().as_str());
    println!("client stream: {:?}", client_stream);

    let mut data = [0 as u8; 50]; // using 50 byte buffer
    while match stream.read(&mut data) {
        Ok(size) => {
            if size == 0 {
                false
            } else {
                // echo everything!
                println!("len: {:?}", size);
                stream.write(&data[0..size]).unwrap();
                true
            }
        }
        Err(_) => {
            println!(
                "An error occurred, terminating connection with {}",
                stream.peer_addr().unwrap()
            );
            stream.shutdown(Shutdown::Both).unwrap();
            false
        }
    } {}
}

可以看到这里每接受一个连接,就会起一个线程,如果在handle_client里再连接后端的话,就需要针对client_stream再起一个线程在线程中执行死循环,否则死循环会卡主stream这个死循环。这样一来,一个链接就得起2个线程了,那么连接一多,线程就更多了,一个进程能创建的线程数是有限的,因为线程对资源占用相对比较大,这样连接数一多,系统资源就不够用,性能就会很差。

对比到golang中的协程,我们能否在rust中使用协程呢?答案是肯定的。下面简单介绍一下rust中如何使用协程。rust中使用“协程”,我们用到tokio这个包。

对于单个异步,我们可以用async和await就可以了

#[tokio::main]
async fn main() {
    let skt = Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP)).unwrap();
    let address: SocketAddr = "0.0.0.0:3333".parse().unwrap();
    skt.set_ip_transparent(true).unwrap();
    skt.set_reuse_address(true).unwrap();
    skt.set_reuse_port(true).unwrap();
    skt.bind(&address.into()).unwrap();
    skt.listen(128).unwrap();

    let listener: TcpListener = skt.into();
    println!("Server listening on port 3333");

    for stream in listener.incoming() {
        match stream {
            Ok(stream) => {
                println!("peer addr: {}", stream.peer_addr().unwrap());
                handle_client(stream).await  // 其中 handle_client 改为了 async 异步函数
            }
            Err(e) => {
                println!("Error: {}", e);
                /* connection failed */
            }
        }
    }
    // close the socket server
    drop(listener);
}

其中 handle_client 是 async 异步函数,但是如果我想在handle_client中再起一个异步函数,直接使用async是不行的。我们可以这样处理,使用tokio::spawn就可以了,这里tokio::spawn有点类似golang中的go关键词,调用tokio::spawn的地方可以起一个异步函数。

async fn rs_handle(rs_stream: Arc<TcpStream>, stream: Arc<TcpStream>) {
    let mut client_data = [0 as u8; 50]; // using 50 byte buffer
    while match rs_stream.as_ref().read(&mut client_data) {
        Ok(client_size) => {
            if client_size == 0 {
                false
            } else {
                println!("len: {:?}", client_size);
                stream.as_ref().write(&client_data[0..client_size]).unwrap();
                true
            }
        }
        Err(_) => {
            println!("client error occurred.");
            false
        }
    } {}
}

async fn handle_client(stream: TcpStream) {
    let x = get_original_destination_addr(&stream).unwrap();
    println!("target addr: {:?}", x);

    let stream = Arc::new(stream);
    let rs_stream = Arc::new(get_rs_connect_stream_by_ipport(x.to_string().as_str()));
    println!("rs stream: {:?}", rs_stream);

    let rsh_rs_stream = rs_stream.clone();
    let rsh_stream = stream.clone();
    tokio::spawn(rs_handle(rsh_rs_stream, rsh_stream)); // 这里相当于起一个线程
    println!("rs handle setted.");

    let mut data = [0 as u8; 50]; // using 50 byte buffer
    while match stream.as_ref().read(&mut data) {
        Ok(size) => {
            if size == 0 {
                false
            } else {
                rs_stream.as_ref().write(&data[0..size]).unwrap();
                true
            }
        }
        Err(_) => {
            println!(
                "An error occurred, terminating connection with {}",
                stream.peer_addr().unwrap()
            );
            stream.shutdown(Shutdown::Both).unwrap();
            false
        }
    } {}
}

如上代码,首先需要定义一个异步函数async fn rs_handle(...)然后在调用的地方使用tokio::spawn(rs_handle(...))来调用。就可以实现同等于golang中go关键词的效果,起一个协程。


初学rust,数组vector的自增
Tag rust, vector, 自增, on by view 46

在golang中经常会踩一个坑,那就是slice append,golang的动态数组也称为slice,使用append可以对动态数组进行添加元素,但是slice空间不够之后golang会自动重新分配内存空间,每次重新分配的内存空间是原空间的2倍,而且有个更坑的是,golang中slice每次重新分配内存都是重新分配一片 2N 大小的内存,然后把原来的数据拷贝过去,这样一来,性能损耗更大了。

那么rust中的数组是怎么处理动态增长的呢,我们来一段代码测试一下。

fn main() {
    let mut vec = Vec::with_capacity(100);

    // The vector contains no items, even though it has capacity for more
    println!("vec.len: {:?}", vec.len());
    println!("vec.cap: {:?}", vec.capacity());

    // These are all done without reallocating...
    for i in 0..100 {
        vec.push(i);
    }
    println!("vec.len: {:?}", vec.len());
    println!("vec.cap: {:?}", vec.capacity());

    // ...but this may make the vector reallocate
    vec.push(101);
    println!("vec.len: {:?}", vec.len());
    println!("vec.cap: {:?}", vec.capacity());
}

输出如下

vec.len: 0
vec.cap: 100
vec.len: 100
vec.cap: 100
vec.len: 101
vec.cap: 200

可以看到,rust中,当vector数组存储满了之后,再往里面添加元素,vector就会重新分配内存,新分配的内存也是原来空间的2倍,但是,他是在原来的内存上扩充的,而不是像golang一样重新分配一片2N的内存空间替换旧的内存。性能损耗上,相比golang少了copy数据和释放旧空间。所以在高性能场景下,这里依然不建议使用vector的自动增长特性,自动增长的内存分配会消耗存储空间,而且 2N 的增长步长会很容易导致内存泄漏,你如果依赖这个自动增长特性,你将会发现你使用的内存可能会发生 2^n 指数级增长。这绝对会在大部分边界条件下导致你的程序迅速的发生OOM。

远离动态数组,远离bug。