初学rust,tokio的async与await
Tag rust, tokio, async, await, on by view 256

最近监控上报的agent里需要将数据上报到es,所以用了elasticsearch-rs这个包,这是es官方提供的rust版本sdk,看了一下版本号,目前的版本都处于alpha。

下面用一个简单实例讲述我遇到的问题。首先是,调用sdk发现需要用async标注函数。

use elasticsearch::{http::transport::Transport, BulkParts, Elasticsearch};
use serde_json::Value;

fn main() {
    println!("Hello, world!");
    send_es("hi".to_string());
    println!("hi");
}

pub async fn send_es(body: String) {
    let transport = Transport::single_node("http://xxx.xxx.net").unwrap();
    let client = Elasticsearch::new(transport);
    let mut bodies: Vec<String> = Vec::with_capacity(1);
    bodies.push(body);
    let response = client
        .bulk(BulkParts::Index("nginx"))
        .body(bodies)
        .send()
        .await
        .unwrap();
    let response_body = response.json::<Value>().await.unwrap();
    println!("{:?}", response_body);
}

运行后,发现有一个警告,并且send_es没有被调用

➜  tes git:(master) ✗ cargo run
   Compiling tes v0.1.0 (/root/code/las/tes)
warning: unused implementer of `Future` that must be used
 --> src/main.rs:6:5
  |
6 |     send_es("hi".to_string());
  |     ^^^^^^^^^^^^^^^^^^^^^^^^^^
  |
  = note: `#[warn(unused_must_use)]` on by default
  = note: futures do nothing unless you `.await` or poll them

warning: `tes` (bin "tes") generated 1 warning
    Finished dev [unoptimized + debuginfo] target(s) in 3.16s
     Running `target/debug/tes`
Hello, world!
hi

警告说futures do nothing,也就是send_es将会什么也不做。后面,我找到了一个方法block_on可以执行async方法,于是,变成了这样

use elasticsearch::{http::transport::Transport, BulkParts, Elasticsearch};
use futures::executor::block_on;
use serde_json::Value;

fn main() {
    println!("Hello, world!");
    block_on(send_es("hi".to_string()));
    println!("hi");
}

pub async fn send_es(body: String) {
    let transport = Transport::single_node("http://xxx.xxx.net").unwrap();
    let client = Elasticsearch::new(transport);
    let mut bodies: Vec<String> = Vec::with_capacity(1);
    bodies.push(body);
    let response = client
        .bulk(BulkParts::Index("nginx"))
        .body(bodies)
        .send()
        .await
        .unwrap();
    let response_body = response.json::<Value>().await.unwrap();
    println!("{:?}", response_body);
}

但是执行后发现报错如下

➜  tes git:(master) ✗ cargo run
   Compiling tes v0.1.0 (/root/code/las/tes)
    Finished dev [unoptimized + debuginfo] target(s) in 3.52s
     Running `target/debug/tes`
Hello, world!
thread 'main' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime', /root/.cargo/registry/src/github.com-1ecc6299db9ec823/hyper-0.14.20/src/client/connect/dns.rs:121:24
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

各种查找解决方案之后,也没能解决这个问题,说是tokio版本依赖的问题,两个不同的组件间接引用了不同版本的tokio,说是引入tokio = "*"就能解决依赖问题,但是实际上是无法解决的。所以我就用了上面的最小用例来调用elasticsearch sdk,只调用sdk,不引用任何其他依赖(原项目中还引用了reqwest包,这个包依赖了tokio)。发现这个最小用例也报错,说明根本不是依赖问题,但是可以确定问题出在tokio上。于是阅读了tokio官方文档,了解到运行async函数可以用#[tokio::main]标注,结合.await就可以了。于是重新修改后如下

use elasticsearch::{http::transport::Transport, BulkParts, Elasticsearch};
use serde_json::Value;

#[tokio::main]
async fn main() {
    println!("Hello, world!");
    send_es("hi".to_string()).await;
    println!("hi");
}

pub async fn send_es(body: String) {
    let transport = Transport::single_node("http://xxx.xxx.net").unwrap();
    let client = Elasticsearch::new(transport);
    let mut bodies: Vec<String> = Vec::with_capacity(1);
    bodies.push(body);
    let response = client
        .bulk(BulkParts::Index("nginx"))
        .body(bodies)
        .send()
        .await
        .unwrap();
    let response_body = response.json::<Value>().await.unwrap();
    println!("{:?}", response_body);
}

问题解决,终于调用成功了。

