初学rust,如何在线程中调用成员方法
Tag rust, 线程, 方法, on by view 104

如何在线程中调用成员方法?

普通调用习惯写法

fn watch_receiver(mut self, rx: Receiver<String>) {
    thread::spawn(move || {
        for line in rx.iter() {
            self.push(line);
        }
    });
}

会报错

p.watch_receiver(rx);
    |           ------------------ `p` moved due to this method call
70  |         p.watch_poly();
    |         ^^^^^^^^^^^^^^ value borrowed here after move

这里需要把形参self改为指针&self,然后在方法体中克隆这个指针,就可以在方法中的线程里直接通过这个指针的克隆成员方法。

改为

fn watch_receiver(&self, rx: Receiver<String>) {
    let mut me = self.clone();
    thread::spawn(move || {
        for line in rx.iter() {
            me.push(line);
        }
    });
}

即可通过。

但是要注意,这里的clone,真的是克隆。所以clone前后的变量,即selfme是2个不同的变量,里面的成员也是在不同的内存空间上,修改self中的成员属性,me中对应的成员属性并不会跟着变。所以,如果里面的成员属性需要跟随变化,必须把成员属性定义为指针,这样修改指正所指的值,selfme中成员属性所指的值是相同的。


初学rust,使用协程
Tag rust, 协程, on by view 144

最近想使用rust写个tcp透明代理转发服务,这中间涉及到socket监听以及连接处理逻辑中连接后端服务并处理连接后的相关逻辑。

监听端口

fn main() {
    let skt = Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP)).unwrap();
    let address: SocketAddr = "0.0.0.0:3333".parse().unwrap();
    skt.set_ip_transparent(true).unwrap();
    skt.set_reuse_address(true).unwrap();
    skt.set_reuse_port(true).unwrap();
    skt.bind(&address.into()).unwrap();
    skt.listen(128).unwrap();

    let listener: TcpListener = skt.into();
    println!("Server listening on port 3333");

    for stream in listener.incoming() {
        match stream {
            Ok(stream) => {
                println!("peer addr: {}", stream.peer_addr().unwrap());
                thread::spawn(move || {
                    // connection succeeded
                    handle_client(stream)
                });
            }
            Err(e) => {
                println!("Error: {}", e);
                /* connection failed */
            }
        }
    }
    // close the socket server
    drop(listener);
}

处理客户端连接

fn handle_client(mut stream: TcpStream) {
    let x = get_original_destination_addr(&stream).unwrap();
    println!("target addr: {:?}", x);

    let mut client_stream = get_rs_connect_stream_by_ipport(x.to_string().as_str());
    println!("client stream: {:?}", client_stream);

    let mut data = [0 as u8; 50]; // using 50 byte buffer
    while match stream.read(&mut data) {
        Ok(size) => {
            if size == 0 {
                false
            } else {
                // echo everything!
                println!("len: {:?}", size);
                stream.write(&data[0..size]).unwrap();
                true
            }
        }
        Err(_) => {
            println!(
                "An error occurred, terminating connection with {}",
                stream.peer_addr().unwrap()
            );
            stream.shutdown(Shutdown::Both).unwrap();
            false
        }
    } {}
}

可以看到这里每接受一个连接,就会起一个线程,如果在handle_client里再连接后端的话,就需要针对client_stream再起一个线程在线程中执行死循环,否则死循环会卡主stream这个死循环。这样一来,一个链接就得起2个线程了,那么连接一多,线程就更多了,一个进程能创建的线程数是有限的,因为线程对资源占用相对比较大,这样连接数一多,系统资源就不够用,性能就会很差。

对比到golang中的协程,我们能否在rust中使用协程呢?答案是肯定的。下面简单介绍一下rust中如何使用协程。rust中使用“协程”,我们用到tokio这个包。

对于单个异步,我们可以用async和await就可以了

#[tokio::main]
async fn main() {
    let skt = Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP)).unwrap();
    let address: SocketAddr = "0.0.0.0:3333".parse().unwrap();
    skt.set_ip_transparent(true).unwrap();
    skt.set_reuse_address(true).unwrap();
    skt.set_reuse_port(true).unwrap();
    skt.bind(&address.into()).unwrap();
    skt.listen(128).unwrap();

    let listener: TcpListener = skt.into();
    println!("Server listening on port 3333");

    for stream in listener.incoming() {
        match stream {
            Ok(stream) => {
                println!("peer addr: {}", stream.peer_addr().unwrap());
                handle_client(stream).await  // 其中 handle_client 改为了 async 异步函数
            }
            Err(e) => {
                println!("Error: {}", e);
                /* connection failed */
            }
        }
    }
    // close the socket server
    drop(listener);
}

