初学rust,return与无分号结尾区别
Tag rust, return, on by view 151

在rust中,我们在一开始学习rust时,就被告诉,在函数结尾可以不用写return,只需要结尾的那个语句别写分号(;),那么结尾的这个语句计算的值将会作为这个函数的返回值返回。

#[test]
fn test_return_2() {
    println!("none semi: {}", none_semicolon());
}

fn none_semicolon() -> String {
    String::from("hello world")
}

正常输出

running 1 test
none semi: hello world
test test_return_2 ... ok

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

这时,我形成一个印象,“结尾语句不加分号,可以当return用”。还有如下的例子

fn test_return_1() -> bool {
    let x = Option::Some(true);

    let y = match x {
        Some(val) => {
            println!("{:?}", val);
            val
        }
        None => false,
    };

    println!("will finish function, y: {:?}", y);

    false
}

我可以看到y变量是bool类型,这时候我以为val这句没有加分号,所以它将值返回给语句块,从而赋值给y,但是,这时候,我改为下面这样的

#[test]
fn test_return_2() {
    println!("none semi: {}", test_return_1());
}

fn test_return_1() -> bool {
    let x = Option::Some(true);

    let y = match x {
        Some(val) => {
            println!("{:?}", val);
            return val;
        }
        None => false,
    };

    println!("will finish function, y: {:?}", y);

    false
}

我发现,return 处就结束了函数并返回。后续的println并没有执行。

running 1 test
true
none semi: true
test test_return_2 ... ok

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

总结,位于语句块或者函数体结尾无分号结尾的语句,并不等同于return,它处于语句块结尾时,代表将它的值赋值给语句块所代表的变量,处于函数体结尾时,代表将它的值赋值给函数体所代表的变量(函数返回变量);而return这个函数终止返回的指令,从来都跟无分号结尾的语句无关,而是函数体的反花括号 (}) 的出现所带来的行为。


初学rust,无法在不同的线程中并行读写websocket
Tag rust, 线程, websocket, 读写分离, on by view 252

最近发现一个问题,rust的线程安全机制导致无法实现socket读写分离到两个不同的线程。

先说一下程序的背景,程序是将本地终端pty(cli)拉起,并且将pty的输入输出通过channel对接,并将cli输出的数据经过channel写入到服务端socket,将从服务端socket收取到的数据经另一个channel写入到cli的输入。从而实现远程连接pty。

按照rust的写法,读线程中,在读socket之前需要先锁socket,然后读取,再释放锁;同样,在写线程中,也需要先锁socket,然后写入,再释放锁。这样一来代码应该如下:

连接与初始化代码如下

let (ws_stream, response) =
    connect(Url::parse("wss://ws.postman-echo.com/raw").unwrap()).expect("msg");

println!("Connected to the server");
println!("Response HTTP code: {}", response.status());
println!("Response contains the following headers:");
for (ref header, _value) in response.headers() {
    println!("* {}", header);
}

let socket = Arc::new(Mutex::new(ws_stream));

// init cli
self.pty_start();

let mut me = self.clone();
let skt = socket.clone();
thread::spawn(move || {
    me.watch_socket_read_in(skt);
});
println!("---> watch_socket_read_in");

self.watch_socket_write_out(socket.clone());
println!("---> watch_socket_write_out");

读取方法在一个新起的线程中watch_socket_read_in,如下

fn watch_socket_read_in(&mut self, socket: Arc<Mutex<WebSocket<MaybeTlsStream<TcpStream>>>>) {
    loop {
        let mut skt = socket.lock().unwrap();
        let msg = skt.read_message().unwrap();
        println!("Socket Received: {:?}", msg);
        drop(skt);
        self.tx_in
            .send(msg.clone())
            .expect("send msg into in channel failed");
        println!("send pipe in succ: {:?}", msg);
    }
}

可以看到,不停的从socket读取数据,读取前锁,读取后drop锁。

写入方法在初始化代码所在的主线程中watch_socket_write_out,如下

fn watch_socket_write_out(&mut self, socket: Arc<Mutex<WebSocket<MaybeTlsStream<TcpStream>>>>) {
    let rx = self.rx_out.lock().expect("lock rx out failed");

    for msg in rx.iter() {
        println!("msg from cli -> {:?}", msg);
        let mut skt = socket.lock().unwrap();
        println!("Socket Send    : {:?}", msg);
        skt.write_message(msg).unwrap();
        drop(skt);
    }

    println!("out of socket write out block....")
}

可是,运行的结果却出乎我的意料,运行结果现象是这样的,先是只能够从socket读取到服务端的PING数据,而cli发出的数据经过channel读取出来之后,锁socket,准备发送,但是发现锁socket卡主死锁了,导致无法经socket发送,然后就卡了很久;但是过了一段时间,写socket获取的锁成功了,发了一大堆的数据,然后又轮到读socket卡主,稍后随机的时间后,读socket锁成功,又只能读到PING,如此反复。这种状态的读写,完全不能用,根本实现不了cli与服务端的实时通讯。