总结,tokio是rust中有名的异步调用的包。它定义了asyncawait这些关键词,而实现异步。同样他也定义了异步函数的调用方式,就是#[tokio::main]标注。


初学rust,多线程编程共享变量访问
Tag rust, 多线程, 共享变量, , 解锁, on by view 214

今天开始尝试使用rust的多线程编程,在不同的线程中访问共享变量。一个小项目,项目背景是这样的,我想用rust写一个高性能的nginx日志解析聚合上报监控的agent;其中,主线程中watch日志文件,并读取日志内容,有点类似tailf,然后有一个子线程,负责每隔10s钟统计主线程收集到的日志数据,聚合并上报。

所以,这里除了主线程之外,需要创建一个线程。并且主线程不断的往一个Vec<LineStat>容器变量中写入数据,另一个子线程没隔10s,从这个容器变量中读取所有数据,并累加统计上报,然后清空该容器中的数据,10s一个往复。这个容器变量就是关键的共享变量了。常规情况下,是存在数据竞争的,在golang里倒是很容易写出来,但是golang可不管这些,锁你爱加不加,不加锁也能跑,但是会出错,若是map还会panic。但是,今天发现在rust里,根本写不出来……

那么第一个问题就是,rust线程中如何使用共享变量。 我们先看下直接使用共享变量会怎么样。

fn testx() {
    let mut data = "hi".to_string();
    thread::spawn(move || loop {
        println!("{:?}", data);

        thread::sleep(Duration::from_secs(10));
    });

    println!("{:?}", data);
}

可以看到spawn生成的线程内部访问data,会被要求加上move关键词,它会强制获取被访问变量的所有权,也就是说在线程中访问了datadata的所有权就变为这个线程的了,只能在这个线程内访问。后续的println访问data就会报错。如下

error[E0382]: borrow of moved value: `data`
  --> src/main.rs:38:22
   |
31 |     let mut data = "hi".to_string();
   |         -------- move occurs because `data` has type `std::string::String`, which does not implement the `Copy` trait