其中 handle_client 是 async 异步函数,但是如果我想在handle_client中再起一个异步函数,直接使用async是不行的。我们可以这样处理,使用tokio::spawn就可以了,这里tokio::spawn有点类似golang中的go关键词,调用tokio::spawn的地方可以起一个异步函数。

async fn rs_handle(rs_stream: Arc<TcpStream>, stream: Arc<TcpStream>) {
    let mut client_data = [0 as u8; 50]; // using 50 byte buffer
    while match rs_stream.as_ref().read(&mut client_data) {
        Ok(client_size) => {
            if client_size == 0 {
                false
            } else {
                println!("len: {:?}", client_size);
                stream.as_ref().write(&client_data[0..client_size]).unwrap();
                true
            }
        }
        Err(_) => {
            println!("client error occurred.");
            false
        }
    } {}
}

async fn handle_client(stream: TcpStream) {
    let x = get_original_destination_addr(&stream).unwrap();
    println!("target addr: {:?}", x);

    let stream = Arc::new(stream);
    let rs_stream = Arc::new(get_rs_connect_stream_by_ipport(x.to_string().as_str()));
    println!("rs stream: {:?}", rs_stream);

    let rsh_rs_stream = rs_stream.clone();
    let rsh_stream = stream.clone();
    tokio::spawn(rs_handle(rsh_rs_stream, rsh_stream)); // 这里相当于起一个线程
    println!("rs handle setted.");

    let mut data = [0 as u8; 50]; // using 50 byte buffer
    while match stream.as_ref().read(&mut data) {
        Ok(size) => {
            if size == 0 {
                false
            } else {
                rs_stream.as_ref().write(&data[0..size]).unwrap();
                true
            }
        }
        Err(_) => {
            println!(
                "An error occurred, terminating connection with {}",
                stream.peer_addr().unwrap()
            );
            stream.shutdown(Shutdown::Both).unwrap();
            false
        }
    } {}
}

如上代码,首先需要定义一个异步函数async fn rs_handle(...)然后在调用的地方使用tokio::spawn(rs_handle(...))来调用。就可以实现同等于golang中go关键词的效果,起一个协程。


初学rust,数组vector的自增
Tag rust, vector, 自增, on by view 51

在golang中经常会踩一个坑,那就是slice append,golang的动态数组也称为slice,使用append可以对动态数组进行添加元素,但是slice空间不够之后golang会自动重新分配内存空间,每次重新分配的内存空间是原空间的2倍,而且有个更坑的是,golang中slice每次重新分配内存都是重新分配一片 2N 大小的内存,然后把原来的数据拷贝过去,这样一来,性能损耗更大了。

那么rust中的数组是怎么处理动态增长的呢,我们来一段代码测试一下。

fn main() {
    let mut vec = Vec::with_capacity(100);

    // The vector contains no items, even though it has capacity for more
    println!("vec.len: {:?}", vec.len());
    println!("vec.cap: {:?}", vec.capacity());

    // These are all done without reallocating...
    for i in 0..100 {
        vec.push(i);
    }
    println!("vec.len: {:?}", vec.len());
    println!("vec.cap: {:?}", vec.capacity());

    // ...but this may make the vector reallocate
    vec.push(101);
    println!("vec.len: {:?}", vec.len());
    println!("vec.cap: {:?}", vec.capacity());
}

输出如下

vec.len: 0
vec.cap: 100
vec.len: 100
vec.cap: 100
vec.len: 101
vec.cap: 200

可以看到,rust中,当vector数组存储满了之后,再往里面添加元素,vector就会重新分配内存,新分配的内存也是原来空间的2倍,但是,他是在原来的内存上扩充的,而不是像golang一样重新分配一片2N的内存空间替换旧的内存。性能损耗上,相比golang少了copy数据和释放旧空间。所以在高性能场景下,这里依然不建议使用vector的自动增长特性,自动增长的内存分配会消耗存储空间,而且 2N 的增长步长会很容易导致内存泄漏,你如果依赖这个自动增长特性,你将会发现你使用的内存可能会发生 2^n 指数级增长。这绝对会在大部分边界条件下导致你的程序迅速的发生OOM。

远离动态数组,远离bug。


初学rust,优雅的解包Option
Tag rust, option, on by view 349

rust中,Option表示一个可能不存在值的复合类型。其定义如下

pub enum Option<T> {
    None,
    Some(T),
}

可以看到,它里面的值要么就是类型T的值,要么就是None(也表示不存在值)。通常,我们在获取到可能为空的值时,Option类型很有用,它要求你必须去处理可能为None的情况。它有方法 is_none, is_some 等,可以判定是否为空。但是如果你使用这两个方法来解包 Option,免不了if else判断,代码会比较难看。比如