分析了一下,应该是socket网络读写是网络通讯,因此读写的锁定socket时长是不确定的且相对耗时算是比较长的,所以导致无法预料是读获取到锁还是写获取到锁,而且这种锁强行将读写串行化了,完全不符合并发读写的要求了。

几经查找,于是采用tokio-tungstenite这个crate替换了tungstenite,因为它可以将WebSocketStream通过split方法分隔为readerwriter,这样一来,读与写就分离开了,在不同的线程中无需对socket加锁。

let (ws_stream, response) =
    connect_async(Url::parse("wss://ws.postman-echo.com/raw").unwrap())
        .await
        .expect("msg");
let (ws_writer, ws_reader) = ws_stream.split();

这样一来,读socketws_reader,写socketws_writer

// socket read
let me = self.clone();
tokio::spawn(async move {
    let mut incoming = ws_reader.map(Result::unwrap);
    while let Some(msg) = incoming.next().await {
        if msg.is_text() {
            println!("Socket Received: {:?}", msg);
            me.tx_in.send(msg).expect("send msg into in channel failed");
        }
    }
});

// socket write
self.watch_socket_write_out(ws_writer).await;
async fn watch_socket_write_out(
    &mut self,
    mut writer: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
) {
    let rx = self.rx_out.lock().expect("lock rx out failed");

    for msg in rx.iter() {
        println!("Socket Send    : {:?}", msg.to_text().unwrap());
        writer.send(msg).await.unwrap();
    }

    println!("out of socket write out block....")
}

可以看到,新线程中读socket,主线程中写socketws_readermap方法后,可以在死循环中阻塞调用next()不断的读取socket中的信息。写socket则从channel中读取到数据,按常规的方法send即可。

r56j2avc

接入tokio-tungstenite解决了这个问题,不过它是基于tokio的,tokio是一个协程库,有自己的运行时,用了tokio的程序起协程后,程序会自动启动若干个线程,类比goroutine,它也是有初始的资源消耗的,比如这个程序只需要4个线程,但是使用了tokio的程序,会有10个线程(如上图),内存占用会明显增多。


初学rust,HashMap的clone
Tag rust, clone, hashmap, on by view 207

在rust中有个常用个方法clone,按字面意思就是克隆。这个函数的作用是对对象进行深度拷贝,生成的新对象与原对象相互独立。

很多常用的类型或者容器类型都支持clone,例如rust中的HashMap也支持clone,我们用一段代码实验一下。

#[test]
fn test_hash_map_clone() {
    let xx: Arc<Mutex<HashMap<String, String>>> = Arc::new(Mutex::new(HashMap::new()));
    let mut mp = xx.lock().unwrap();
    mp.insert("hi".to_string(), "hello".to_string());
    println!("origin: {:?}", mp);
    let mut cp = mp.clone();
    cp.insert("k".to_string(), "v".to_string());
    println!("origin: {:?}", mp);
    println!("cp    : {:?}", cp);
}

输出

running 1 test
origin: {"hi": "hello"}
origin: {"hi": "hello"}
cp    : {"hi": "hello", "k": "v"}
test test_hash_map_clone ... ok

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 2 filtered out; finished in 0.00s

上面的测试代码运行结果表示,修改克隆后的对象cp,源对象mp不会发生变化。

那么我们自己定义的类型如何才能支持clone呢?使用#[derive(Clone)]这个指令修饰自定义类型,就会自动支持clone,但是要注意,如果自定义类型结构体里,如果有字段类型不支持clone,将无法通过#[derive(Clone)]指令快速支持clone

自定义类型clone测试如下

#[derive(Debug, Clone)]
struct User {
    name: String,
    age: i32,
}

#[test]
fn test_struct_clone() {
    let mut u1 = User {
        name: "rex".to_string(),
        age: 1,
    };
    println!("origin: {:?}", u1);
    let mut ucp = u1.clone();
    ucp.name = "agnes".to_string();
    ucp.age = 2;
    println!("origin: {:?}", u1);
    println!("cp    : {:?}", ucp);
    u1.age = 3;
    println!("origin: {:?}", u1);
    println!("cp    : {:?}", ucp);
}

运行结果

running 1 test
origin: User { name: "rex", age: 1 }
origin: User { name: "rex", age: 1 }
cp    : User { name: "agnes", age: 2 }
origin: User { name: "rex", age: 3 }
cp    : User { name: "agnes", age: 2 }
test test_struct_clone ... ok

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 2 filtered out; finished in 0.00s

同样可以看到,修改clone后的对象,源对象不变,修改源对象,clone后的对象也不变。


初学rust,如何在线程中调用成员方法
Tag rust, 线程, 方法, on by view 115

如何在线程中调用成员方法?

普通调用习惯写法

fn watch_receiver(mut self, rx: Receiver<String>) {
    thread::spawn(move || {
        for line in rx.iter() {
            self.push(line);
        }
    });
}

会报错

p.watch_receiver(rx);
    |           ------------------ `p` moved due to this method call
70  |         p.watch_poly();
    |         ^^^^^^^^^^^^^^ value borrowed here after move

这里需要把形参self改为指针&self,然后在方法体中克隆这个指针,就可以在方法中的线程里直接通过这个指针的克隆成员方法。

