早年买了Nikon D7200,放了很久没怎么用,上次去中山带上了单反,感觉拍照效果很好。这一次学会了手动对焦。于是买了个近摄镜,想体验一下微距摄影,拍一下花花草草和昆虫蚂蚁的细节。
上面这些照片都是配合近摄镜手动对焦拍摄的。
早年买了Nikon D7200,放了很久没怎么用,上次去中山带上了单反,感觉拍照效果很好。这一次学会了手动对焦。于是买了个近摄镜,想体验一下微距摄影,拍一下花花草草和昆虫蚂蚁的细节。
上面这些照片都是配合近摄镜手动对焦拍摄的。
最近用rust写的日志上报agent趋近完善,意味着一个练习rust的小项目结束了。于是便找了个新的小项目,用rust代码编译出wasm,在浏览器端实现图片缩放、转码。决定做前端转码是出于两方面原因,第一是想体验一下rust-webassembly,第二是博客的管理后台上传图片能力有待优化,无法直接上传单反拍出来的图片,因为单反照都是十几兆以上大小,我的云服务器只有1M带宽,上传超时,就算我能忍受超时,也无法忍受大文件后端转码压缩时io满负载直接卡死服务器的情况。于是便有了这次wasm体验。
首先,如果你已经入门了rust,能用rust写代码了,那么用rust实现wasm将会是一种非常好的体验。因为rust的wasm全套工具齐全,你可以直接在rust项目中编译出npm包,编译出来的结果可以直接上传到npm仓库。这里简单介绍一下基于rust的wasm包开发过程。
首先创建rust的包项目,注意不是可执行文件。
cargo new wtools --lib
然后,修改Cargo.toml
文件,定义包类型
[package]
name = "wtools"
version = "0.1.6"
edition = "2021"
description = "wasm tools"
license = "MIT"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
crate-type = ["cdylib", "rlib"] # cdylib 是wasm库, rlib 是常规rust库
[profile.release]
lto = true
opt-level = 'z'
[dependencies]
注意lib
下的crate-type字段要定义为cdylib
,只有这种包才能编译为wasm,然后还有一个选项需要注意profile.release
下的lto=true
和opt-level = 'z'
这两个选项设置后,可以在编译的时候讲wasm压缩到最小大小,以减小wasm文件在网络中分发的大小。当然,缩减wasm还有个工具,叫做wasm-opt
,但是我具体实测之后发现,只要设置了上面的lto
和opt-level
选项,这个工具能缩减的大小非常有限,有时候甚至无法再进一步缩减了。
安装工具。这里编译wasm
报并不是用原生的cargo,而是使用一个叫做wasm-pack
的工具,它的优点是,可以直接编译出npm包。安装
cargo install wasm-pack
编译
wasm-pack build --scope duguying
上传npm包
cd pkg
npm publish --access=public
整个开发的过程就是如上的那些。下面简单介绍一下代码。首先,我们这个rust项目的目标是编译为wasm在浏览器上运行。这里就免不了js与rust之间进行数据传递,以及rust里操作浏览器中的各种对象和元素。介绍两个rust包,第一个js-sys
,用于js与rust之间进行数据传递的,在这个包里能找到js中的数据类型对应的类型;第二个web-sys
,用于浏览器对象与rust之间进行数据传递的,在这个包里有对应浏览器中的各种对象。
比如,最常见的浏览器日志打印console.log
,在web-sys
中能找到console
对象,详情可以查看文档。在我的rust包中简单的包装了一下
extern crate wasm_bindgen;
extern crate web_sys;
use wasm_bindgen::prelude::*;
#[macro_export]
macro_rules! console_log {
($($t:tt)*) => (web_sys::console::log(&js_sys::Array::of1(&JsValue::from(
format_args!($($t)*).to_string()
))))
}
#[wasm_bindgen]
pub fn greet(name: &str) {
console_log!("Hello, {}!", name);
}
这样就可以在其他地方用console_log
来调用了,比如
console_log!("load img failed, err: {:?}", error);
我需要进行图片处理,所以用到了image
这个包,这个包支持缩放图片resize
、旋转图片rotate
以及翻转图片flipv
等。我主要用到缩放和旋转。另外有一点需要注意的是,需要导出到js的结构体和方法函数等,需要添加#[wasm_bindgen]
注解。这个注解是在wasm_bindgen
这个包中定义的,这个也是rust编译为wasm的核心包,具体可以查看文档。因为我发现单反上拍摄的照片通常会根据拍照者持相机的角度有一个旋转参数,而这个参数,它是存到了照片的exif信息中,但是他的照片数据实际存储是按照相机原始的方向存储的,所以,竖着拍摄的照片在上传到服务器之后会发现照片是横着的,需要旋转90度。所以在这里我还用到了kamadak-exif
这个包,来读取照片的exif信息,从而获取旋转参数,然后根据旋转参数调用rotate
对照片进行旋转来修正照片方向。图片处理的代码如下
extern crate wasm_bindgen;
use exif::{Error, Exif, In, Tag};
use image::{imageops::FilterType, DynamicImage, EncodableLayout, ImageFormat};
use js_sys::Uint8Array;
use std::io::{Cursor, Read, Seek, SeekFrom};
use wasm_bindgen::prelude::*;
use crate::console_log;
#[wasm_bindgen]
pub struct Img {
img: DynamicImage,
img_format: ImageFormat,
exif: Result<Exif, Error>,
orientation: u32,
}
#[wasm_bindgen]
impl Img {
#[wasm_bindgen(constructor)]
pub fn new(img: &[u8], mime: &str) -> Img {
let exifreader = exif::Reader::new();
let (img_data, img_format) = Img::load_image_from_array(img, mime.to_string());
let mut c = Cursor::new(Vec::from(img));
let exif = exifreader.read_from_container(&mut c);
let mut image = Img {
img: img_data,
img_format: img_format,
exif: exif,
orientation: 0,
};
image.get_orietation();
image.fix_orietation();
image
}
fn load_image_from_array(_array: &[u8], mime: String) -> (DynamicImage, ImageFormat) {
let img_format = ImageFormat::from_mime_type(mime).unwrap();
let img = match image::load_from_memory_with_format(_array, img_format) {
Ok(img) => img,
Err(error) => {
console_log!("load img failed, err: {:?}", error);
panic!("{:?}", error)
}
};
return (img, img_format);
}
fn get_orietation(&mut self) {
match &self.exif {
Ok(exif) => {
let r = exif.get_field(Tag::Orientation, In::PRIMARY);
match r {
Some(oriet) => {
self.orientation = oriet.value.get_uint(0).unwrap();
}
None => {}
}
console_log!("orientation: {:?}", r.unwrap());
}
Err(_error) => {}
};
}
fn fix_orietation(&mut self) {
match self.orientation {
8 => self.img = self.img.rotate270(),
3 => self.img = self.img.rotate180(),
6 => self.img = self.img.rotate90(),
_ => {}
}
}
fn image_to_uint8_array(&self, img: DynamicImage) -> Uint8Array {
// 创建一个内存空间
let mut c = Cursor::new(Vec::new());
match img.write_to(&mut c, self.img_format) {
Ok(c) => c,
Err(error) => {
panic!(
"There was a problem writing the resulting buffer: {:?}",
error
)
}
};
c.seek(SeekFrom::Start(0)).unwrap();
let mut out = Vec::new();
// 从内存读取数据
c.read_to_end(&mut out).unwrap();
let v = out.as_bytes();
Uint8Array::from(v)
}
pub fn get_width(&self) -> u32 {
return self.img.width();
}
pub fn get_height(&self) -> u32 {
return self.img.height();
}
pub fn grayscale(&self) -> Uint8Array {
let img = self.img.grayscale();
self.image_to_uint8_array(img)
}
pub fn scale(&self, width: u32, height: u32) -> Uint8Array {
let img = self.img.resize(width, height, FilterType::Triangle);
self.image_to_uint8_array(img)
}
pub fn rotate90(&self) -> Uint8Array {
let img = self.img.rotate90();
self.image_to_uint8_array(img)
}
pub fn rotate180(&self) -> Uint8Array {
let img = self.img.rotate180();
self.image_to_uint8_array(img)
}
pub fn rotate270(&self) -> Uint8Array {
let img = self.img.rotate270();
self.image_to_uint8_array(img)
}
pub fn flipv(&self) -> Uint8Array {
let img = self.img.flipv();
self.image_to_uint8_array(img)
}
pub fn fliph(&self) -> Uint8Array {
let img = self.img.fliph();
self.image_to_uint8_array(img)
}
}
编译成功打包上传npm仓库之后,在前端项目中使用有一点需要注意,像这种基于wasm的npm包并不能像常规的npm包那样直接import引入,而是需要异步引入,这种写法非常不优雅,如下
/**
* @description 全局注册md5工具
*/
async function waitwasm () {
const { Crypt, Img } = await import('@duguying/wtools')
Vue.prototype.$md5 = (content) => {
let crypt = new Crypt()
let out = crypt.md5(content)
crypt.free()
return out
}
Vue.prototype.$scale_img = (file) => {
return new Promise(function (resolve, reject) {
let reader = new FileReader()
reader.readAsArrayBuffer(file)
reader.onload = function () {
let data = new Uint8Array(this.result)
console.log('data:', data)
let kit = new Img(data, file.type)
console.log(kit)
let w = kit.get_width()
let h = kit.get_width()
console.log('wh:', w, h)
if (w > 2000) {
w = 2000
h = h / w * 2000
} else {
resolve(file)
return
}
let out = kit.scale(w, h)
resolve(new Blob([out.buffer], { type: file.type }))
}
})
}
}
(async () => {
waitwasm()
})()
他本身是一个异步引入,但是需要等它引入完毕之后,才能调用其中的方法,否则就会报错,所以,这里只好同步阻塞,等他引入完毕了。
rust中,可以通过core_affinity
这个crate对线程进行核绑定。但是绑核过程中发现一个问题。针对主线程绑核,若是主线程绑核后,创建子线程,在该子线程种尝试绑核会发现只有一个核可以。所以,使用这个库如果需要对主线程进行绑核,需要在所有子线程创建完毕之后进行。
绑核的函数如下
static CORE_IDS: Lazy<Vec<CoreId>> =
Lazy::new(|| core_affinity::get_core_ids().unwrap()); // 尝试过给CORE_IDS 加锁,也是一样
fn bind_core(id: usize) {
let mut selected_id = CORE_IDS[0];
for core_id in CORE_IDS.clone() {
println!("core {:?}, bind to: {:?}", selected_id, id);
if core_id.id == id {
selected_id = core_id;
break;
}
}
core_affinity::set_for_current(selected_id);
}
在主线程中绑核,且主线程的绑核在创建子线程之前,发现,第一次遍历核,核有8个,成功绑定到指定的7号核;第二次子线程中绑核,遍历核,核有1个,为7号核,所以只能绑定到7号核,这样就与主线程同核了。
core_id num: 8, cores: [CoreId { id: 0 }, CoreId { id: 1 }, CoreId { id: 2 }, CoreId { id: 3 }, CoreId { id: 4 }, CoreId { id: 5 }, CoreId { id: 6 }, CoreId { id: 7 }]
print core_id: CoreId { id: 0 }, bind id: 7
print core_id: CoreId { id: 1 }, bind id: 7
print core_id: CoreId { id: 2 }, bind id: 7
print core_id: CoreId { id: 3 }, bind id: 7
print core_id: CoreId { id: 4 }, bind id: 7
print core_id: CoreId { id: 5 }, bind id: 7
print core_id: CoreId { id: 6 }, bind id: 7
print core_id: CoreId { id: 7 }, bind id: 7
core_id num: 1, cores: [CoreId { id: 7 }]
print core_id: CoreId { id: 7 }, bind id: 6
改为,主线程绑核在创建子线程之后,如下
fn main() {
let cfg = config::cfg::get_config();
let filename = cfg.las.as_ref().unwrap().access_log.as_ref().unwrap();
let mut watcher = LogWatcher::register(filename.to_string()).unwrap();
let poly: Poly = Poly::new(); // 此处调用会创建子线程
config::affinity::bind_core_follow_config(0); // 绑核
watcher.watch(&mut |line: String| {
poly.clone().push(line);
LogWatcherAction::None
})
}
发现,成功的选中指定的主线程7号核,子线程6号核,输出如下
core_id num: 8, cores: [CoreId { id: 0 }, CoreId { id: 1 }, CoreId { id: 2 }, CoreId { id: 3 }, CoreId { id: 4 }, CoreId { id: 5 }, CoreId { id: 6 }, CoreId { id: 7 }]
print core_id: CoreId { id: 0 }, bind id: 7
print core_id: CoreId { id: 1 }, bind id: 7
print core_id: CoreId { id: 2 }, bind id: 7
print core_id: CoreId { id: 3 }, bind id: 7
print core_id: CoreId { id: 4 }, bind id: 7
print core_id: CoreId { id: 5 }, bind id: 7
print core_id: CoreId { id: 6 }, bind id: 7
print core_id: CoreId { id: 7 }, bind id: 7
core_id num: 8, cores: [CoreId { id: 0 }, CoreId { id: 1 }, CoreId { id: 2 }, CoreId { id: 3 }, CoreId { id: 4 }, CoreId { id: 5 }, CoreId { id: 6 }, CoreId { id: 7 }]
print core_id: CoreId { id: 0 }, bind id: 6
print core_id: CoreId { id: 1 }, bind id: 6
print core_id: CoreId { id: 2 }, bind id: 6
print core_id: CoreId { id: 3 }, bind id: 6
print core_id: CoreId { id: 4 }, bind id: 6
print core_id: CoreId { id: 5 }, bind id: 6
print core_id: CoreId { id: 6 }, bind id: 6
其中两次选核过程中,也都能够正常打印出所有核。
那么我试一下主线程中先创建A线程,在A线程中绑核(7),然后在主线程中绑核(6),最后创建worker线程,在worker线程中绑核(5),代码如下
fn main() {
let cfg = config::cfg::get_config();
let filename = cfg.las.as_ref().unwrap().access_log.as_ref().unwrap();
let mut watcher = LogWatcher::register(filename.to_string()).unwrap();
thread::Builder::new()
.name("A".into())
.spawn(|| {
config::affinity::bind_core_follow_config(2);
loop {
sleep(Duration::from_secs(1));
}
})
.unwrap();
config::affinity::bind_core_follow_config(0);
sleep(Duration::from_secs(2));
let poly: Poly = Poly::new(); // worker
watcher.watch(&mut |line: String| {
poly.clone().push(line);
LogWatcherAction::None
})
}
结果如下
core CoreId { id: 0 }, bind to: 7
core CoreId { id: 1 }, bind to: 7
core CoreId { id: 2 }, bind to: 7
core CoreId { id: 3 }, bind to: 7
core CoreId { id: 4 }, bind to: 7
core CoreId { id: 5 }, bind to: 7
core CoreId { id: 6 }, bind to: 7
core CoreId { id: 7 }, bind to: 7
>>> core CoreId { id: 7 }, bind to: Thread { id: ThreadId(1), name: Some("main"), .. }
core CoreId { id: 0 }, bind to: 5
core CoreId { id: 1 }, bind to: 5
core CoreId { id: 2 }, bind to: 5
core CoreId { id: 3 }, bind to: 5
core CoreId { id: 4 }, bind to: 5
core CoreId { id: 5 }, bind to: 5
>>> core CoreId { id: 5 }, bind to: Thread { id: ThreadId(2), name: Some("A"), .. }
core CoreId { id: 7 }, bind to: 6
>>> core CoreId { id: 7 }, bind to: Thread { id: ThreadId(3), name: Some("worker"), .. }
可以看到worker进程获取到的核数不正常。worker线程内部略微复杂,里面涉及到数据处理,tokio异步调用的上报等。可是我把worker线程换为简单的替换线程,却没有问题了。代码如下
fn main() {
let cfg = config::cfg::get_config();
let filename = cfg.las.as_ref().unwrap().access_log.as_ref().unwrap();
let mut watcher = LogWatcher::register(filename.to_string()).unwrap();
thread::Builder::new()
.name("A".into())
.spawn(|| {
config::affinity::bind_core_follow_config(2);
loop {
sleep(Duration::from_secs(1));
}
})
.unwrap();
config::affinity::bind_core_follow_config(0);
sleep(Duration::from_secs(2));
// let poly: Poly = Poly::new(); // worker
thread::Builder::new()
.name("B worker".into())
.spawn(|| {
config::affinity::bind_core_follow_config(1);
loop {
sleep(Duration::from_secs(1));
}
})
.unwrap();
watcher.watch(&mut |line: String| {
// poly.clone().push(line);
LogWatcherAction::None
})
}
其中B worker是替换worker的线程,结果如下
core CoreId { id: 0 }, bind to: 7
core CoreId { id: 1 }, bind to: 7
core CoreId { id: 2 }, bind to: 7
core CoreId { id: 3 }, bind to: 7
core CoreId { id: 4 }, bind to: 7
core CoreId { id: 5 }, bind to: 7
core CoreId { id: 6 }, bind to: 7
core CoreId { id: 7 }, bind to: 7
>>> core CoreId { id: 7 }, bind to: Thread { id: ThreadId(1), name: Some("main"), .. }
core CoreId { id: 0 }, bind to: 5
core CoreId { id: 1 }, bind to: 5
core CoreId { id: 2 }, bind to: 5
core CoreId { id: 3 }, bind to: 5
core CoreId { id: 4 }, bind to: 5
core CoreId { id: 5 }, bind to: 5
>>> core CoreId { id: 5 }, bind to: Thread { id: ThreadId(2), name: Some("A"), .. }
core CoreId { id: 0 }, bind to: 6
core CoreId { id: 1 }, bind to: 6
core CoreId { id: 2 }, bind to: 6
core CoreId { id: 3 }, bind to: 6
core CoreId { id: 4 }, bind to: 6
core CoreId { id: 5 }, bind to: 6
core CoreId { id: 6 }, bind to: 6
>>> core CoreId { id: 6 }, bind to: Thread { id: ThreadId(3), name: Some("B worker"), .. }
结论,core_affinity::get_core_ids
获取到的核心信息具有不确定性。查阅作者仓库发现了一个issue与这个问题相关,提issue者同样反馈了这个问题,并质疑get_core_ids获取到的不是所有核,而是对于当前线程的可用核。
最近监控上报的agent里需要将数据上报到es,所以用了elasticsearch-rs这个包,这是es官方提供的rust版本sdk,看了一下版本号,目前的版本都处于alpha。
下面用一个简单实例讲述我遇到的问题。首先是,调用sdk发现需要用async标注函数。
use elasticsearch::{http::transport::Transport, BulkParts, Elasticsearch};
use serde_json::Value;
fn main() {
println!("Hello, world!");
send_es("hi".to_string());
println!("hi");
}
pub async fn send_es(body: String) {
let transport = Transport::single_node("http://xxx.xxx.net").unwrap();
let client = Elasticsearch::new(transport);
let mut bodies: Vec<String> = Vec::with_capacity(1);
bodies.push(body);
let response = client
.bulk(BulkParts::Index("nginx"))
.body(bodies)
.send()
.await
.unwrap();
let response_body = response.json::<Value>().await.unwrap();
println!("{:?}", response_body);
}
运行后,发现有一个警告,并且send_es
没有被调用
➜ tes git:(master) ✗ cargo run
Compiling tes v0.1.0 (/root/code/las/tes)
warning: unused implementer of `Future` that must be used
--> src/main.rs:6:5
|
6 | send_es("hi".to_string());
| ^^^^^^^^^^^^^^^^^^^^^^^^^^
|
= note: `#[warn(unused_must_use)]` on by default
= note: futures do nothing unless you `.await` or poll them
warning: `tes` (bin "tes") generated 1 warning
Finished dev [unoptimized + debuginfo] target(s) in 3.16s
Running `target/debug/tes`
Hello, world!
hi
警告说futures do nothing
,也就是send_es
将会什么也不做。后面,我找到了一个方法block_on
可以执行async方法,于是,变成了这样
use elasticsearch::{http::transport::Transport, BulkParts, Elasticsearch};
use futures::executor::block_on;
use serde_json::Value;
fn main() {
println!("Hello, world!");
block_on(send_es("hi".to_string()));
println!("hi");
}
pub async fn send_es(body: String) {
let transport = Transport::single_node("http://xxx.xxx.net").unwrap();
let client = Elasticsearch::new(transport);
let mut bodies: Vec<String> = Vec::with_capacity(1);
bodies.push(body);
let response = client
.bulk(BulkParts::Index("nginx"))
.body(bodies)
.send()
.await
.unwrap();
let response_body = response.json::<Value>().await.unwrap();
println!("{:?}", response_body);
}
但是执行后发现报错如下
➜ tes git:(master) ✗ cargo run
Compiling tes v0.1.0 (/root/code/las/tes)
Finished dev [unoptimized + debuginfo] target(s) in 3.52s
Running `target/debug/tes`
Hello, world!
thread 'main' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime', /root/.cargo/registry/src/github.com-1ecc6299db9ec823/hyper-0.14.20/src/client/connect/dns.rs:121:24
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
各种查找解决方案之后,也没能解决这个问题,说是tokio
版本依赖的问题,两个不同的组件间接引用了不同版本的tokio
,说是引入tokio = "*"
就能解决依赖问题,但是实际上是无法解决的。所以我就用了上面的最小用例来调用elasticsearch sdk,只调用sdk,不引用任何其他依赖(原项目中还引用了reqwest包,这个包依赖了tokio)。发现这个最小用例也报错,说明根本不是依赖问题,但是可以确定问题出在tokio
上。于是阅读了tokio
的官方文档,了解到运行async函数可以用#[tokio::main]
标注,结合.await
就可以了。于是重新修改后如下
use elasticsearch::{http::transport::Transport, BulkParts, Elasticsearch};
use serde_json::Value;
#[tokio::main]
async fn main() {
println!("Hello, world!");
send_es("hi".to_string()).await;
println!("hi");
}
pub async fn send_es(body: String) {
let transport = Transport::single_node("http://xxx.xxx.net").unwrap();
let client = Elasticsearch::new(transport);
let mut bodies: Vec<String> = Vec::with_capacity(1);
bodies.push(body);
let response = client
.bulk(BulkParts::Index("nginx"))
.body(bodies)
.send()
.await
.unwrap();
let response_body = response.json::<Value>().await.unwrap();
println!("{:?}", response_body);
}
问题解决,终于调用成功了。
总结,tokio
是rust中有名的异步调用的包。它定义了async
和await
这些关键词,而实现异步。同样他也定义了异步函数的调用方式,就是#[tokio::main]
标注。
今天开始尝试使用rust的多线程编程,在不同的线程中访问共享变量。一个小项目,项目背景是这样的,我想用rust写一个高性能的nginx日志解析聚合上报监控的agent;其中,主线程中watch日志文件,并读取日志内容,有点类似tailf,然后有一个子线程,负责每隔10s钟统计主线程收集到的日志数据,聚合并上报。
所以,这里除了主线程之外,需要创建一个线程。并且主线程不断的往一个Vec<LineStat>
容器变量中写入数据,另一个子线程没隔10s,从这个容器变量中读取所有数据,并累加统计上报,然后清空该容器中的数据,10s一个往复。这个容器变量就是关键的共享变量了。常规情况下,是存在数据竞争的,在golang里倒是很容易写出来,但是golang可不管这些,锁你爱加不加,不加锁也能跑,但是会出错,若是map还会panic。但是,今天发现在rust里,根本写不出来……
那么第一个问题就是,rust线程中如何使用共享变量。 我们先看下直接使用共享变量会怎么样。
fn testx() {
let mut data = "hi".to_string();
thread::spawn(move || loop {
println!("{:?}", data);
thread::sleep(Duration::from_secs(10));
});
println!("{:?}", data);
}
可以看到spawn生成的线程内部访问data,会被要求加上move
关键词,它会强制获取被访问变量的所有权,也就是说在线程中访问了data
,data
的所有权就变为这个线程的了,只能在这个线程内访问。后续的println
访问data
就会报错。如下
error[E0382]: borrow of moved value: `data`
--> src/main.rs:38:22
|
31 | let mut data = "hi".to_string();
| -------- move occurs because `data` has type `std::string::String`, which does not implement the `Copy` trait
32 | thread::spawn(move || loop {
| ------- value moved into closure here
33 | println!("{:?}", data);
| ---- variable moved due to use in closure
...
38 | println!("{:?}", data);
| ^^^^ value borrowed here after move
|
= note: this error originates in the macro `$crate::format_args_nl` (in Nightly builds, run with -Z macro-backtrace for more info)
可以看到报错value borrowed here after move
,也就是访问了所有权被移动过的变量。
那么我们如何才能使data
能够同时在主线程和子线程中被访问呢。用Arc<Mutex<...>>
。代码改成如下
fn main() {
let mut raw_data = "hi".to_string();
let mut data: Arc<Mutex<String>> = Arc::new(Mutex::new(raw_data));
let mut clone_data = data.clone();
thread::spawn(move || loop {
thread::sleep(Duration::from_secs(1));
println!("th -> {:?}", clone_data.lock().unwrap());
clone_data.lock().unwrap().push_str(", hello");
});
thread::sleep(Duration::from_secs(10));
println!("final -> {:?}", data.lock().unwrap());
}
可以看到要访问的数据套上Arc<Mutex<...>>
之后,在线程中访问该数据时,先在线程外clone一个副本,然后在线程中访问该数据,访问时先加锁lock()
,然后读取到的数据就是raw_data
,而在主线程中访问的数据也是先lock()
加锁,读取到的也是raw_data
。相当于在raw_data
外部加了个套,虽然data
与clone_data
是两个加了套的数据,但是在套里面的数据是同一个,也就是raw_data
,lock()
锁定“解套”后访问的是同一个数据。我们来看下运行结果
th -> "hi"
th -> "hi, hello"
th -> "hi, hello, hello"
th -> "hi, hello, hello, hello"
th -> "hi, hello, hello, hello, hello"
th -> "hi, hello, hello, hello, hello, hello"
th -> "hi, hello, hello, hello, hello, hello, hello"
th -> "hi, hello, hello, hello, hello, hello, hello, hello"
th -> "hi, hello, hello, hello, hello, hello, hello, hello, hello"
final -> "hi, hello, hello, hello, hello, hello, hello, hello, hello, hello"
可以看到线程内部字符串拼接了若干次,最后在主线程中打印出来的是子线程拼接后的数据。
那么rust是怎么解锁的呢。 我们把代码改成这样
fn main() {
let mut raw_data = "hi".to_string();
let mut data: Arc<Mutex<String>> = Arc::new(Mutex::new(raw_data));
let mut clone_data = data.clone();
thread::spawn(move || loop {
thread::sleep(Duration::from_secs(1));
let mut v = clone_data.lock().unwrap(); // v 不会立即释放,v 所在的锁也不会立即释放
println!("th -> {:?}", v);
appendCnt(clone_data.clone());
// 线程结束后,v 被回收,v所在的锁才会释放
});
thread::sleep(Duration::from_secs(10));
println!("final -> {:?}", data.lock().unwrap());
}
fn appendCnt(mut data: Arc<Mutex<String>>) {
data.lock().unwrap().push_str(", hello"); // 这里再次尝试锁 data 里的值,发现前面已经锁定过,无法加锁,死锁
}
执行结果显示打印了第一个th -> "hi"
之后,程序直接卡死。这里是因为发生了死锁。rust中目前没有显式的锁释放操作(实际上unlock方法在nightly版本中,参考,对应的issue),锁释放(unlock)发生在前面所说的“加了套的数据”内存释放的时候,也就是说clone_data.lock()
即v
解锁发生在v
释放的时候,也就是在loop所在块结束的时候。但是在这之前appenCnt
调用了clone_data.clone()
衍生的“加了套的数据”,并尝试在函数中加锁,这时候上一个锁还没释放呢,所以就死锁了。
那么如果这里我必需要提前解锁怎么办,用drop
即可。如下
fn main() {
let mut raw_data = "hi".to_string();
let mut data: Arc<Mutex<String>> = Arc::new(Mutex::new(raw_data));
let mut clone_data = data.clone();
thread::spawn(move || loop {
thread::sleep(Duration::from_secs(1));
let mut v = clone_data.lock().unwrap(); // v 不会立即释放,v 所在的锁也不会立即释放
println!("th -> {:?}", v);
drop(v); // 提前释放 v,提前解锁
appendCnt(clone_data.clone());
});
thread::sleep(Duration::from_secs(10));
println!("final -> {:?}", data.lock().unwrap());
}
fn appendCnt(mut data: Arc<Mutex<String>>) {
data.lock().unwrap().push_str(", hello"); // 这里再次尝试锁 data 里的值,没有被锁定,可以加锁
}
运行,发现现在能正常解锁了。
今天又遇到一个新的问题,那就是在闭包函数中,无法引用闭包函数之外的变量。
fn main() {
let cfg = config::cfg::get_config();
let filename = cfg.las.as_ref().unwrap().access_log.as_ref().unwrap();
// println!("{}",filename);
// let mut p = poly::Poly::new();
let mut s = String::from("_");
let mut log_watcher = LogWatcher::register(filename.to_string()).unwrap();
log_watcher.watch(|line: String| {
let (_, token) = parse::lex::parser(&line);
// p.push(LineStat {});
print!("{}", s);
process::report::send(token);
});
}
上面代码中,watch
传入参数为一个闭包函数,其中需要引用main
函数中的局部变量s
,但是会报错
error[E0308]: mismatched types
--> src/main.rs:22:23
|
22 | log_watcher.watch(|line: String| {
| _______________________^
23 | | let (_, token) = parse::lex::parser(&line);
24 | | // p.push(LineStat {});
25 | | print!("{}", s);
26 | | process::report::send(token);
27 | | });
| |_____^ expected fn pointer, found closure
|
= note: expected fn pointer `fn(std::string::String)`
found closure `[closure@src/main.rs:22:23: 27:6]`
note: closures can only be coerced to `fn` types if they do not capture any variables
查看logwatcher
源码,watch
该版本(0.1.0)方法定义如下
pub fn watch(&mut self, callback: fn (line: String)) {
....
}
不过我后来发现logwatcher
版本升级到(0.1.1)之后,该方法变为
pub fn watch<F: ?Sized>(&mut self, callback: &mut F)
where
F: FnMut(String) -> LogWatcherAction,
{
....
}
如此可以正常调用,代码如下
fn main() {
let cfg = config::cfg::get_config();
let filename = cfg.las.as_ref().unwrap().access_log.as_ref().unwrap();
let mut watcher = LogWatcher::register(filename.to_string()).unwrap();
let mut poly: Poly = Poly::new();
watcher.watch(&mut |line: String| {
poly.clone().push(line);
LogWatcherAction::None
})
}
由此可见,闭包与函数还是有区别的。但是回调函数定义为Fn/FnMut类型就可以同时接受闭包和函数类型的回调参数。举例如下
let a = String::from("abc");
let mut x = || println!("x {}", a);
fn y() {
println!("y")
}
fn wrap(c: &mut Fn()) {
c()
}
wrap(&mut x); // pass a closure
wrap(&mut y); // pass a function
同样可以参考 stackoverflow 的这个提问。
7年前,我选择了自学golang,事实证明我选择对了。目前业界用golang的人越来越多,也越来越卷。于是我就想选择另一门语言了。用了这些年的golang之后,感觉golang在很多场景下还是很难胜任高新能开发;另外,我特别羡慕能做底层开发的同学,比如内核开发、协议栈开发等;因此,我看上了rust,rust号称无gc,甚至还能开发linux内核,还有一些游戏引擎,数据库引擎也是用rust开发的,一听就特别高端是吧。
于是我作为一个新手,开始尝试上手rust,原本准备依照golang的学习路线,先写个博客或者小的web程序,但是,发现rust上手太难。于是降低了上手的要求,用rust重写了一遍我的pinyin项目,项目是将汉字翻译成拼音,支持多音词,需要接入字词库。折腾了许久,终于完成了;这里简单谈一下作为一个rust初学者的感受。
首先,最大一个问题就是局部变量无法在作用域之外使用,这在golang上会自动扩大作用域,无需考虑这些问题。比如,我在一个函数中创建出来的对象,return返回后,发现无法在外部访问,直接报错,实际上该对象在函数结束时已经被销毁。或者说,所在对象发生move,因为他的类型未实现Copy trait。
比如,这种报错经常发生在这么一个场景,我创建了一个config包,辛苦的在里面实现了加载配置文件到一个结构体,当我完成这些之后,希望在另一个mod中引用这个config结构体,便会经常发生这种情况。
use std::fs::File;
use std::io::prelude::*;
use once_cell::sync::Lazy;
#[derive(Deserialize)]
#[derive(Debug)]
struct LasConfig {
pub access_log: Option<String>,
}
#[derive(Deserialize)]
#[derive(Debug)]
struct Conf {
pub las: Option<LasConfig>
}
static global: Lazy<Conf> = Lazy::new(|| {
let file_path = "config.toml";
let mut file = match File::open(file_path) {
Ok(f) => f,
Err(e) => panic!("no such file {} exception:{}", file_path, e)
};
let mut str_val = String::new();
match file.read_to_string(&mut str_val) {
Ok(s) => s,
Err(e) => panic!("Error Reading file: {}", e)
};
toml::from_str(&str_val).unwrap()
});
pub fn get_config() -> Conf {
*global
}
其中*global
的类型的确是Conf
但是却无法通过参数返回。会有如下的报错信息
error[E0507]: cannot move out of dereference of `once_cell::sync::Lazy<Conf>`
--> src/config/cfg.rs:32:5
|
32 | *global
| ^^^^^^^ move occurs because value has type `Conf`, which does not implement the `Copy` trait
For more information about this error, try `rustc --explain E0507`.
很明显,这里生成的Conf
类型的global
对象,但是无法直接返回。因为返回的时候发生了move
。
神奇的是,这里get_config()
改成如下这样,就不会有报错了
pub fn get_config() -> &'static Conf {
(*global).borrow()
}
我们从E0507这篇文章中可以看到
the
nothing_is_true method
takes the ownership ofself
那么我们这里的报错应该也是*global
在返回的时候,将ownership拿走,既然不能拿走,那就改为借用好了。
前段时间因为r4s软路由故障,当时一筹莫展,所以就顺手买了个r5s准备用来替换r4s,将r4s替下来研究防火墙故障原因。但是r5s还没到货就找到故障原因了;这里简单介绍一下r5s使用体验。
r5s相比于r4s有一个很大的不同之处,那就是r5s支持emcc存储,也就是说机身自带存储,可以不使用tf卡装系统,当然也可以使用tf卡。在刷第三方openwrt系统的过程中,使用tf卡启动带emcc flasher的固件以及尝试usb线刷的过程中发现一个很重要的点,那就是mask按键的按下时长。这里简单总结一下经验:usb线刷,通电源后,mask需要继续按住3秒然后松掉,才能识别线刷;tf卡引导,接通电源后,mask需要继续按住4s然后松掉,才能从tf卡引导。时间一定要掐准,不然无法进入相应的模式。
从外观上可以看到,r5s有3个网口,其中两个2.5G,一个1G网口,type-c口供电,有一个HDMI口,但是目前没有什么用处,接显示器openwrt系统也只是显示一下字符终端,还没尝试其他linux发行版系统,不知道是否能够在其他系统上发挥这个HDMI接口的用处。机器后面还有2个usb3.0接口,靠右侧的usb3.0接口同时也是线刷口;后面还有一个tf口。机器内部还有一个m2接口,可以接nvme硬盘。不过4核的CPU与r4s相比,少2个核心,但是据说能耗也比r4s略小。
我给它刷上是固件是 r5s.cooluc.com 这个网站发布的固件,通过线刷emcc安装的。系统支持常用插件,以及docker,不过它的默认ip段是 10 开头,这个网段在我公司是被用作内网的,如果居家办公家庭网络也是这个网段,会对居家办公造成影响,所以,我会将其换回 192.168 网段。最后一点,发热量,从我的直观体验上来看,它的发热量与r4s差不多,外壳都是略微有些烫手,r4s系统显示温度是48℃,不过比之我最早使用的x86软路由要好多了,我不喜欢外置风扇之类的,也就不准备配备外置风扇了。
今天是二十四节气中的白露。可惜南方看不到苍苍的蒹葭,也看不到变为霜的白露。
蒹葭苍苍,白露为霜
所谓伊人,在水一方
今年是在南方城市生活的第七年了。白露节气已经悄悄的到了,但是这里每天早晨上班,中午吃饭的时候都是烈日暴晒。仿佛所谓的白露跟这座城市找不到半点关系。
蒹葭是一种草,据说是芦苇。蒹葭苍苍是指秋天的芦苇已经变成了枯黄色,白露是北方秋季早晨路边野草上凝结的水滴,白露为霜是指这种水滴已经变为了白色的霜。对于我老家湖北来说,结霜了通常是到了仲秋或者深秋,天气越发的冷了。
儿时,每周一早晨,和母亲一起去村小学。每逢秋天白露或者霜降时节,都能在路边的野草上见到白露或者白霜,空气寒冷,却很清新。或者是周末或寒假期间,在老家旧宅附近的打谷场上,冬季每天早晨还能看到枯黄的稻杆上覆盖着一层薄薄的霜,不离近了,还很难发现。然后,日子一天天的过着,到了冬季,下一场大雪,大雪会覆盖世间万物,世界出奇的安静和干净。大雪封路,但是快要过年,得去镇上置办年货,于是,一家家一户户,徒步去镇上购置年货。一路上不知道会在雪地里摔跤多少次。那场景记忆犹新。