let x: Option<u32> = Some(2);
let mut value: u32 = 0;
if x.is_none() {
    value = default;
} else {
    value = x.unwrap(); // unwrap 方法可以解一切包,但是遇到 None 会 panic
}

可以看到上面的代码用了2条语句,首先是初始化value值,然后判定是否为None,根据不同的情况,对value重新赋值,明显复杂,我解个Option还得分两步,而且申明的value还必需是mut类型。那么,对于“我需要解包Option,如果Option为None则给默认值”这个需求,有更优雅的写法吗?有,如下

let x: Option<u32> = Some(2);

// 方法一:利用 match,和语句块
let value = match x {
    Some(val) => val,
    None => default,
};

// 方法二:利用 if-let,和语句块
let value = {
    if let Some(val) = x {
        *val
    } else {
        default
    }
};

上述两种方法都是借助语句块一步到位的解包Option,并且没有调用任何方法。


初学rust,wasm前端图片转码
Tag wasm, rust, 转码, on by view 379

最近用rust写的日志上报agent趋近完善,意味着一个练习rust的小项目结束了。于是便找了个新的小项目,用rust代码编译出wasm,在浏览器端实现图片缩放、转码。决定做前端转码是出于两方面原因,第一是想体验一下rust-webassembly,第二是博客的管理后台上传图片能力有待优化,无法直接上传单反拍出来的图片,因为单反照都是十几兆以上大小,我的云服务器只有1M带宽,上传超时,就算我能忍受超时,也无法忍受大文件后端转码压缩时io满负载直接卡死服务器的情况。于是便有了这次wasm体验。

首先,如果你已经入门了rust,能用rust写代码了,那么用rust实现wasm将会是一种非常好的体验。因为rust的wasm全套工具齐全,你可以直接在rust项目中编译出npm包,编译出来的结果可以直接上传到npm仓库。这里简单介绍一下基于rust的wasm包开发过程。

首先创建rust的包项目,注意不是可执行文件。

cargo new wtools --lib

然后,修改Cargo.toml文件,定义包类型

[package]
name = "wtools"
version = "0.1.6"
edition = "2021"
description = "wasm tools"
license = "MIT"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
crate-type = ["cdylib", "rlib"] # cdylib 是wasm库, rlib 是常规rust库

[profile.release]
lto = true
opt-level = 'z'

[dependencies]

注意lib下的crate-type字段要定义为cdylib,只有这种包才能编译为wasm,然后还有一个选项需要注意profile.release下的lto=trueopt-level = 'z'这两个选项设置后,可以在编译的时候讲wasm压缩到最小大小,以减小wasm文件在网络中分发的大小。当然,缩减wasm还有个工具,叫做wasm-opt,但是我具体实测之后发现,只要设置了上面的ltoopt-level选项,这个工具能缩减的大小非常有限,有时候甚至无法再进一步缩减了。

安装工具。这里编译wasm报并不是用原生的cargo,而是使用一个叫做wasm-pack的工具,它的优点是,可以直接编译出npm包。安装

cargo install wasm-pack

编译

wasm-pack build --scope duguying

上传npm包

cd pkg
npm publish --access=public

整个开发的过程就是如上的那些。下面简单介绍一下代码。首先,我们这个rust项目的目标是编译为wasm在浏览器上运行。这里就免不了js与rust之间进行数据传递,以及rust里操作浏览器中的各种对象和元素。介绍两个rust包,第一个js-sys,用于js与rust之间进行数据传递的,在这个包里能找到js中的数据类型对应的类型;第二个web-sys,用于浏览器对象与rust之间进行数据传递的,在这个包里有对应浏览器中的各种对象。

比如,最常见的浏览器日志打印console.log,在web-sys中能找到console对象,详情可以查看文档。在我的rust包中简单的包装了一下

extern crate wasm_bindgen;
extern crate web_sys;

use wasm_bindgen::prelude::*;

#[macro_export]
macro_rules! console_log {
    ($($t:tt)*) => (web_sys::console::log(&js_sys::Array::of1(&JsValue::from(
        format_args!($($t)*).to_string()
    ))))
}

#[wasm_bindgen]
pub fn greet(name: &str) {
    console_log!("Hello, {}!", name);
}

这样就可以在其他地方用console_log来调用了,比如

console_log!("load img failed, err: {:?}", error);