改为

fn watch_receiver(&self, rx: Receiver<String>) {
    let mut me = self.clone();
    thread::spawn(move || {
        for line in rx.iter() {
            me.push(line);
        }
    });
}

即可通过。

但是要注意,这里的clone,真的是克隆。所以clone前后的变量,即selfme是2个不同的变量,里面的成员也是在不同的内存空间上,修改self中的成员属性,me中对应的成员属性并不会跟着变。所以,如果里面的成员属性需要跟随变化,必须把成员属性定义为指针,这样修改指正所指的值,selfme中成员属性所指的值是相同的。


初学rust,使用协程
Tag rust, 协程, on by view 158

最近想使用rust写个tcp透明代理转发服务,这中间涉及到socket监听以及连接处理逻辑中连接后端服务并处理连接后的相关逻辑。

监听端口

fn main() {
    let skt = Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP)).unwrap();
    let address: SocketAddr = "0.0.0.0:3333".parse().unwrap();
    skt.set_ip_transparent(true).unwrap();
    skt.set_reuse_address(true).unwrap();
    skt.set_reuse_port(true).unwrap();
    skt.bind(&address.into()).unwrap();
    skt.listen(128).unwrap();

    let listener: TcpListener = skt.into();
    println!("Server listening on port 3333");

    for stream in listener.incoming() {
        match stream {
            Ok(stream) => {
                println!("peer addr: {}", stream.peer_addr().unwrap());
                thread::spawn(move || {
                    // connection succeeded
                    handle_client(stream)
                });
            }
            Err(e) => {
                println!("Error: {}", e);
                /* connection failed */
            }
        }
    }
    // close the socket server
    drop(listener);
}

处理客户端连接

fn handle_client(mut stream: TcpStream) {
    let x = get_original_destination_addr(&stream).unwrap();
    println!("target addr: {:?}", x);

    let mut client_stream = get_rs_connect_stream_by_ipport(x.to_string().as_str());
    println!("client stream: {:?}", client_stream);

    let mut data = [0 as u8; 50]; // using 50 byte buffer
    while match stream.read(&mut data) {
        Ok(size) => {
            if size == 0 {
                false
            } else {
                // echo everything!
                println!("len: {:?}", size);
                stream.write(&data[0..size]).unwrap();
                true
            }
        }
        Err(_) => {
            println!(
                "An error occurred, terminating connection with {}",
                stream.peer_addr().unwrap()
            );
            stream.shutdown(Shutdown::Both).unwrap();
            false
        }
    } {}
}

可以看到这里每接受一个连接,就会起一个线程,如果在handle_client里再连接后端的话,就需要针对client_stream再起一个线程在线程中执行死循环,否则死循环会卡主stream这个死循环。这样一来,一个链接就得起2个线程了,那么连接一多,线程就更多了,一个进程能创建的线程数是有限的,因为线程对资源占用相对比较大,这样连接数一多,系统资源就不够用,性能就会很差。

对比到golang中的协程,我们能否在rust中使用协程呢?答案是肯定的。下面简单介绍一下rust中如何使用协程。rust中使用“协程”,我们用到tokio这个包。

对于单个异步,我们可以用async和await就可以了

#[tokio::main]
async fn main() {
    let skt = Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP)).unwrap();
    let address: SocketAddr = "0.0.0.0:3333".parse().unwrap();
    skt.set_ip_transparent(true).unwrap();
    skt.set_reuse_address(true).unwrap();
    skt.set_reuse_port(true).unwrap();
    skt.bind(&address.into()).unwrap();
    skt.listen(128).unwrap();

    let listener: TcpListener = skt.into();
    println!("Server listening on port 3333");

    for stream in listener.incoming() {
        match stream {
            Ok(stream) => {
                println!("peer addr: {}", stream.peer_addr().unwrap());
                handle_client(stream).await  // 其中 handle_client 改为了 async 异步函数
            }
            Err(e) => {
                println!("Error: {}", e);
                /* connection failed */
            }
        }
    }
    // close the socket server
    drop(listener);
}

其中 handle_client 是 async 异步函数,但是如果我想在handle_client中再起一个异步函数,直接使用async是不行的。我们可以这样处理,使用tokio::spawn就可以了,这里tokio::spawn有点类似golang中的go关键词,调用tokio::spawn的地方可以起一个异步函数。

async fn rs_handle(rs_stream: Arc<TcpStream>, stream: Arc<TcpStream>) {
    let mut client_data = [0 as u8; 50]; // using 50 byte buffer
    while match rs_stream.as_ref().read(&mut client_data) {
        Ok(client_size) => {
            if client_size == 0 {
                false
            } else {
                println!("len: {:?}", client_size);
                stream.as_ref().write(&client_data[0..client_size]).unwrap();
                true
            }
        }
        Err(_) => {
            println!("client error occurred.");
            false
        }
    } {}
}