32 |     thread::spawn(move || loop {
   |                   ------- value moved into closure here
33 |         println!("{:?}", data);
   |                          ---- variable moved due to use in closure
...
38 |     println!("{:?}", data);
   |                      ^^^^ value borrowed here after move
   |
   = note: this error originates in the macro `$crate::format_args_nl` (in Nightly builds, run with -Z macro-backtrace for more info)

可以看到报错value borrowed here after move,也就是访问了所有权被移动过的变量。

那么我们如何才能使data能够同时在主线程和子线程中被访问呢。用Arc<Mutex<...>>。代码改成如下

fn main() {
    let mut raw_data = "hi".to_string();
    let mut data: Arc<Mutex<String>> = Arc::new(Mutex::new(raw_data));
    let mut clone_data = data.clone();
    thread::spawn(move || loop {
        thread::sleep(Duration::from_secs(1));
        println!("th -> {:?}", clone_data.lock().unwrap());
        clone_data.lock().unwrap().push_str(", hello");
    });

    thread::sleep(Duration::from_secs(10));
    println!("final -> {:?}", data.lock().unwrap());
}

可以看到要访问的数据套上Arc<Mutex<...>>之后,在线程中访问该数据时,先在线程外clone一个副本,然后在线程中访问该数据,访问时先加锁lock(),然后读取到的数据就是raw_data,而在主线程中访问的数据也是先lock()加锁,读取到的也是raw_data。相当于在raw_data外部加了个套,虽然dataclone_data是两个加了套的数据,但是在套里面的数据是同一个,也就是raw_datalock()锁定“解套”后访问的是同一个数据。我们来看下运行结果

th -> "hi"
th -> "hi, hello"
th -> "hi, hello, hello"
th -> "hi, hello, hello, hello"
th -> "hi, hello, hello, hello, hello"
th -> "hi, hello, hello, hello, hello, hello"
th -> "hi, hello, hello, hello, hello, hello, hello"
th -> "hi, hello, hello, hello, hello, hello, hello, hello"
th -> "hi, hello, hello, hello, hello, hello, hello, hello, hello"
final -> "hi, hello, hello, hello, hello, hello, hello, hello, hello, hello"

可以看到线程内部字符串拼接了若干次,最后在主线程中打印出来的是子线程拼接后的数据。

那么rust是怎么解锁的呢。 我们把代码改成这样

fn main() {
    let mut raw_data = "hi".to_string();
    let mut data: Arc<Mutex<String>> = Arc::new(Mutex::new(raw_data));
    let mut clone_data = data.clone();
    thread::spawn(move || loop {
        thread::sleep(Duration::from_secs(1));
        let mut v = clone_data.lock().unwrap(); // v 不会立即释放,v 所在的锁也不会立即释放
        println!("th -> {:?}", v);

        appendCnt(clone_data.clone());
        // 线程结束后,v 被回收,v所在的锁才会释放
    });

    thread::sleep(Duration::from_secs(10));
    println!("final -> {:?}", data.lock().unwrap());
}

fn appendCnt(mut data: Arc<Mutex<String>>) {
    data.lock().unwrap().push_str(", hello"); // 这里再次尝试锁 data 里的值,发现前面已经锁定过,无法加锁,死锁
}

执行结果显示打印了第一个th -> "hi"之后,程序直接卡死。这里是因为发生了死锁。rust中目前没有显式的锁释放操作(实际上unlock方法在nightly版本中,参考,对应的issue),锁释放(unlock)发生在前面所说的“加了套的数据”内存释放的时候,也就是说clone_data.lock()v解锁发生在v释放的时候,也就是在loop所在块结束的时候。但是在这之前appenCnt调用了clone_data.clone()衍生的“加了套的数据”,并尝试在函数中加锁,这时候上一个锁还没释放呢,所以就死锁了。

那么如果这里我必需要提前解锁怎么办,用drop即可。如下

fn main() {
    let mut raw_data = "hi".to_string();
    let mut data: Arc<Mutex<String>> = Arc::new(Mutex::new(raw_data));
    let mut clone_data = data.clone();
    thread::spawn(move || loop {
        thread::sleep(Duration::from_secs(1));
        let mut v = clone_data.lock().unwrap(); // v 不会立即释放,v 所在的锁也不会立即释放
        println!("th -> {:?}", v);
        drop(v); // 提前释放 v,提前解锁

        appendCnt(clone_data.clone());
    });

    thread::sleep(Duration::from_secs(10));
    println!("final -> {:?}", data.lock().unwrap());
}

fn appendCnt(mut data: Arc<Mutex<String>>) {
    data.lock().unwrap().push_str(", hello"); // 这里再次尝试锁 data 里的值,没有被锁定,可以加锁
}

运行,发现现在能正常解锁了。


初学rust,闭包引用外部变量
Tag rust, 闭包, on by view 234

今天又遇到一个新的问题,那就是在闭包函数中,无法引用闭包函数之外的变量。

fn main() {
    let cfg = config::cfg::get_config();
    let filename = cfg.las.as_ref().unwrap().access_log.as_ref().unwrap();
    // println!("{}",filename);
    // let mut p = poly::Poly::new();
    let mut s = String::from("_");
    let mut log_watcher = LogWatcher::register(filename.to_string()).unwrap();

    log_watcher.watch(|line: String| {
        let (_, token) = parse::lex::parser(&line);
        // p.push(LineStat {});
        print!("{}", s);
        process::report::send(token);
    });
}

上面代码中,watch传入参数为一个闭包函数,其中需要引用main函数中的局部变量s,但是会报错

error[E0308]: mismatched types
  --> src/main.rs:22:23
   |
22 |       log_watcher.watch(|line: String| {
   |  _______________________^
23 | |         let (_, token) = parse::lex::parser(&line);
24 | |         // p.push(LineStat {});
25 | |         print!("{}", s);
26 | |         process::report::send(token);
27 | |     });
   | |_____^ expected fn pointer, found closure
   |
   = note: expected fn pointer `fn(std::string::String)`
                 found closure `[closure@src/main.rs:22:23: 27:6]`
note: closures can only be coerced to `fn` types if they do not capture any variables

查看logwatcher源码,watch该版本(0.1.0)方法定义如下

pub fn watch(&mut self, callback: fn (line: String)) {
    ....
}

不过我后来发现logwatcher版本升级到(0.1.1)之后,该方法变为

pub fn watch<F: ?Sized>(&mut self, callback: &mut F)
where
    F: FnMut(String) -> LogWatcherAction,
{
    ....
}

如此可以正常调用,代码如下

fn main() {
    let cfg = config::cfg::get_config();
    let filename = cfg.las.as_ref().unwrap().access_log.as_ref().unwrap();
    let mut watcher = LogWatcher::register(filename.to_string()).unwrap();
    let mut poly: Poly = Poly::new();

    watcher.watch(&mut |line: String| {
        poly.clone().push(line);
        LogWatcherAction::None
    })
}

由此可见,闭包与函数还是有区别的。但是回调函数定义为Fn/FnMut类型就可以同时接受闭包和函数类型的回调参数。举例如下

let a = String::from("abc");

let mut x = || println!("x {}", a);

fn y() {
    println!("y")
}

fn wrap(c: &mut Fn()) {
    c()
}

wrap(&mut x); // pass a closure
wrap(&mut y); // pass a function

同样可以参考 stackoverflow 的这个提问


软路由r5s使用体验
Tag r5s, 软路由, on by view 449

前段时间因为r4s软路由故障,当时一筹莫展,所以就顺手买了个r5s准备用来替换r4s,将r4s替下来研究防火墙故障原因。但是r5s还没到货就找到故障原因了;这里简单介绍一下r5s使用体验。

4loyvgky

r5s相比于r4s有一个很大的不同之处,那就是r5s支持emcc存储,也就是说机身自带存储,可以不使用tf卡装系统,当然也可以使用tf卡。在刷第三方openwrt系统的过程中,使用tf卡启动带emcc flasher的固件以及尝试usb线刷的过程中发现一个很重要的点,那就是mask按键的按下时长。这里简单总结一下经验:usb线刷,通电源后,mask需要继续按住3秒然后松掉,才能识别线刷;tf卡引导,接通电源后,mask需要继续按住4s然后松掉,才能从tf卡引导。时间一定要掐准,不然无法进入相应的模式。

从外观上可以看到,r5s有3个网口,其中两个2.5G,一个1G网口,type-c口供电,有一个HDMI口,但是目前没有什么用处,接显示器openwrt系统也只是显示一下字符终端,还没尝试其他linux发行版系统,不知道是否能够在其他系统上发挥这个HDMI接口的用处。机器后面还有2个usb3.0接口,靠右侧的usb3.0接口同时也是线刷口;后面还有一个tf口。机器内部还有一个m2接口,可以接nvme硬盘。不过4核的CPU与r4s相比,少2个核心,但是据说能耗也比r4s略小。

我给它刷上是固件是 r5s.cooluc.com 这个网站发布的固件,通过线刷emcc安装的。系统支持常用插件,以及docker,不过它的默认ip段是 10 开头,这个网段在我公司是被用作内网的,如果居家办公家庭网络也是这个网段,会对居家办公造成影响,所以,我会将其换回 192.168 网段。最后一点,发热量,从我的直观体验上来看,它的发热量与r4s差不多,外壳都是略微有些烫手,r4s系统显示温度是48℃,不过比之我最早使用的x86软路由要好多了,我不喜欢外置风扇之类的,也就不准备配备外置风扇了。


蒹葭苍苍,白露为霜
Tag 白露, on by view 64

今天是二十四节气中的白露。可惜南方看不到苍苍的蒹葭,也看不到变为霜的白露。

蒹葭苍苍,白露为霜
所谓伊人,在水一方

今年是在南方城市生活的第七年了。白露节气已经悄悄的到了,但是这里每天早晨上班,中午吃饭的时候都是烈日暴晒。仿佛所谓的白露跟这座城市找不到半点关系。

86zal12g

蒹葭是一种草,据说是芦苇。蒹葭苍苍是指秋天的芦苇已经变成了枯黄色,白露是北方秋季早晨路边野草上凝结的水滴,白露为霜是指这种水滴已经变为了白色的霜。对于我老家湖北来说,结霜了通常是到了仲秋或者深秋,天气越发的冷了。

7qpdvkah

儿时,每周一早晨,和母亲一起去村小学。每逢秋天白露或者霜降时节,都能在路边的野草上见到白露或者白霜,空气寒冷,却很清新。或者是周末或寒假期间,在老家旧宅附近的打谷场上,冬季每天早晨还能看到枯黄的稻杆上覆盖着一层薄薄的霜,不离近了,还很难发现。然后,日子一天天的过着,到了冬季,下一场大雪,大雪会覆盖世间万物,世界出奇的安静和干净。大雪封路,但是快要过年,得去镇上置办年货,于是,一家家一户户,徒步去镇上购置年货。一路上不知道会在雪地里摔跤多少次。那场景记忆犹新。

i1hha7my

tfjv55qx

zdfrl3us


软路由http连接超时问题排查
Tag openwrt, 连接超时, on by view 96

用软路由有一段时间的,最近家人经常跟我吐槽,家里网络差。后来我发现确实存在网络差的问题,有时候刷抖音视频加载不出来,刷微博图片加载不出来。一开始以为是天威宽带运营商的问题,后面发现同一时间直接链接运营商的光猫路由无线网络不会有问题。这么说来,问题是出在软路由上了。 一开始我以为富强网的问题,因为访问国外没问题,访问国内有问题,各种找解决方案,有人说需要设置这条防火墙规则就可以了

iptables -t nat -I POSTROUTING -o eth0 -j MASQUERADE

但是,我测试了没有效果。而且还比较符合别人的描述,重启防火墙就恢复正常了,然后在不确定长度的一段时间后,又会出现这个问题。这里肯定是防火墙的问题了。但是想尽一切办法都没法解决,于是我关闭了富强网。

第二天,我发现访问csdn的资源链接依然会有问题,通常会卡个1分钟以上,才有响应。这很不正常。看样子跟富强网没关系。我先后确认了DNS解析正常,关闭了“Turbo ACC网络加速”,设置调小了lan口MTU,但是都没有结果。头发都要薅光了。

先dig一下域名

➜  ttt dig csdnimg.cn

; <<>> DiG 9.11.5-P4-5.1+deb10u7-Debian <<>> csdnimg.cn
;; global options: +cmd
;; Got answer:
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 63181
;; flags: qr rd ra; QUERY: 1, ANSWER: 9, AUTHORITY: 0, ADDITIONAL: 1

;; OPT PSEUDOSECTION:
; EDNS: version: 0, flags:; udp: 1232
;; QUESTION SECTION:
;csdnimg.cn.                    IN      A

;; ANSWER SECTION:
csdnimg.cn.             57      IN      CNAME   csdnimg.cn.trpcdn.net.
csdnimg.cn.trpcdn.net.  1717    IN      CNAME   uz95.v.trpcdn.net.
uz95.v.trpcdn.net.      95      IN      A       116.77.73.215
uz95.v.trpcdn.net.      95      IN      A       116.77.73.214
uz95.v.trpcdn.net.      95      IN      A       116.77.73.211
uz95.v.trpcdn.net.      95      IN      A       116.77.73.213
uz95.v.trpcdn.net.      95      IN      A       116.77.73.218
uz95.v.trpcdn.net.      95      IN      A       116.77.73.217
uz95.v.trpcdn.net.      95      IN      A       116.77.73.212

;; Query time: 20 msec
;; SERVER: 192.168.1.1#53(192.168.1.1)
;; WHEN: 一 9月 05 13:45:27 CST 2022
;; MSG SIZE  rcvd: 217

解析出来的是cname到网宿CDN,A记录为天威宽带的节点。从openwrt的ptty终端访问正常,说明ip是通的没问题, 但是在路由下我的电脑上访问,也是偶现超时卡顿。

然后想办法稳定重现,写个shell脚本,用curl循环访问问题链接

#!/bin/bash

for ((i=1;i<=100;i++)); do
  curl --no-tcp-nodelay 'https://csdnimg.cn/release/blogv2/dist/pc/img/pay-time-out.png'$i -w "@fmt" -o /dev/null -s;
done

其中fmt耗时输出格式文件

time_namelookup:  %{time_namelookup}\n
       time_connect:  %{time_connect}\n
    time_appconnect:  %{time_appconnect}\n
      time_redirect:  %{time_redirect}\n
   time_pretransfer:  %{time_pretransfer}\n
 time_starttransfer:  %{time_starttransfer}\n
                    ----------\n
         time_total:  %{time_total}\n

调用shell文件,结果中间卡了很久

vjgy55fs

可以看到中间有个异常耗时

time_namelookup:  0.031654
       time_connect:  129.203610
    time_appconnect:  129.222947
      time_redirect:  0.000000
   time_pretransfer:  129.223397
 time_starttransfer:  129.359648
                    ----------
         time_total:  129.359827

可以看到,dns解析耗时0.031654,没问题,问题主要在time_connect,说明是到该ip的链接超时。众所周知,tcp连接需要握手,其中会传输ACK,SYN这些数据包,用于确认握手状态。会不会是这些包存在被路由器防火墙丢弃的情况?打开openwrt“状态->防火墙”,搜索DROP,有这么一条规则被我看到了

75ura4is

查了一下223.252.199.10,是网易云的,这让我突然想起来,openwrt上有个“解锁网易云灰色歌曲”的功能,曾经被我打开了。

91tw7d76

关闭。网络恢复正常了

最终结论:这次网络故障是由“解锁网易云灰色歌曲”这个功能导致的故障,其原理是劫持所有到网易云的流量到腾讯云,从而实现网易云音乐被封禁的音乐从腾讯云下载。中间的规则应该是出了什么问题,影响到正常链路了。 关闭之后就正常了,openwrt上各种功能的确丰富,但是坑也多,很多功能都需要基于防火墙实现流量劫持与转发,一旦出现问题,就可能影响正常上网。