我需要进行图片处理,所以用到了image这个包,这个包支持缩放图片resize、旋转图片rotate以及翻转图片flipv等。我主要用到缩放和旋转。另外有一点需要注意的是,需要导出到js的结构体和方法函数等,需要添加#[wasm_bindgen]注解。这个注解是在wasm_bindgen这个包中定义的,这个也是rust编译为wasm的核心包,具体可以查看文档。因为我发现单反上拍摄的照片通常会根据拍照者持相机的角度有一个旋转参数,而这个参数,它是存到了照片的exif信息中,但是他的照片数据实际存储是按照相机原始的方向存储的,所以,竖着拍摄的照片在上传到服务器之后会发现照片是横着的,需要旋转90度。所以在这里我还用到了kamadak-exif这个包,来读取照片的exif信息,从而获取旋转参数,然后根据旋转参数调用rotate对照片进行旋转来修正照片方向。图片处理的代码如下

extern crate wasm_bindgen;

use exif::{Error, Exif, In, Tag};
use image::{imageops::FilterType, DynamicImage, EncodableLayout, ImageFormat};
use js_sys::Uint8Array;
use std::io::{Cursor, Read, Seek, SeekFrom};
use wasm_bindgen::prelude::*;

use crate::console_log;

#[wasm_bindgen]
pub struct Img {
    img: DynamicImage,
    img_format: ImageFormat,
    exif: Result<Exif, Error>,
    orientation: u32,
}

#[wasm_bindgen]
impl Img {
    #[wasm_bindgen(constructor)]
    pub fn new(img: &[u8], mime: &str) -> Img {
        let exifreader = exif::Reader::new();
        let (img_data, img_format) = Img::load_image_from_array(img, mime.to_string());
        let mut c = Cursor::new(Vec::from(img));
        let exif = exifreader.read_from_container(&mut c);

        let mut image = Img {
            img: img_data,
            img_format: img_format,
            exif: exif,
            orientation: 0,
        };
        image.get_orietation();
        image.fix_orietation();
        image
    }

    fn load_image_from_array(_array: &[u8], mime: String) -> (DynamicImage, ImageFormat) {
        let img_format = ImageFormat::from_mime_type(mime).unwrap();
        let img = match image::load_from_memory_with_format(_array, img_format) {
            Ok(img) => img,
            Err(error) => {
                console_log!("load img failed, err: {:?}", error);
                panic!("{:?}", error)
            }
        };
        return (img, img_format);
    }

    fn get_orietation(&mut self) {
        match &self.exif {
            Ok(exif) => {
                let r = exif.get_field(Tag::Orientation, In::PRIMARY);
                match r {
                    Some(oriet) => {
                        self.orientation = oriet.value.get_uint(0).unwrap();
                    }
                    None => {}
                }
                console_log!("orientation: {:?}", r.unwrap());
            }
            Err(_error) => {}
        };
    }

    fn fix_orietation(&mut self) {
        match self.orientation {
            8 => self.img = self.img.rotate270(),
            3 => self.img = self.img.rotate180(),
            6 => self.img = self.img.rotate90(),
            _ => {}
        }
    }

    fn image_to_uint8_array(&self, img: DynamicImage) -> Uint8Array {
        // 创建一个内存空间
        let mut c = Cursor::new(Vec::new());
        match img.write_to(&mut c, self.img_format) {
            Ok(c) => c,
            Err(error) => {
                panic!(
                    "There was a problem writing the resulting buffer: {:?}",
                    error
                )
            }
        };
        c.seek(SeekFrom::Start(0)).unwrap();
        let mut out = Vec::new();
        // 从内存读取数据
        c.read_to_end(&mut out).unwrap();
        let v = out.as_bytes();
        Uint8Array::from(v)
    }

    pub fn get_width(&self) -> u32 {
        return self.img.width();
    }

    pub fn get_height(&self) -> u32 {
        return self.img.height();
    }

    pub fn grayscale(&self) -> Uint8Array {
        let img = self.img.grayscale();
        self.image_to_uint8_array(img)
    }

    pub fn scale(&self, width: u32, height: u32) -> Uint8Array {
        let img = self.img.resize(width, height, FilterType::Triangle);
        self.image_to_uint8_array(img)
    }

    pub fn rotate90(&self) -> Uint8Array {
        let img = self.img.rotate90();
        self.image_to_uint8_array(img)
    }

    pub fn rotate180(&self) -> Uint8Array {
        let img = self.img.rotate180();
        self.image_to_uint8_array(img)
    }

    pub fn rotate270(&self) -> Uint8Array {
        let img = self.img.rotate270();
        self.image_to_uint8_array(img)
    }

    pub fn flipv(&self) -> Uint8Array {
        let img = self.img.flipv();
        self.image_to_uint8_array(img)
    }

    pub fn fliph(&self) -> Uint8Array {
        let img = self.img.fliph();
        self.image_to_uint8_array(img)
    }
}