async fn handle_client(stream: TcpStream) {
    let x = get_original_destination_addr(&stream).unwrap();
    println!("target addr: {:?}", x);

    let stream = Arc::new(stream);
    let rs_stream = Arc::new(get_rs_connect_stream_by_ipport(x.to_string().as_str()));
    println!("rs stream: {:?}", rs_stream);

    let rsh_rs_stream = rs_stream.clone();
    let rsh_stream = stream.clone();
    tokio::spawn(rs_handle(rsh_rs_stream, rsh_stream)); // 这里相当于起一个线程
    println!("rs handle setted.");

    let mut data = [0 as u8; 50]; // using 50 byte buffer
    while match stream.as_ref().read(&mut data) {
        Ok(size) => {
            if size == 0 {
                false
            } else {
                rs_stream.as_ref().write(&data[0..size]).unwrap();
                true
            }
        }
        Err(_) => {
            println!(
                "An error occurred, terminating connection with {}",
                stream.peer_addr().unwrap()
            );
            stream.shutdown(Shutdown::Both).unwrap();
            false
        }
    } {}
}

如上代码,首先需要定义一个异步函数async fn rs_handle(...)然后在调用的地方使用tokio::spawn(rs_handle(...))来调用。就可以实现同等于golang中go关键词的效果,起一个协程。


初学rust,数组vector的自增
Tag rust, vector, 自增, on by view 76

在golang中经常会踩一个坑,那就是slice append,golang的动态数组也称为slice,使用append可以对动态数组进行添加元素,但是slice空间不够之后golang会自动重新分配内存空间,每次重新分配的内存空间是原空间的2倍,而且有个更坑的是,golang中slice每次重新分配内存都是重新分配一片 2N 大小的内存,然后把原来的数据拷贝过去,这样一来,性能损耗更大了。

那么rust中的数组是怎么处理动态增长的呢,我们来一段代码测试一下。

fn main() {
    let mut vec = Vec::with_capacity(100);

    // The vector contains no items, even though it has capacity for more
    println!("vec.len: {:?}", vec.len());
    println!("vec.cap: {:?}", vec.capacity());

    // These are all done without reallocating...
    for i in 0..100 {
        vec.push(i);
    }
    println!("vec.len: {:?}", vec.len());
    println!("vec.cap: {:?}", vec.capacity());

    // ...but this may make the vector reallocate
    vec.push(101);
    println!("vec.len: {:?}", vec.len());
    println!("vec.cap: {:?}", vec.capacity());
}

输出如下

vec.len: 0
vec.cap: 100
vec.len: 100
vec.cap: 100
vec.len: 101
vec.cap: 200

可以看到,rust中,当vector数组存储满了之后,再往里面添加元素,vector就会重新分配内存,新分配的内存也是原来空间的2倍,但是,他是在原来的内存上扩充的,而不是像golang一样重新分配一片2N的内存空间替换旧的内存。性能损耗上,相比golang少了copy数据和释放旧空间。所以在高性能场景下,这里依然不建议使用vector的自动增长特性,自动增长的内存分配会消耗存储空间,而且 2N 的增长步长会很容易导致内存泄漏,你如果依赖这个自动增长特性,你将会发现你使用的内存可能会发生 2^n 指数级增长。这绝对会在大部分边界条件下导致你的程序迅速的发生OOM。

远离动态数组,远离bug。


初学rust,优雅的解包Option
Tag rust, option, on by view 436

rust中,Option表示一个可能不存在值的复合类型。其定义如下

pub enum Option<T> {
    None,
    Some(T),
}

可以看到,它里面的值要么就是类型T的值,要么就是None(也表示不存在值)。通常,我们在获取到可能为空的值时,Option类型很有用,它要求你必须去处理可能为None的情况。它有方法 is_none, is_some 等,可以判定是否为空。但是如果你使用这两个方法来解包 Option,免不了if else判断,代码会比较难看。比如

let x: Option<u32> = Some(2);
let mut value: u32 = 0;
if x.is_none() {
    value = default;
} else {
    value = x.unwrap(); // unwrap 方法可以解一切包,但是遇到 None 会 panic
}

可以看到上面的代码用了2条语句,首先是初始化value值,然后判定是否为None,根据不同的情况,对value重新赋值,明显复杂,我解个Option还得分两步,而且申明的value还必需是mut类型。那么,对于“我需要解包Option,如果Option为None则给默认值”这个需求,有更优雅的写法吗?有,如下

let x: Option<u32> = Some(2);

// 方法一:利用 match,和语句块
let value = match x {
    Some(val) => val,
    None => default,
};

// 方法二:利用 if-let,和语句块
let value = {
    if let Some(val) = x {
        *val
    } else {
        default
    }
};

上述两种方法都是借助语句块一步到位的解包Option,并且没有调用任何方法。


初学rust,wasm前端图片转码
Tag wasm, rust, 转码, on by view 401

最近用rust写的日志上报agent趋近完善,意味着一个练习rust的小项目结束了。于是便找了个新的小项目,用rust代码编译出wasm,在浏览器端实现图片缩放、转码。决定做前端转码是出于两方面原因,第一是想体验一下rust-webassembly,第二是博客的管理后台上传图片能力有待优化,无法直接上传单反拍出来的图片,因为单反照都是十几兆以上大小,我的云服务器只有1M带宽,上传超时,就算我能忍受超时,也无法忍受大文件后端转码压缩时io满负载直接卡死服务器的情况。于是便有了这次wasm体验。