编译成功打包上传npm仓库之后,在前端项目中使用有一点需要注意,像这种基于wasm的npm包并不能像常规的npm包那样直接import引入,而是需要异步引入,这种写法非常不优雅,如下

/**
 * @description 全局注册md5工具
 */
async function waitwasm () {
  const { Crypt, Img } = await import('@duguying/wtools')
  Vue.prototype.$md5 = (content) => {
    let crypt = new Crypt()
    let out = crypt.md5(content)
    crypt.free()
    return out
  }
  Vue.prototype.$scale_img = (file) => {
    return new Promise(function (resolve, reject) {
      let reader = new FileReader()
      reader.readAsArrayBuffer(file)
      reader.onload = function () {
        let data = new Uint8Array(this.result)
        console.log('data:', data)
        let kit = new Img(data, file.type)
        console.log(kit)
        let w = kit.get_width()
        let h = kit.get_width()
        console.log('wh:', w, h)
        if (w > 2000) {
          w = 2000
          h = h / w * 2000
        } else {
          resolve(file)
          return
        }
        let out = kit.scale(w, h)
        resolve(new Blob([out.buffer], { type: file.type }))
      }
    })
  }
}
(async () => {
  waitwasm()
})()

他本身是一个异步引入,但是需要等它引入完毕之后,才能调用其中的方法,否则就会报错,所以,这里只好同步阻塞,等他引入完毕了。


初学rust,tokio的async与await
Tag rust, tokio, async, await, on by view 485

最近监控上报的agent里需要将数据上报到es,所以用了elasticsearch-rs这个包,这是es官方提供的rust版本sdk,看了一下版本号,目前的版本都处于alpha。

下面用一个简单实例讲述我遇到的问题。首先是,调用sdk发现需要用async标注函数。

use elasticsearch::{http::transport::Transport, BulkParts, Elasticsearch};
use serde_json::Value;

fn main() {
    println!("Hello, world!");
    send_es("hi".to_string());
    println!("hi");
}

pub async fn send_es(body: String) {
    let transport = Transport::single_node("http://xxx.xxx.net").unwrap();
    let client = Elasticsearch::new(transport);
    let mut bodies: Vec<String> = Vec::with_capacity(1);
    bodies.push(body);
    let response = client
        .bulk(BulkParts::Index("nginx"))
        .body(bodies)
        .send()
        .await
        .unwrap();
    let response_body = response.json::<Value>().await.unwrap();
    println!("{:?}", response_body);
}

运行后,发现有一个警告,并且send_es没有被调用

➜  tes git:(master) ✗ cargo run
   Compiling tes v0.1.0 (/root/code/las/tes)
warning: unused implementer of `Future` that must be used
 --> src/main.rs:6:5
  |
6 |     send_es("hi".to_string());
  |     ^^^^^^^^^^^^^^^^^^^^^^^^^^
  |
  = note: `#[warn(unused_must_use)]` on by default
  = note: futures do nothing unless you `.await` or poll them

warning: `tes` (bin "tes") generated 1 warning
    Finished dev [unoptimized + debuginfo] target(s) in 3.16s
     Running `target/debug/tes`
Hello, world!
hi

警告说futures do nothing,也就是send_es将会什么也不做。后面,我找到了一个方法block_on可以执行async方法,于是,变成了这样

use elasticsearch::{http::transport::Transport, BulkParts, Elasticsearch};
use futures::executor::block_on;
use serde_json::Value;

fn main() {
    println!("Hello, world!");
    block_on(send_es("hi".to_string()));
    println!("hi");
}

pub async fn send_es(body: String) {
    let transport = Transport::single_node("http://xxx.xxx.net").unwrap();
    let client = Elasticsearch::new(transport);
    let mut bodies: Vec<String> = Vec::with_capacity(1);
    bodies.push(body);
    let response = client
        .bulk(BulkParts::Index("nginx"))
        .body(bodies)
        .send()
        .await
        .unwrap();
    let response_body = response.json::<Value>().await.unwrap();
    println!("{:?}", response_body);
}

但是执行后发现报错如下

➜  tes git:(master) ✗ cargo run
   Compiling tes v0.1.0 (/root/code/las/tes)
    Finished dev [unoptimized + debuginfo] target(s) in 3.52s
     Running `target/debug/tes`
Hello, world!
thread 'main' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime', /root/.cargo/registry/src/github.com-1ecc6299db9ec823/hyper-0.14.20/src/client/connect/dns.rs:121:24
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