首先,如果你已经入门了rust,能用rust写代码了,那么用rust实现wasm将会是一种非常好的体验。因为rust的wasm全套工具齐全,你可以直接在rust项目中编译出npm包,编译出来的结果可以直接上传到npm仓库。这里简单介绍一下基于rust的wasm包开发过程。

首先创建rust的包项目,注意不是可执行文件。

cargo new wtools --lib

然后,修改Cargo.toml文件,定义包类型

[package]
name = "wtools"
version = "0.1.6"
edition = "2021"
description = "wasm tools"
license = "MIT"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
crate-type = ["cdylib", "rlib"] # cdylib 是wasm库, rlib 是常规rust库

[profile.release]
lto = true
opt-level = 'z'

[dependencies]

注意lib下的crate-type字段要定义为cdylib,只有这种包才能编译为wasm,然后还有一个选项需要注意profile.release下的lto=trueopt-level = 'z'这两个选项设置后,可以在编译的时候讲wasm压缩到最小大小,以减小wasm文件在网络中分发的大小。当然,缩减wasm还有个工具,叫做wasm-opt,但是我具体实测之后发现,只要设置了上面的ltoopt-level选项,这个工具能缩减的大小非常有限,有时候甚至无法再进一步缩减了。

安装工具。这里编译wasm报并不是用原生的cargo,而是使用一个叫做wasm-pack的工具,它的优点是,可以直接编译出npm包。安装

cargo install wasm-pack

编译

wasm-pack build --scope duguying

上传npm包

cd pkg
npm publish --access=public

整个开发的过程就是如上的那些。下面简单介绍一下代码。首先,我们这个rust项目的目标是编译为wasm在浏览器上运行。这里就免不了js与rust之间进行数据传递,以及rust里操作浏览器中的各种对象和元素。介绍两个rust包,第一个js-sys,用于js与rust之间进行数据传递的,在这个包里能找到js中的数据类型对应的类型;第二个web-sys,用于浏览器对象与rust之间进行数据传递的,在这个包里有对应浏览器中的各种对象。

比如,最常见的浏览器日志打印console.log,在web-sys中能找到console对象,详情可以查看文档。在我的rust包中简单的包装了一下

extern crate wasm_bindgen;
extern crate web_sys;

use wasm_bindgen::prelude::*;

#[macro_export]
macro_rules! console_log {
    ($($t:tt)*) => (web_sys::console::log(&js_sys::Array::of1(&JsValue::from(
        format_args!($($t)*).to_string()
    ))))
}

#[wasm_bindgen]
pub fn greet(name: &str) {
    console_log!("Hello, {}!", name);
}

这样就可以在其他地方用console_log来调用了,比如

console_log!("load img failed, err: {:?}", error);

我需要进行图片处理,所以用到了image这个包,这个包支持缩放图片resize、旋转图片rotate以及翻转图片flipv等。我主要用到缩放和旋转。另外有一点需要注意的是,需要导出到js的结构体和方法函数等,需要添加#[wasm_bindgen]注解。这个注解是在wasm_bindgen这个包中定义的,这个也是rust编译为wasm的核心包,具体可以查看文档。因为我发现单反上拍摄的照片通常会根据拍照者持相机的角度有一个旋转参数,而这个参数,它是存到了照片的exif信息中,但是他的照片数据实际存储是按照相机原始的方向存储的,所以,竖着拍摄的照片在上传到服务器之后会发现照片是横着的,需要旋转90度。所以在这里我还用到了kamadak-exif这个包,来读取照片的exif信息,从而获取旋转参数,然后根据旋转参数调用rotate对照片进行旋转来修正照片方向。图片处理的代码如下

extern crate wasm_bindgen;

use exif::{Error, Exif, In, Tag};
use image::{imageops::FilterType, DynamicImage, EncodableLayout, ImageFormat};
use js_sys::Uint8Array;
use std::io::{Cursor, Read, Seek, SeekFrom};
use wasm_bindgen::prelude::*;

use crate::console_log;

#[wasm_bindgen]
pub struct Img {
    img: DynamicImage,
    img_format: ImageFormat,
    exif: Result<Exif, Error>,
    orientation: u32,
}

#[wasm_bindgen]
impl Img {
    #[wasm_bindgen(constructor)]
    pub fn new(img: &[u8], mime: &str) -> Img {
        let exifreader = exif::Reader::new();
        let (img_data, img_format) = Img::load_image_from_array(img, mime.to_string());
        let mut c = Cursor::new(Vec::from(img));
        let exif = exifreader.read_from_container(&mut c);

        let mut image = Img {
            img: img_data,
            img_format: img_format,
            exif: exif,
            orientation: 0,
        };
        image.get_orietation();
        image.fix_orietation();
        image
    }

    fn load_image_from_array(_array: &[u8], mime: String) -> (DynamicImage, ImageFormat) {
        let img_format = ImageFormat::from_mime_type(mime).unwrap();
        let img = match image::load_from_memory_with_format(_array, img_format) {
            Ok(img) => img,
            Err(error) => {
                console_log!("load img failed, err: {:?}", error);
                panic!("{:?}", error)
            }
        };
        return (img, img_format);
    }

    fn get_orietation(&mut self) {
        match &self.exif {
            Ok(exif) => {
                let r = exif.get_field(Tag::Orientation, In::PRIMARY);
                match r {
                    Some(oriet) => {
                        self.orientation = oriet.value.get_uint(0).unwrap();
                    }
                    None => {}
                }
                console_log!("orientation: {:?}", r.unwrap());
            }
            Err(_error) => {}
        };
    }

    fn fix_orietation(&mut self) {
        match self.orientation {
            8 => self.img = self.img.rotate270(),
            3 => self.img = self.img.rotate180(),
            6 => self.img = self.img.rotate90(),
            _ => {}
        }
    }

    fn image_to_uint8_array(&self, img: DynamicImage) -> Uint8Array {
        // 创建一个内存空间
        let mut c = Cursor::new(Vec::new());
        match img.write_to(&mut c, self.img_format) {
            Ok(c) => c,
            Err(error) => {
                panic!(
                    "There was a problem writing the resulting buffer: {:?}",
                    error
                )
            }
        };
        c.seek(SeekFrom::Start(0)).unwrap();
        let mut out = Vec::new();
        // 从内存读取数据
        c.read_to_end(&mut out).unwrap();
        let v = out.as_bytes();
        Uint8Array::from(v)
    }

    pub fn get_width(&self) -> u32 {
        return self.img.width();
    }

    pub fn get_height(&self) -> u32 {
        return self.img.height();
    }

    pub fn grayscale(&self) -> Uint8Array {
        let img = self.img.grayscale();
        self.image_to_uint8_array(img)
    }

    pub fn scale(&self, width: u32, height: u32) -> Uint8Array {
        let img = self.img.resize(width, height, FilterType::Triangle);
        self.image_to_uint8_array(img)
    }

    pub fn rotate90(&self) -> Uint8Array {
        let img = self.img.rotate90();
        self.image_to_uint8_array(img)
    }

    pub fn rotate180(&self) -> Uint8Array {
        let img = self.img.rotate180();
        self.image_to_uint8_array(img)
    }

    pub fn rotate270(&self) -> Uint8Array {
        let img = self.img.rotate270();
        self.image_to_uint8_array(img)
    }

    pub fn flipv(&self) -> Uint8Array {
        let img = self.img.flipv();
        self.image_to_uint8_array(img)
    }

    pub fn fliph(&self) -> Uint8Array {
        let img = self.img.fliph();
        self.image_to_uint8_array(img)
    }
}

编译成功打包上传npm仓库之后,在前端项目中使用有一点需要注意,像这种基于wasm的npm包并不能像常规的npm包那样直接import引入,而是需要异步引入,这种写法非常不优雅,如下

/**
 * @description 全局注册md5工具
 */
async function waitwasm () {
  const { Crypt, Img } = await import('@duguying/wtools')
  Vue.prototype.$md5 = (content) => {
    let crypt = new Crypt()
    let out = crypt.md5(content)
    crypt.free()
    return out
  }
  Vue.prototype.$scale_img = (file) => {
    return new Promise(function (resolve, reject) {
      let reader = new FileReader()
      reader.readAsArrayBuffer(file)
      reader.onload = function () {
        let data = new Uint8Array(this.result)
        console.log('data:', data)
        let kit = new Img(data, file.type)
        console.log(kit)
        let w = kit.get_width()
        let h = kit.get_width()
        console.log('wh:', w, h)
        if (w > 2000) {
          w = 2000
          h = h / w * 2000
        } else {
          resolve(file)
          return
        }
        let out = kit.scale(w, h)
        resolve(new Blob([out.buffer], { type: file.type }))
      }
    })
  }
}
(async () => {
  waitwasm()
})()

他本身是一个异步引入,但是需要等它引入完毕之后,才能调用其中的方法,否则就会报错,所以,这里只好同步阻塞,等他引入完毕了。


初学rust,tokio的async与await
Tag rust, tokio, async, await, on by view 539

最近监控上报的agent里需要将数据上报到es,所以用了elasticsearch-rs这个包,这是es官方提供的rust版本sdk,看了一下版本号,目前的版本都处于alpha。

下面用一个简单实例讲述我遇到的问题。首先是,调用sdk发现需要用async标注函数。

use elasticsearch::{http::transport::Transport, BulkParts, Elasticsearch};
use serde_json::Value;

fn main() {
    println!("Hello, world!");
    send_es("hi".to_string());
    println!("hi");
}

pub async fn send_es(body: String) {
    let transport = Transport::single_node("http://xxx.xxx.net").unwrap();
    let client = Elasticsearch::new(transport);
    let mut bodies: Vec<String> = Vec::with_capacity(1);
    bodies.push(body);
    let response = client
        .bulk(BulkParts::Index("nginx"))
        .body(bodies)
        .send()
        .await
        .unwrap();
    let response_body = response.json::<Value>().await.unwrap();
    println!("{:?}", response_body);
}