各种查找解决方案之后,也没能解决这个问题,说是tokio版本依赖的问题,两个不同的组件间接引用了不同版本的tokio,说是引入tokio = "*"就能解决依赖问题,但是实际上是无法解决的。所以我就用了上面的最小用例来调用elasticsearch sdk,只调用sdk,不引用任何其他依赖(原项目中还引用了reqwest包,这个包依赖了tokio)。发现这个最小用例也报错,说明根本不是依赖问题,但是可以确定问题出在tokio上。于是阅读了tokio官方文档,了解到运行async函数可以用#[tokio::main]标注,结合.await就可以了。于是重新修改后如下

use elasticsearch::{http::transport::Transport, BulkParts, Elasticsearch};
use serde_json::Value;

#[tokio::main]
async fn main() {
    println!("Hello, world!");
    send_es("hi".to_string()).await;
    println!("hi");
}

pub async fn send_es(body: String) {
    let transport = Transport::single_node("http://xxx.xxx.net").unwrap();
    let client = Elasticsearch::new(transport);
    let mut bodies: Vec<String> = Vec::with_capacity(1);
    bodies.push(body);
    let response = client
        .bulk(BulkParts::Index("nginx"))
        .body(bodies)
        .send()
        .await
        .unwrap();
    let response_body = response.json::<Value>().await.unwrap();
    println!("{:?}", response_body);
}

问题解决,终于调用成功了。

总结,tokio是rust中有名的异步调用的包。它定义了asyncawait这些关键词,而实现异步。同样他也定义了异步函数的调用方式,就是#[tokio::main]标注。


初学rust,多线程编程共享变量访问
Tag rust, 多线程, 共享变量, , 解锁, on by view 272

今天开始尝试使用rust的多线程编程,在不同的线程中访问共享变量。一个小项目,项目背景是这样的,我想用rust写一个高性能的nginx日志解析聚合上报监控的agent;其中,主线程中watch日志文件,并读取日志内容,有点类似tailf,然后有一个子线程,负责每隔10s钟统计主线程收集到的日志数据,聚合并上报。

所以,这里除了主线程之外,需要创建一个线程。并且主线程不断的往一个Vec<LineStat>容器变量中写入数据,另一个子线程没隔10s,从这个容器变量中读取所有数据,并累加统计上报,然后清空该容器中的数据,10s一个往复。这个容器变量就是关键的共享变量了。常规情况下,是存在数据竞争的,在golang里倒是很容易写出来,但是golang可不管这些,锁你爱加不加,不加锁也能跑,但是会出错,若是map还会panic。但是,今天发现在rust里,根本写不出来……

那么第一个问题就是,rust线程中如何使用共享变量。 我们先看下直接使用共享变量会怎么样。

fn testx() {
    let mut data = "hi".to_string();
    thread::spawn(move || loop {
        println!("{:?}", data);

        thread::sleep(Duration::from_secs(10));
    });

    println!("{:?}", data);
}

可以看到spawn生成的线程内部访问data,会被要求加上move关键词,它会强制获取被访问变量的所有权,也就是说在线程中访问了datadata的所有权就变为这个线程的了,只能在这个线程内访问。后续的println访问data就会报错。如下

error[E0382]: borrow of moved value: `data`
  --> src/main.rs:38:22
   |
31 |     let mut data = "hi".to_string();
   |         -------- move occurs because `data` has type `std::string::String`, which does not implement the `Copy` trait
32 |     thread::spawn(move || loop {
   |                   ------- value moved into closure here
33 |         println!("{:?}", data);
   |                          ---- variable moved due to use in closure
...
38 |     println!("{:?}", data);
   |                      ^^^^ value borrowed here after move
   |
   = note: this error originates in the macro `$crate::format_args_nl` (in Nightly builds, run with -Z macro-backtrace for more info)

可以看到报错value borrowed here after move,也就是访问了所有权被移动过的变量。

那么我们如何才能使data能够同时在主线程和子线程中被访问呢。用Arc<Mutex<...>>。代码改成如下

fn main() {
    let mut raw_data = "hi".to_string();
    let mut data: Arc<Mutex<String>> = Arc::new(Mutex::new(raw_data));
    let mut clone_data = data.clone();
    thread::spawn(move || loop {
        thread::sleep(Duration::from_secs(1));
        println!("th -> {:?}", clone_data.lock().unwrap());
        clone_data.lock().unwrap().push_str(", hello");
    });

    thread::sleep(Duration::from_secs(10));
    println!("final -> {:?}", data.lock().unwrap());
}

可以看到要访问的数据套上Arc<Mutex<...>>之后,在线程中访问该数据时,先在线程外clone一个副本,然后在线程中访问该数据,访问时先加锁lock(),然后读取到的数据就是raw_data,而在主线程中访问的数据也是先lock()加锁,读取到的也是raw_data。相当于在raw_data外部加了个套,虽然dataclone_data是两个加了套的数据,但是在套里面的数据是同一个,也就是raw_datalock()锁定“解套”后访问的是同一个数据。我们来看下运行结果

th -> "hi"
th -> "hi, hello"
th -> "hi, hello, hello"
th -> "hi, hello, hello, hello"
th -> "hi, hello, hello, hello, hello"
th -> "hi, hello, hello, hello, hello, hello"
th -> "hi, hello, hello, hello, hello, hello, hello"
th -> "hi, hello, hello, hello, hello, hello, hello, hello"
th -> "hi, hello, hello, hello, hello, hello, hello, hello, hello"
final -> "hi, hello, hello, hello, hello, hello, hello, hello, hello, hello"

可以看到线程内部字符串拼接了若干次,最后在主线程中打印出来的是子线程拼接后的数据。

那么rust是怎么解锁的呢。 我们把代码改成这样

fn main() {
    let mut raw_data = "hi".to_string();
    let mut data: Arc<Mutex<String>> = Arc::new(Mutex::new(raw_data));
    let mut clone_data = data.clone();
    thread::spawn(move || loop {
        thread::sleep(Duration::from_secs(1));
        let mut v = clone_data.lock().unwrap(); // v 不会立即释放,v 所在的锁也不会立即释放
        println!("th -> {:?}", v);

        appendCnt(clone_data.clone());
        // 线程结束后,v 被回收,v所在的锁才会释放
    });

    thread::sleep(Duration::from_secs(10));
    println!("final -> {:?}", data.lock().unwrap());
}

fn appendCnt(mut data: Arc<Mutex<String>>) {
    data.lock().unwrap().push_str(", hello"); // 这里再次尝试锁 data 里的值,发现前面已经锁定过,无法加锁,死锁
}

执行结果显示打印了第一个th -> "hi"之后,程序直接卡死。这里是因为发生了死锁。rust中目前没有显式的锁释放操作(实际上unlock方法在nightly版本中,参考,对应的issue),锁释放(unlock)发生在前面所说的“加了套的数据”内存释放的时候,也就是说clone_data.lock()v解锁发生在v释放的时候,也就是在loop所在块结束的时候。但是在这之前appenCnt调用了clone_data.clone()衍生的“加了套的数据”,并尝试在函数中加锁,这时候上一个锁还没释放呢,所以就死锁了。

那么如果这里我必需要提前解锁怎么办,用drop即可。如下

fn main() {
    let mut raw_data = "hi".to_string();
    let mut data: Arc<Mutex<String>> = Arc::new(Mutex::new(raw_data));
    let mut clone_data = data.clone();
    thread::spawn(move || loop {
        thread::sleep(Duration::from_secs(1));
        let mut v = clone_data.lock().unwrap(); // v 不会立即释放,v 所在的锁也不会立即释放
        println!("th -> {:?}", v);
        drop(v); // 提前释放 v,提前解锁

        appendCnt(clone_data.clone());
    });

    thread::sleep(Duration::from_secs(10));
    println!("final -> {:?}", data.lock().unwrap());
}

fn appendCnt(mut data: Arc<Mutex<String>>) {
    data.lock().unwrap().push_str(", hello"); // 这里再次尝试锁 data 里的值,没有被锁定,可以加锁
}

运行,发现现在能正常解锁了。


初学rust,闭包引用外部变量
Tag rust, 闭包, on by view 299

今天又遇到一个新的问题,那就是在闭包函数中,无法引用闭包函数之外的变量。

fn main() {
    let cfg = config::cfg::get_config();
    let filename = cfg.las.as_ref().unwrap().access_log.as_ref().unwrap();
    // println!("{}",filename);
    // let mut p = poly::Poly::new();
    let mut s = String::from("_");
    let mut log_watcher = LogWatcher::register(filename.to_string()).unwrap();

    log_watcher.watch(|line: String| {
        let (_, token) = parse::lex::parser(&line);
        // p.push(LineStat {});
        print!("{}", s);
        process::report::send(token);
    });
}

上面代码中,watch传入参数为一个闭包函数,其中需要引用main函数中的局部变量s,但是会报错

error[E0308]: mismatched types
  --> src/main.rs:22:23
   |
22 |       log_watcher.watch(|line: String| {
   |  _______________________^
23 | |         let (_, token) = parse::lex::parser(&line);
24 | |         // p.push(LineStat {});
25 | |         print!("{}", s);
26 | |         process::report::send(token);
27 | |     });
   | |_____^ expected fn pointer, found closure
   |
   = note: expected fn pointer `fn(std::string::String)`
                 found closure `[closure@src/main.rs:22:23: 27:6]`
note: closures can only be coerced to `fn` types if they do not capture any variables

查看logwatcher源码,watch该版本(0.1.0)方法定义如下

pub fn watch(&mut self, callback: fn (line: String)) {
    ....
}

不过我后来发现logwatcher版本升级到(0.1.1)之后,该方法变为

pub fn watch<F: ?Sized>(&mut self, callback: &mut F)
where
    F: FnMut(String) -> LogWatcherAction,
{
    ....
}

如此可以正常调用,代码如下

fn main() {
    let cfg = config::cfg::get_config();
    let filename = cfg.las.as_ref().unwrap().access_log.as_ref().unwrap();
    let mut watcher = LogWatcher::register(filename.to_string()).unwrap();
    let mut poly: Poly = Poly::new();

    watcher.watch(&mut |line: String| {
        poly.clone().push(line);
        LogWatcherAction::None
    })
}

由此可见,闭包与函数还是有区别的。但是回调函数定义为Fn/FnMut类型就可以同时接受闭包和函数类型的回调参数。举例如下

let a = String::from("abc");

let mut x = || println!("x {}", a);

fn y() {
    println!("y")
}

fn wrap(c: &mut Fn()) {
    c()
}

wrap(&mut x); // pass a closure
wrap(&mut y); // pass a function

同样可以参考 stackoverflow 的这个提问


初学rust,为什么对象不能被函数返回
Tag rust, on by view 238

7年前,我选择了自学golang,事实证明我选择对了。目前业界用golang的人越来越多,也越来越卷。于是我就想选择另一门语言了。用了这些年的golang之后,感觉golang在很多场景下还是很难胜任高新能开发;另外,我特别羡慕能做底层开发的同学,比如内核开发、协议栈开发等;因此,我看上了rust,rust号称无gc,甚至还能开发linux内核,还有一些游戏引擎,数据库引擎也是用rust开发的,一听就特别高端是吧。

于是我作为一个新手,开始尝试上手rust,原本准备依照golang的学习路线,先写个博客或者小的web程序,但是,发现rust上手太难。于是降低了上手的要求,用rust重写了一遍我的pinyin项目,项目是将汉字翻译成拼音,支持多音词,需要接入字词库。折腾了许久,终于完成了;这里简单谈一下作为一个rust初学者的感受。

xtyiguc3

首先,最大一个问题就是局部变量无法在作用域之外使用,这在golang上会自动扩大作用域,无需考虑这些问题。比如,我在一个函数中创建出来的对象,return返回后,发现无法在外部访问,直接报错,实际上该对象在函数结束时已经被销毁。或者说,所在对象发生move,因为他的类型未实现Copy trait。

79dsu2ub

比如,这种报错经常发生在这么一个场景,我创建了一个config包,辛苦的在里面实现了加载配置文件到一个结构体,当我完成这些之后,希望在另一个mod中引用这个config结构体,便会经常发生这种情况。

use std::fs::File;
use std::io::prelude::*;
use once_cell::sync::Lazy;

#[derive(Deserialize)]
#[derive(Debug)]
struct LasConfig {
    pub access_log: Option<String>,
}

#[derive(Deserialize)]
#[derive(Debug)]
struct Conf {
    pub las: Option<LasConfig>
}

static global: Lazy<Conf> = Lazy::new(|| {
    let file_path = "config.toml";
    let mut file = match File::open(file_path) {
        Ok(f) => f,
        Err(e) => panic!("no such file {} exception:{}", file_path, e)
    };
    let mut str_val = String::new();
    match file.read_to_string(&mut str_val) {
        Ok(s) => s,
        Err(e) => panic!("Error Reading file: {}", e)
    };
    toml::from_str(&str_val).unwrap()
});

pub fn get_config() -> Conf {
    *global
}

其中*global的类型的确是Conf但是却无法通过参数返回。会有如下的报错信息

error[E0507]: cannot move out of dereference of `once_cell::sync::Lazy<Conf>`
  --> src/config/cfg.rs:32:5
   |
32 |     *global
   |     ^^^^^^^ move occurs because value has type `Conf`, which does not implement the `Copy` trait

For more information about this error, try `rustc --explain E0507`.

e463qi5x

很明显,这里生成的Conf类型的global对象,但是无法直接返回。因为返回的时候发生了move

神奇的是,这里get_config()改成如下这样,就不会有报错了

pub fn get_config() -> &'static Conf {
   (*global).borrow()
}

我们从E0507这篇文章中可以看到

the nothing_is_true method takes the ownership of self

那么我们这里的报错应该也是*global在返回的时候,将ownership拿走,既然不能拿走,那就改为借用好了。