运行后,发现有一个警告,并且send_es没有被调用

➜  tes git:(master) ✗ cargo run
   Compiling tes v0.1.0 (/root/code/las/tes)
warning: unused implementer of `Future` that must be used
 --> src/main.rs:6:5
  |
6 |     send_es("hi".to_string());
  |     ^^^^^^^^^^^^^^^^^^^^^^^^^^
  |
  = note: `#[warn(unused_must_use)]` on by default
  = note: futures do nothing unless you `.await` or poll them

warning: `tes` (bin "tes") generated 1 warning
    Finished dev [unoptimized + debuginfo] target(s) in 3.16s
     Running `target/debug/tes`
Hello, world!
hi

警告说futures do nothing,也就是send_es将会什么也不做。后面,我找到了一个方法block_on可以执行async方法,于是,变成了这样

use elasticsearch::{http::transport::Transport, BulkParts, Elasticsearch};
use futures::executor::block_on;
use serde_json::Value;

fn main() {
    println!("Hello, world!");
    block_on(send_es("hi".to_string()));
    println!("hi");
}

pub async fn send_es(body: String) {
    let transport = Transport::single_node("http://xxx.xxx.net").unwrap();
    let client = Elasticsearch::new(transport);
    let mut bodies: Vec<String> = Vec::with_capacity(1);
    bodies.push(body);
    let response = client
        .bulk(BulkParts::Index("nginx"))
        .body(bodies)
        .send()
        .await
        .unwrap();
    let response_body = response.json::<Value>().await.unwrap();
    println!("{:?}", response_body);
}

但是执行后发现报错如下

➜  tes git:(master) ✗ cargo run
   Compiling tes v0.1.0 (/root/code/las/tes)
    Finished dev [unoptimized + debuginfo] target(s) in 3.52s
     Running `target/debug/tes`
Hello, world!
thread 'main' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime', /root/.cargo/registry/src/github.com-1ecc6299db9ec823/hyper-0.14.20/src/client/connect/dns.rs:121:24
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

各种查找解决方案之后,也没能解决这个问题,说是tokio版本依赖的问题,两个不同的组件间接引用了不同版本的tokio,说是引入tokio = "*"就能解决依赖问题,但是实际上是无法解决的。所以我就用了上面的最小用例来调用elasticsearch sdk,只调用sdk,不引用任何其他依赖(原项目中还引用了reqwest包,这个包依赖了tokio)。发现这个最小用例也报错,说明根本不是依赖问题,但是可以确定问题出在tokio上。于是阅读了tokio官方文档,了解到运行async函数可以用#[tokio::main]标注,结合.await就可以了。于是重新修改后如下

use elasticsearch::{http::transport::Transport, BulkParts, Elasticsearch};
use serde_json::Value;

#[tokio::main]
async fn main() {
    println!("Hello, world!");
    send_es("hi".to_string()).await;
    println!("hi");
}

pub async fn send_es(body: String) {
    let transport = Transport::single_node("http://xxx.xxx.net").unwrap();
    let client = Elasticsearch::new(transport);
    let mut bodies: Vec<String> = Vec::with_capacity(1);
    bodies.push(body);
    let response = client
        .bulk(BulkParts::Index("nginx"))
        .body(bodies)
        .send()
        .await
        .unwrap();
    let response_body = response.json::<Value>().await.unwrap();
    println!("{:?}", response_body);
}

问题解决,终于调用成功了。

总结,tokio是rust中有名的异步调用的包。它定义了asyncawait这些关键词,而实现异步。同样他也定义了异步函数的调用方式,就是#[tokio::main]标注。


初学rust,多线程编程共享变量访问
Tag rust, 多线程, 共享变量, , 解锁, on by view 279

今天开始尝试使用rust的多线程编程,在不同的线程中访问共享变量。一个小项目,项目背景是这样的,我想用rust写一个高性能的nginx日志解析聚合上报监控的agent;其中,主线程中watch日志文件,并读取日志内容,有点类似tailf,然后有一个子线程,负责每隔10s钟统计主线程收集到的日志数据,聚合并上报。

所以,这里除了主线程之外,需要创建一个线程。并且主线程不断的往一个Vec<LineStat>容器变量中写入数据,另一个子线程没隔10s,从这个容器变量中读取所有数据,并累加统计上报,然后清空该容器中的数据,10s一个往复。这个容器变量就是关键的共享变量了。常规情况下,是存在数据竞争的,在golang里倒是很容易写出来,但是golang可不管这些,锁你爱加不加,不加锁也能跑,但是会出错,若是map还会panic。但是,今天发现在rust里,根本写不出来……

那么第一个问题就是,rust线程中如何使用共享变量。 我们先看下直接使用共享变量会怎么样。

fn testx() {
    let mut data = "hi".to_string();
    thread::spawn(move || loop {
        println!("{:?}", data);

        thread::sleep(Duration::from_secs(10));
    });

    println!("{:?}", data);
}

可以看到spawn生成的线程内部访问data,会被要求加上move关键词,它会强制获取被访问变量的所有权,也就是说在线程中访问了datadata的所有权就变为这个线程的了,只能在这个线程内访问。后续的println访问data就会报错。如下

error[E0382]: borrow of moved value: `data`
  --> src/main.rs:38:22
   |
31 |     let mut data = "hi".to_string();
   |         -------- move occurs because `data` has type `std::string::String`, which does not implement the `Copy` trait
32 |     thread::spawn(move || loop {
   |                   ------- value moved into closure here
33 |         println!("{:?}", data);
   |                          ---- variable moved due to use in closure
...
38 |     println!("{:?}", data);
   |                      ^^^^ value borrowed here after move
   |
   = note: this error originates in the macro `$crate::format_args_nl` (in Nightly builds, run with -Z macro-backtrace for more info)

可以看到报错value borrowed here after move,也就是访问了所有权被移动过的变量。

那么我们如何才能使data能够同时在主线程和子线程中被访问呢。用Arc<Mutex<...>>。代码改成如下

fn main() {
    let mut raw_data = "hi".to_string();
    let mut data: Arc<Mutex<String>> = Arc::new(Mutex::new(raw_data));
    let mut clone_data = data.clone();
    thread::spawn(move || loop {
        thread::sleep(Duration::from_secs(1));
        println!("th -> {:?}", clone_data.lock().unwrap());
        clone_data.lock().unwrap().push_str(", hello");
    });

    thread::sleep(Duration::from_secs(10));
    println!("final -> {:?}", data.lock().unwrap());
}

可以看到要访问的数据套上Arc<Mutex<...>>之后,在线程中访问该数据时,先在线程外clone一个副本,然后在线程中访问该数据,访问时先加锁lock(),然后读取到的数据就是raw_data,而在主线程中访问的数据也是先lock()加锁,读取到的也是raw_data。相当于在raw_data外部加了个套,虽然dataclone_data是两个加了套的数据,但是在套里面的数据是同一个,也就是raw_datalock()锁定“解套”后访问的是同一个数据。我们来看下运行结果

th -> "hi"
th -> "hi, hello"
th -> "hi, hello, hello"
th -> "hi, hello, hello, hello"
th -> "hi, hello, hello, hello, hello"
th -> "hi, hello, hello, hello, hello, hello"
th -> "hi, hello, hello, hello, hello, hello, hello"
th -> "hi, hello, hello, hello, hello, hello, hello, hello"
th -> "hi, hello, hello, hello, hello, hello, hello, hello, hello"
final -> "hi, hello, hello, hello, hello, hello, hello, hello, hello, hello"

可以看到线程内部字符串拼接了若干次,最后在主线程中打印出来的是子线程拼接后的数据。

那么rust是怎么解锁的呢。 我们把代码改成这样

fn main() {
    let mut raw_data = "hi".to_string();
    let mut data: Arc<Mutex<String>> = Arc::new(Mutex::new(raw_data));
    let mut clone_data = data.clone();
    thread::spawn(move || loop {
        thread::sleep(Duration::from_secs(1));
        let mut v = clone_data.lock().unwrap(); // v 不会立即释放,v 所在的锁也不会立即释放
        println!("th -> {:?}", v);

        appendCnt(clone_data.clone());
        // 线程结束后,v 被回收,v所在的锁才会释放
    });

    thread::sleep(Duration::from_secs(10));
    println!("final -> {:?}", data.lock().unwrap());
}

fn appendCnt(mut data: Arc<Mutex<String>>) {
    data.lock().unwrap().push_str(", hello"); // 这里再次尝试锁 data 里的值,发现前面已经锁定过,无法加锁,死锁
}

执行结果显示打印了第一个th -> "hi"之后,程序直接卡死。这里是因为发生了死锁。rust中目前没有显式的锁释放操作(实际上unlock方法在nightly版本中,参考,对应的issue),锁释放(unlock)发生在前面所说的“加了套的数据”内存释放的时候,也就是说clone_data.lock()v解锁发生在v释放的时候,也就是在loop所在块结束的时候。但是在这之前appenCnt调用了clone_data.clone()衍生的“加了套的数据”,并尝试在函数中加锁,这时候上一个锁还没释放呢,所以就死锁了。

那么如果这里我必需要提前解锁怎么办,用drop即可。如下

fn main() {
    let mut raw_data = "hi".to_string();
    let mut data: Arc<Mutex<String>> = Arc::new(Mutex::new(raw_data));
    let mut clone_data = data.clone();
    thread::spawn(move || loop {
        thread::sleep(Duration::from_secs(1));
        let mut v = clone_data.lock().unwrap(); // v 不会立即释放,v 所在的锁也不会立即释放
        println!("th -> {:?}", v);
        drop(v); // 提前释放 v,提前解锁

        appendCnt(clone_data.clone());
    });

    thread::sleep(Duration::from_secs(10));
    println!("final -> {:?}", data.lock().unwrap());
}

fn appendCnt(mut data: Arc<Mutex<String>>) {
    data.lock().unwrap().push_str(", hello"); // 这里再次尝试锁 data 里的值,没有被锁定,可以加锁
}

运行,发现现在能正常解锁了。