初学rust,给线程绑核
Tag 绑核, 线程, on by view 236

rust中,可以通过core_affinity这个crate对线程进行核绑定。但是绑核过程中发现一个问题。针对主线程绑核,若是主线程绑核后,创建子线程,在该子线程种尝试绑核会发现只有一个核可以。所以,使用这个库如果需要对主线程进行绑核,需要在所有子线程创建完毕之后进行。

绑核的函数如下

static CORE_IDS: Lazy<Vec<CoreId>> =
    Lazy::new(|| core_affinity::get_core_ids().unwrap()); // 尝试过给CORE_IDS 加锁,也是一样

fn bind_core(id: usize) {
    let mut selected_id = CORE_IDS[0];
    for core_id in CORE_IDS.clone() {
        println!("core {:?}, bind to: {:?}", selected_id, id);
        if core_id.id == id {
            selected_id = core_id;
            break;
        }
    }

    core_affinity::set_for_current(selected_id);
}

在主线程中绑核,且主线程的绑核在创建子线程之前,发现,第一次遍历核,核有8个,成功绑定到指定的7号核;第二次子线程中绑核,遍历核,核有1个,为7号核,所以只能绑定到7号核,这样就与主线程同核了。

core_id num: 8, cores: [CoreId { id: 0 }, CoreId { id: 1 }, CoreId { id: 2 }, CoreId { id: 3 }, CoreId { id: 4 }, CoreId { id: 5 }, CoreId { id: 6 }, CoreId { id: 7 }]

print core_id: CoreId { id: 0 }, bind id: 7
print core_id: CoreId { id: 1 }, bind id: 7
print core_id: CoreId { id: 2 }, bind id: 7
print core_id: CoreId { id: 3 }, bind id: 7
print core_id: CoreId { id: 4 }, bind id: 7
print core_id: CoreId { id: 5 }, bind id: 7
print core_id: CoreId { id: 6 }, bind id: 7
print core_id: CoreId { id: 7 }, bind id: 7
core_id num: 1, cores: [CoreId { id: 7 }]

print core_id: CoreId { id: 7 }, bind id: 6

改为,主线程绑核在创建子线程之后,如下

fn main() {
    let cfg = config::cfg::get_config();
    let filename = cfg.las.as_ref().unwrap().access_log.as_ref().unwrap();
    let mut watcher = LogWatcher::register(filename.to_string()).unwrap();
    let poly: Poly = Poly::new(); // 此处调用会创建子线程
    config::affinity::bind_core_follow_config(0); // 绑核

    watcher.watch(&mut |line: String| {
        poly.clone().push(line);
        LogWatcherAction::None
    })
}

发现,成功的选中指定的主线程7号核,子线程6号核,输出如下

core_id num: 8, cores: [CoreId { id: 0 }, CoreId { id: 1 }, CoreId { id: 2 }, CoreId { id: 3 }, CoreId { id: 4 }, CoreId { id: 5 }, CoreId { id: 6 }, CoreId { id: 7 }]

print core_id: CoreId { id: 0 }, bind id: 7
print core_id: CoreId { id: 1 }, bind id: 7
print core_id: CoreId { id: 2 }, bind id: 7
print core_id: CoreId { id: 3 }, bind id: 7
print core_id: CoreId { id: 4 }, bind id: 7
print core_id: CoreId { id: 5 }, bind id: 7
print core_id: CoreId { id: 6 }, bind id: 7
print core_id: CoreId { id: 7 }, bind id: 7
core_id num: 8, cores: [CoreId { id: 0 }, CoreId { id: 1 }, CoreId { id: 2 }, CoreId { id: 3 }, CoreId { id: 4 }, CoreId { id: 5 }, CoreId { id: 6 }, CoreId { id: 7 }]

print core_id: CoreId { id: 0 }, bind id: 6
print core_id: CoreId { id: 1 }, bind id: 6
print core_id: CoreId { id: 2 }, bind id: 6
print core_id: CoreId { id: 3 }, bind id: 6
print core_id: CoreId { id: 4 }, bind id: 6
print core_id: CoreId { id: 5 }, bind id: 6
print core_id: CoreId { id: 6 }, bind id: 6

其中两次选核过程中,也都能够正常打印出所有核。

那么我试一下主线程中先创建A线程,在A线程中绑核(7),然后在主线程中绑核(6),最后创建worker线程,在worker线程中绑核(5),代码如下

fn main() {
    let cfg = config::cfg::get_config();
    let filename = cfg.las.as_ref().unwrap().access_log.as_ref().unwrap();
    let mut watcher = LogWatcher::register(filename.to_string()).unwrap();

    thread::Builder::new()
        .name("A".into())
        .spawn(|| {
            config::affinity::bind_core_follow_config(2);
            loop {
                sleep(Duration::from_secs(1));
            }
        })
        .unwrap();

    config::affinity::bind_core_follow_config(0);
    sleep(Duration::from_secs(2));

    let poly: Poly = Poly::new(); // worker

    watcher.watch(&mut |line: String| {
        poly.clone().push(line);
        LogWatcherAction::None
    })
}

结果如下

core CoreId { id: 0 }, bind to: 7
core CoreId { id: 1 }, bind to: 7
core CoreId { id: 2 }, bind to: 7
core CoreId { id: 3 }, bind to: 7
core CoreId { id: 4 }, bind to: 7
core CoreId { id: 5 }, bind to: 7
core CoreId { id: 6 }, bind to: 7
core CoreId { id: 7 }, bind to: 7
>>> core CoreId { id: 7 }, bind to: Thread { id: ThreadId(1), name: Some("main"), .. }

core CoreId { id: 0 }, bind to: 5
core CoreId { id: 1 }, bind to: 5
core CoreId { id: 2 }, bind to: 5
core CoreId { id: 3 }, bind to: 5
core CoreId { id: 4 }, bind to: 5
core CoreId { id: 5 }, bind to: 5
>>> core CoreId { id: 5 }, bind to: Thread { id: ThreadId(2), name: Some("A"), .. }

core CoreId { id: 7 }, bind to: 6
>>> core CoreId { id: 7 }, bind to: Thread { id: ThreadId(3), name: Some("worker"), .. }

可以看到worker进程获取到的核数不正常。worker线程内部略微复杂,里面涉及到数据处理,tokio异步调用的上报等。可是我把worker线程换为简单的替换线程,却没有问题了。代码如下

fn main() {
    let cfg = config::cfg::get_config();
    let filename = cfg.las.as_ref().unwrap().access_log.as_ref().unwrap();
    let mut watcher = LogWatcher::register(filename.to_string()).unwrap();

    thread::Builder::new()
        .name("A".into())
        .spawn(|| {
            config::affinity::bind_core_follow_config(2);
            loop {
                sleep(Duration::from_secs(1));
            }
        })
        .unwrap();

    config::affinity::bind_core_follow_config(0);
    sleep(Duration::from_secs(2));

    // let poly: Poly = Poly::new(); // worker

    thread::Builder::new()
        .name("B worker".into())
        .spawn(|| {
            config::affinity::bind_core_follow_config(1);
            loop {
                sleep(Duration::from_secs(1));
            }
        })
        .unwrap();

    watcher.watch(&mut |line: String| {
        // poly.clone().push(line);
        LogWatcherAction::None
    })
}

其中B worker是替换worker的线程,结果如下

core CoreId { id: 0 }, bind to: 7
core CoreId { id: 1 }, bind to: 7
core CoreId { id: 2 }, bind to: 7
core CoreId { id: 3 }, bind to: 7
core CoreId { id: 4 }, bind to: 7
core CoreId { id: 5 }, bind to: 7
core CoreId { id: 6 }, bind to: 7
core CoreId { id: 7 }, bind to: 7
>>> core CoreId { id: 7 }, bind to: Thread { id: ThreadId(1), name: Some("main"), .. }

core CoreId { id: 0 }, bind to: 5
core CoreId { id: 1 }, bind to: 5
core CoreId { id: 2 }, bind to: 5
core CoreId { id: 3 }, bind to: 5
core CoreId { id: 4 }, bind to: 5
core CoreId { id: 5 }, bind to: 5
>>> core CoreId { id: 5 }, bind to: Thread { id: ThreadId(2), name: Some("A"), .. }

core CoreId { id: 0 }, bind to: 6
core CoreId { id: 1 }, bind to: 6
core CoreId { id: 2 }, bind to: 6
core CoreId { id: 3 }, bind to: 6
core CoreId { id: 4 }, bind to: 6
core CoreId { id: 5 }, bind to: 6
core CoreId { id: 6 }, bind to: 6
>>> core CoreId { id: 6 }, bind to: Thread { id: ThreadId(3), name: Some("B worker"), .. }

结论,core_affinity::get_core_ids获取到的核心信息具有不确定性。查阅作者仓库发现了一个issue与这个问题相关,提issue者同样反馈了这个问题,并质疑get_core_ids获取到的不是所有核,而是对于当前线程的可用核。


初学rust,tokio的async与await
Tag rust, tokio, async, await, on by view 256

最近监控上报的agent里需要将数据上报到es,所以用了elasticsearch-rs这个包,这是es官方提供的rust版本sdk,看了一下版本号,目前的版本都处于alpha。

下面用一个简单实例讲述我遇到的问题。首先是,调用sdk发现需要用async标注函数。

use elasticsearch::{http::transport::Transport, BulkParts, Elasticsearch};
use serde_json::Value;

fn main() {
    println!("Hello, world!");
    send_es("hi".to_string());
    println!("hi");
}

pub async fn send_es(body: String) {
    let transport = Transport::single_node("http://xxx.xxx.net").unwrap();
    let client = Elasticsearch::new(transport);
    let mut bodies: Vec<String> = Vec::with_capacity(1);
    bodies.push(body);
    let response = client
        .bulk(BulkParts::Index("nginx"))
        .body(bodies)
        .send()
        .await
        .unwrap();
    let response_body = response.json::<Value>().await.unwrap();
    println!("{:?}", response_body);
}

运行后,发现有一个警告,并且send_es没有被调用

➜  tes git:(master) ✗ cargo run
   Compiling tes v0.1.0 (/root/code/las/tes)
warning: unused implementer of `Future` that must be used
 --> src/main.rs:6:5
  |
6 |     send_es("hi".to_string());
  |     ^^^^^^^^^^^^^^^^^^^^^^^^^^
  |
  = note: `#[warn(unused_must_use)]` on by default
  = note: futures do nothing unless you `.await` or poll them

warning: `tes` (bin "tes") generated 1 warning
    Finished dev [unoptimized + debuginfo] target(s) in 3.16s
     Running `target/debug/tes`
Hello, world!
hi

警告说futures do nothing,也就是send_es将会什么也不做。后面,我找到了一个方法block_on可以执行async方法,于是,变成了这样

use elasticsearch::{http::transport::Transport, BulkParts, Elasticsearch};
use futures::executor::block_on;
use serde_json::Value;

fn main() {
    println!("Hello, world!");
    block_on(send_es("hi".to_string()));
    println!("hi");
}

pub async fn send_es(body: String) {
    let transport = Transport::single_node("http://xxx.xxx.net").unwrap();
    let client = Elasticsearch::new(transport);
    let mut bodies: Vec<String> = Vec::with_capacity(1);
    bodies.push(body);
    let response = client
        .bulk(BulkParts::Index("nginx"))
        .body(bodies)
        .send()
        .await
        .unwrap();
    let response_body = response.json::<Value>().await.unwrap();
    println!("{:?}", response_body);
}

但是执行后发现报错如下

➜  tes git:(master) ✗ cargo run
   Compiling tes v0.1.0 (/root/code/las/tes)
    Finished dev [unoptimized + debuginfo] target(s) in 3.52s
     Running `target/debug/tes`
Hello, world!
thread 'main' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime', /root/.cargo/registry/src/github.com-1ecc6299db9ec823/hyper-0.14.20/src/client/connect/dns.rs:121:24
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

各种查找解决方案之后,也没能解决这个问题,说是tokio版本依赖的问题,两个不同的组件间接引用了不同版本的tokio,说是引入tokio = "*"就能解决依赖问题,但是实际上是无法解决的。所以我就用了上面的最小用例来调用elasticsearch sdk,只调用sdk,不引用任何其他依赖(原项目中还引用了reqwest包,这个包依赖了tokio)。发现这个最小用例也报错,说明根本不是依赖问题,但是可以确定问题出在tokio上。于是阅读了tokio官方文档,了解到运行async函数可以用#[tokio::main]标注,结合.await就可以了。于是重新修改后如下

use elasticsearch::{http::transport::Transport, BulkParts, Elasticsearch};
use serde_json::Value;

#[tokio::main]
async fn main() {
    println!("Hello, world!");
    send_es("hi".to_string()).await;
    println!("hi");
}

pub async fn send_es(body: String) {
    let transport = Transport::single_node("http://xxx.xxx.net").unwrap();
    let client = Elasticsearch::new(transport);
    let mut bodies: Vec<String> = Vec::with_capacity(1);
    bodies.push(body);
    let response = client
        .bulk(BulkParts::Index("nginx"))
        .body(bodies)
        .send()
        .await
        .unwrap();
    let response_body = response.json::<Value>().await.unwrap();
    println!("{:?}", response_body);
}

问题解决,终于调用成功了。

总结,tokio是rust中有名的异步调用的包。它定义了asyncawait这些关键词,而实现异步。同样他也定义了异步函数的调用方式,就是#[tokio::main]标注。


初学rust,多线程编程共享变量访问
Tag rust, 多线程, 共享变量, , 解锁, on by view 214

今天开始尝试使用rust的多线程编程,在不同的线程中访问共享变量。一个小项目,项目背景是这样的,我想用rust写一个高性能的nginx日志解析聚合上报监控的agent;其中,主线程中watch日志文件,并读取日志内容,有点类似tailf,然后有一个子线程,负责每隔10s钟统计主线程收集到的日志数据,聚合并上报。

所以,这里除了主线程之外,需要创建一个线程。并且主线程不断的往一个Vec<LineStat>容器变量中写入数据,另一个子线程没隔10s,从这个容器变量中读取所有数据,并累加统计上报,然后清空该容器中的数据,10s一个往复。这个容器变量就是关键的共享变量了。常规情况下,是存在数据竞争的,在golang里倒是很容易写出来,但是golang可不管这些,锁你爱加不加,不加锁也能跑,但是会出错,若是map还会panic。但是,今天发现在rust里,根本写不出来……

那么第一个问题就是,rust线程中如何使用共享变量。 我们先看下直接使用共享变量会怎么样。

fn testx() {
    let mut data = "hi".to_string();
    thread::spawn(move || loop {
        println!("{:?}", data);

        thread::sleep(Duration::from_secs(10));
    });

    println!("{:?}", data);
}

可以看到spawn生成的线程内部访问data,会被要求加上move关键词,它会强制获取被访问变量的所有权,也就是说在线程中访问了datadata的所有权就变为这个线程的了,只能在这个线程内访问。后续的println访问data就会报错。如下

error[E0382]: borrow of moved value: `data`
  --> src/main.rs:38:22
   |
31 |     let mut data = "hi".to_string();
   |         -------- move occurs because `data` has type `std::string::String`, which does not implement the `Copy` trait
32 |     thread::spawn(move || loop {
   |                   ------- value moved into closure here
33 |         println!("{:?}", data);
   |                          ---- variable moved due to use in closure
...
38 |     println!("{:?}", data);
   |                      ^^^^ value borrowed here after move
   |
   = note: this error originates in the macro `$crate::format_args_nl` (in Nightly builds, run with -Z macro-backtrace for more info)

可以看到报错value borrowed here after move,也就是访问了所有权被移动过的变量。

那么我们如何才能使data能够同时在主线程和子线程中被访问呢。用Arc<Mutex<...>>。代码改成如下

fn main() {
    let mut raw_data = "hi".to_string();
    let mut data: Arc<Mutex<String>> = Arc::new(Mutex::new(raw_data));
    let mut clone_data = data.clone();
    thread::spawn(move || loop {
        thread::sleep(Duration::from_secs(1));
        println!("th -> {:?}", clone_data.lock().unwrap());
        clone_data.lock().unwrap().push_str(", hello");
    });

    thread::sleep(Duration::from_secs(10));
    println!("final -> {:?}", data.lock().unwrap());
}

可以看到要访问的数据套上Arc<Mutex<...>>之后,在线程中访问该数据时,先在线程外clone一个副本,然后在线程中访问该数据,访问时先加锁lock(),然后读取到的数据就是raw_data,而在主线程中访问的数据也是先lock()加锁,读取到的也是raw_data。相当于在raw_data外部加了个套,虽然dataclone_data是两个加了套的数据,但是在套里面的数据是同一个,也就是raw_datalock()锁定“解套”后访问的是同一个数据。我们来看下运行结果

th -> "hi"
th -> "hi, hello"
th -> "hi, hello, hello"
th -> "hi, hello, hello, hello"
th -> "hi, hello, hello, hello, hello"
th -> "hi, hello, hello, hello, hello, hello"
th -> "hi, hello, hello, hello, hello, hello, hello"
th -> "hi, hello, hello, hello, hello, hello, hello, hello"
th -> "hi, hello, hello, hello, hello, hello, hello, hello, hello"
final -> "hi, hello, hello, hello, hello, hello, hello, hello, hello, hello"

可以看到线程内部字符串拼接了若干次,最后在主线程中打印出来的是子线程拼接后的数据。

那么rust是怎么解锁的呢。 我们把代码改成这样

fn main() {
    let mut raw_data = "hi".to_string();
    let mut data: Arc<Mutex<String>> = Arc::new(Mutex::new(raw_data));
    let mut clone_data = data.clone();
    thread::spawn(move || loop {
        thread::sleep(Duration::from_secs(1));
        let mut v = clone_data.lock().unwrap(); // v 不会立即释放,v 所在的锁也不会立即释放
        println!("th -> {:?}", v);

        appendCnt(clone_data.clone());
        // 线程结束后,v 被回收,v所在的锁才会释放
    });

    thread::sleep(Duration::from_secs(10));
    println!("final -> {:?}", data.lock().unwrap());
}

fn appendCnt(mut data: Arc<Mutex<String>>) {
    data.lock().unwrap().push_str(", hello"); // 这里再次尝试锁 data 里的值,发现前面已经锁定过,无法加锁,死锁
}

执行结果显示打印了第一个th -> "hi"之后,程序直接卡死。这里是因为发生了死锁。rust中目前没有显式的锁释放操作(实际上unlock方法在nightly版本中,参考,对应的issue),锁释放(unlock)发生在前面所说的“加了套的数据”内存释放的时候,也就是说clone_data.lock()v解锁发生在v释放的时候,也就是在loop所在块结束的时候。但是在这之前appenCnt调用了clone_data.clone()衍生的“加了套的数据”,并尝试在函数中加锁,这时候上一个锁还没释放呢,所以就死锁了。

那么如果这里我必需要提前解锁怎么办,用drop即可。如下

fn main() {
    let mut raw_data = "hi".to_string();
    let mut data: Arc<Mutex<String>> = Arc::new(Mutex::new(raw_data));
    let mut clone_data = data.clone();
    thread::spawn(move || loop {
        thread::sleep(Duration::from_secs(1));
        let mut v = clone_data.lock().unwrap(); // v 不会立即释放,v 所在的锁也不会立即释放
        println!("th -> {:?}", v);
        drop(v); // 提前释放 v,提前解锁

        appendCnt(clone_data.clone());
    });

    thread::sleep(Duration::from_secs(10));
    println!("final -> {:?}", data.lock().unwrap());
}

fn appendCnt(mut data: Arc<Mutex<String>>) {
    data.lock().unwrap().push_str(", hello"); // 这里再次尝试锁 data 里的值,没有被锁定,可以加锁
}

运行,发现现在能正常解锁了。


初学rust,闭包引用外部变量
Tag rust, 闭包, on by view 234

今天又遇到一个新的问题,那就是在闭包函数中,无法引用闭包函数之外的变量。

fn main() {
    let cfg = config::cfg::get_config();
    let filename = cfg.las.as_ref().unwrap().access_log.as_ref().unwrap();
    // println!("{}",filename);
    // let mut p = poly::Poly::new();
    let mut s = String::from("_");
    let mut log_watcher = LogWatcher::register(filename.to_string()).unwrap();

    log_watcher.watch(|line: String| {
        let (_, token) = parse::lex::parser(&line);
        // p.push(LineStat {});
        print!("{}", s);
        process::report::send(token);
    });
}

上面代码中,watch传入参数为一个闭包函数,其中需要引用main函数中的局部变量s,但是会报错

error[E0308]: mismatched types
  --> src/main.rs:22:23
   |
22 |       log_watcher.watch(|line: String| {
   |  _______________________^
23 | |         let (_, token) = parse::lex::parser(&line);
24 | |         // p.push(LineStat {});
25 | |         print!("{}", s);
26 | |         process::report::send(token);
27 | |     });
   | |_____^ expected fn pointer, found closure
   |
   = note: expected fn pointer `fn(std::string::String)`
                 found closure `[closure@src/main.rs:22:23: 27:6]`
note: closures can only be coerced to `fn` types if they do not capture any variables

查看logwatcher源码,watch该版本(0.1.0)方法定义如下

pub fn watch(&mut self, callback: fn (line: String)) {
    ....
}

不过我后来发现logwatcher版本升级到(0.1.1)之后,该方法变为

pub fn watch<F: ?Sized>(&mut self, callback: &mut F)
where
    F: FnMut(String) -> LogWatcherAction,
{
    ....
}

如此可以正常调用,代码如下

fn main() {
    let cfg = config::cfg::get_config();
    let filename = cfg.las.as_ref().unwrap().access_log.as_ref().unwrap();
    let mut watcher = LogWatcher::register(filename.to_string()).unwrap();
    let mut poly: Poly = Poly::new();

    watcher.watch(&mut |line: String| {
        poly.clone().push(line);
        LogWatcherAction::None
    })
}

由此可见,闭包与函数还是有区别的。但是回调函数定义为Fn/FnMut类型就可以同时接受闭包和函数类型的回调参数。举例如下

let a = String::from("abc");

let mut x = || println!("x {}", a);

fn y() {
    println!("y")
}

fn wrap(c: &mut Fn()) {
    c()
}

wrap(&mut x); // pass a closure
wrap(&mut y); // pass a function

同样可以参考 stackoverflow 的这个提问


初学rust,为什么对象不能被函数返回
Tag rust, on by view 197

7年前,我选择了自学golang,事实证明我选择对了。目前业界用golang的人越来越多,也越来越卷。于是我就想选择另一门语言了。用了这些年的golang之后,感觉golang在很多场景下还是很难胜任高新能开发;另外,我特别羡慕能做底层开发的同学,比如内核开发、协议栈开发等;因此,我看上了rust,rust号称无gc,甚至还能开发linux内核,还有一些游戏引擎,数据库引擎也是用rust开发的,一听就特别高端是吧。

于是我作为一个新手,开始尝试上手rust,原本准备依照golang的学习路线,先写个博客或者小的web程序,但是,发现rust上手太难。于是降低了上手的要求,用rust重写了一遍我的pinyin项目,项目是将汉字翻译成拼音,支持多音词,需要接入字词库。折腾了许久,终于完成了;这里简单谈一下作为一个rust初学者的感受。

xtyiguc3

首先,最大一个问题就是局部变量无法在作用域之外使用,这在golang上会自动扩大作用域,无需考虑这些问题。比如,我在一个函数中创建出来的对象,return返回后,发现无法在外部访问,直接报错,实际上该对象在函数结束时已经被销毁。或者说,所在对象发生move,因为他的类型未实现Copy trait。

79dsu2ub

比如,这种报错经常发生在这么一个场景,我创建了一个config包,辛苦的在里面实现了加载配置文件到一个结构体,当我完成这些之后,希望在另一个mod中引用这个config结构体,便会经常发生这种情况。

use std::fs::File;
use std::io::prelude::*;
use once_cell::sync::Lazy;

#[derive(Deserialize)]
#[derive(Debug)]
struct LasConfig {
    pub access_log: Option<String>,
}

#[derive(Deserialize)]
#[derive(Debug)]
struct Conf {
    pub las: Option<LasConfig>
}

static global: Lazy<Conf> = Lazy::new(|| {
    let file_path = "config.toml";
    let mut file = match File::open(file_path) {
        Ok(f) => f,
        Err(e) => panic!("no such file {} exception:{}", file_path, e)
    };
    let mut str_val = String::new();
    match file.read_to_string(&mut str_val) {
        Ok(s) => s,
        Err(e) => panic!("Error Reading file: {}", e)
    };
    toml::from_str(&str_val).unwrap()
});

pub fn get_config() -> Conf {
    *global
}

其中*global的类型的确是Conf但是却无法通过参数返回。会有如下的报错信息

error[E0507]: cannot move out of dereference of `once_cell::sync::Lazy<Conf>`
  --> src/config/cfg.rs:32:5
   |
32 |     *global
   |     ^^^^^^^ move occurs because value has type `Conf`, which does not implement the `Copy` trait

For more information about this error, try `rustc --explain E0507`.

e463qi5x

很明显,这里生成的Conf类型的global对象,但是无法直接返回。因为返回的时候发生了move

神奇的是,这里get_config()改成如下这样,就不会有报错了

pub fn get_config() -> &'static Conf {
   (*global).borrow()
}

我们从E0507这篇文章中可以看到

the nothing_is_true method takes the ownership of self

那么我们这里的报错应该也是*global在返回的时候,将ownership拿走,既然不能拿走,那就改为借用好了。


软路由r5s使用体验
Tag r5s, 软路由, on by view 449

前段时间因为r4s软路由故障,当时一筹莫展,所以就顺手买了个r5s准备用来替换r4s,将r4s替下来研究防火墙故障原因。但是r5s还没到货就找到故障原因了;这里简单介绍一下r5s使用体验。

4loyvgky

r5s相比于r4s有一个很大的不同之处,那就是r5s支持emcc存储,也就是说机身自带存储,可以不使用tf卡装系统,当然也可以使用tf卡。在刷第三方openwrt系统的过程中,使用tf卡启动带emcc flasher的固件以及尝试usb线刷的过程中发现一个很重要的点,那就是mask按键的按下时长。这里简单总结一下经验:usb线刷,通电源后,mask需要继续按住3秒然后松掉,才能识别线刷;tf卡引导,接通电源后,mask需要继续按住4s然后松掉,才能从tf卡引导。时间一定要掐准,不然无法进入相应的模式。

从外观上可以看到,r5s有3个网口,其中两个2.5G,一个1G网口,type-c口供电,有一个HDMI口,但是目前没有什么用处,接显示器openwrt系统也只是显示一下字符终端,还没尝试其他linux发行版系统,不知道是否能够在其他系统上发挥这个HDMI接口的用处。机器后面还有2个usb3.0接口,靠右侧的usb3.0接口同时也是线刷口;后面还有一个tf口。机器内部还有一个m2接口,可以接nvme硬盘。不过4核的CPU与r4s相比,少2个核心,但是据说能耗也比r4s略小。

我给它刷上是固件是 r5s.cooluc.com 这个网站发布的固件,通过线刷emcc安装的。系统支持常用插件,以及docker,不过它的默认ip段是 10 开头,这个网段在我公司是被用作内网的,如果居家办公家庭网络也是这个网段,会对居家办公造成影响,所以,我会将其换回 192.168 网段。最后一点,发热量,从我的直观体验上来看,它的发热量与r4s差不多,外壳都是略微有些烫手,r4s系统显示温度是48℃,不过比之我最早使用的x86软路由要好多了,我不喜欢外置风扇之类的,也就不准备配备外置风扇了。


蒹葭苍苍,白露为霜
Tag 白露, on by view 64

今天是二十四节气中的白露。可惜南方看不到苍苍的蒹葭,也看不到变为霜的白露。

蒹葭苍苍,白露为霜
所谓伊人,在水一方

今年是在南方城市生活的第七年了。白露节气已经悄悄的到了,但是这里每天早晨上班,中午吃饭的时候都是烈日暴晒。仿佛所谓的白露跟这座城市找不到半点关系。

86zal12g

蒹葭是一种草,据说是芦苇。蒹葭苍苍是指秋天的芦苇已经变成了枯黄色,白露是北方秋季早晨路边野草上凝结的水滴,白露为霜是指这种水滴已经变为了白色的霜。对于我老家湖北来说,结霜了通常是到了仲秋或者深秋,天气越发的冷了。

7qpdvkah

儿时,每周一早晨,和母亲一起去村小学。每逢秋天白露或者霜降时节,都能在路边的野草上见到白露或者白霜,空气寒冷,却很清新。或者是周末或寒假期间,在老家旧宅附近的打谷场上,冬季每天早晨还能看到枯黄的稻杆上覆盖着一层薄薄的霜,不离近了,还很难发现。然后,日子一天天的过着,到了冬季,下一场大雪,大雪会覆盖世间万物,世界出奇的安静和干净。大雪封路,但是快要过年,得去镇上置办年货,于是,一家家一户户,徒步去镇上购置年货。一路上不知道会在雪地里摔跤多少次。那场景记忆犹新。

i1hha7my

tfjv55qx

zdfrl3us


软路由http连接超时问题排查
Tag openwrt, 连接超时, on by view 96

用软路由有一段时间的,最近家人经常跟我吐槽,家里网络差。后来我发现确实存在网络差的问题,有时候刷抖音视频加载不出来,刷微博图片加载不出来。一开始以为是天威宽带运营商的问题,后面发现同一时间直接链接运营商的光猫路由无线网络不会有问题。这么说来,问题是出在软路由上了。 一开始我以为富强网的问题,因为访问国外没问题,访问国内有问题,各种找解决方案,有人说需要设置这条防火墙规则就可以了

iptables -t nat -I POSTROUTING -o eth0 -j MASQUERADE

但是,我测试了没有效果。而且还比较符合别人的描述,重启防火墙就恢复正常了,然后在不确定长度的一段时间后,又会出现这个问题。这里肯定是防火墙的问题了。但是想尽一切办法都没法解决,于是我关闭了富强网。

第二天,我发现访问csdn的资源链接依然会有问题,通常会卡个1分钟以上,才有响应。这很不正常。看样子跟富强网没关系。我先后确认了DNS解析正常,关闭了“Turbo ACC网络加速”,设置调小了lan口MTU,但是都没有结果。头发都要薅光了。

先dig一下域名

➜  ttt dig csdnimg.cn

; <<>> DiG 9.11.5-P4-5.1+deb10u7-Debian <<>> csdnimg.cn
;; global options: +cmd
;; Got answer:
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 63181
;; flags: qr rd ra; QUERY: 1, ANSWER: 9, AUTHORITY: 0, ADDITIONAL: 1

;; OPT PSEUDOSECTION:
; EDNS: version: 0, flags:; udp: 1232
;; QUESTION SECTION:
;csdnimg.cn.                    IN      A

;; ANSWER SECTION:
csdnimg.cn.             57      IN      CNAME   csdnimg.cn.trpcdn.net.
csdnimg.cn.trpcdn.net.  1717    IN      CNAME   uz95.v.trpcdn.net.
uz95.v.trpcdn.net.      95      IN      A       116.77.73.215
uz95.v.trpcdn.net.      95      IN      A       116.77.73.214
uz95.v.trpcdn.net.      95      IN      A       116.77.73.211
uz95.v.trpcdn.net.      95      IN      A       116.77.73.213
uz95.v.trpcdn.net.      95      IN      A       116.77.73.218
uz95.v.trpcdn.net.      95      IN      A       116.77.73.217
uz95.v.trpcdn.net.      95      IN      A       116.77.73.212

;; Query time: 20 msec
;; SERVER: 192.168.1.1#53(192.168.1.1)
;; WHEN: 一 9月 05 13:45:27 CST 2022
;; MSG SIZE  rcvd: 217

解析出来的是cname到网宿CDN,A记录为天威宽带的节点。从openwrt的ptty终端访问正常,说明ip是通的没问题, 但是在路由下我的电脑上访问,也是偶现超时卡顿。

然后想办法稳定重现,写个shell脚本,用curl循环访问问题链接

#!/bin/bash

for ((i=1;i<=100;i++)); do
  curl --no-tcp-nodelay 'https://csdnimg.cn/release/blogv2/dist/pc/img/pay-time-out.png'$i -w "@fmt" -o /dev/null -s;
done

其中fmt耗时输出格式文件

time_namelookup:  %{time_namelookup}\n
       time_connect:  %{time_connect}\n
    time_appconnect:  %{time_appconnect}\n
      time_redirect:  %{time_redirect}\n
   time_pretransfer:  %{time_pretransfer}\n
 time_starttransfer:  %{time_starttransfer}\n
                    ----------\n
         time_total:  %{time_total}\n

调用shell文件,结果中间卡了很久

vjgy55fs

可以看到中间有个异常耗时

time_namelookup:  0.031654
       time_connect:  129.203610
    time_appconnect:  129.222947
      time_redirect:  0.000000
   time_pretransfer:  129.223397
 time_starttransfer:  129.359648
                    ----------
         time_total:  129.359827

可以看到,dns解析耗时0.031654,没问题,问题主要在time_connect,说明是到该ip的链接超时。众所周知,tcp连接需要握手,其中会传输ACK,SYN这些数据包,用于确认握手状态。会不会是这些包存在被路由器防火墙丢弃的情况?打开openwrt“状态->防火墙”,搜索DROP,有这么一条规则被我看到了

75ura4is

查了一下223.252.199.10,是网易云的,这让我突然想起来,openwrt上有个“解锁网易云灰色歌曲”的功能,曾经被我打开了。

91tw7d76

关闭。网络恢复正常了

最终结论:这次网络故障是由“解锁网易云灰色歌曲”这个功能导致的故障,其原理是劫持所有到网易云的流量到腾讯云,从而实现网易云音乐被封禁的音乐从腾讯云下载。中间的规则应该是出了什么问题,影响到正常链路了。 关闭之后就正常了,openwrt上各种功能的确丰富,但是坑也多,很多功能都需要基于防火墙实现流量劫持与转发,一旦出现问题,就可能影响正常上网。


万兆网卡台湾货的一个有趣现象
Tag 万兆, on by view 38

最近入手了威联通的雷电3万兆光纤网卡。淘宝京东官方店一直缺货,好不容易等来了,赶紧分期买了。今天终于到货了。晒图

s2kfhdfz 赶紧用了一下,挺不错的。我NAS也是万兆网卡,在NAS上运行iperf3服务端,然后macbook pro上用这个雷电万兆网卡,iperf3客户端连接测速。基本跑满万兆。

gpo9z4dx

测速结果 9.4Gbit/sec 左右,符合万兆。然后测试了一下NAS上的samba服务读写速度

ue981e7l

测速结果显示,读速度1033MB/s,能达到万兆;但是写速度只有360MB/s,这可能是因为我NAS上用了3块4T日立硬盘,硬盘比较廉价,读写速度也比较差;另外我猜也可能与我使用了RAIDZ有关,毕竟RAIDZ奇偶校验比较损耗性能。

网卡到手第一时间,发现了一个有趣的事情,网卡上看起来有一行字被人刻意的用黑色油性笔划掉了,凑近隐约能看到 made in taiwan 字样。结合国际时事,威联通以及淘宝京东官方店这款网卡最近一直缺货没补上的情况,一瞬间了解是什么情况。

z63r8f6j

最后呢,我想说,胳膊拧不过大腿;台湾省无脑的井底之蛙太多;希望台湾省早日回归中国大陆。这样对大家都好。


大文件下载的一些经验
Tag 下载, cdn, on by view 41

数年之前,我在一家CDN公司上班。在CDN项目主要工作是负责对客户视频数据搬运到云存储,中间也涉及到下载转码上传之类的工作。其中踩过一些坑,也吸取了一些经验。下面简单讲一下我遇到的几个问题。

下载没网速了

首先是下载没网速的情况。在处理大量的下载任务过程中,发现了一些特殊情况,客户通知我的服务回源,我的下载服务从源站下载一些大文件的时候,开始网络一切正常,可是逐渐的网速越来越低,甚至没网速,这回导致下载一直卡主。可能你会说,设置一个下载超时不就好了,但是,由于客户的文件大多是影视原片,体积巨大,有些甚至达到了几十G,如果简单的设置超时,以处理时间判断下载成败,那么这些大文件基本上不可能成功,都会以超时失败。当然,也可以分片下载来解决文件大的问题,但是对于0网速或者只有几B/s的网速,你的分片就显得无力了。其实,这个状况产生的原因大多是因为源站故障导致的,比如负载高之类的。只要断开连接重新下载就有机会恢复正常速度的连接,而且一般源站是有多个的,重新连接到新的节点就好了。所以,这里“断开连接重新发起下载”这个能力就非常重要了。我原本使用的是github.com/nareix/curl这个库进行文件下载,它的优点是创建异步下载任务后,可以监控当前下载的速度。这样,我可以对每个下载任务的网速进行监控,当网速低于一个阈值的时候,就放弃下载并重试。最初的时候只是简单的调用它的Close()方法,关闭了下载。后面却发现,经常发生fd用完的情况,一查才知道Close()方法并不能释放连接。最后只好自己修改了这个库,给它添加了强制关闭连接的能力。于是就完美的解决了这一问题。记得当时有一次,源站故障了,客户反馈说,它们接入的多家CDN厂商都上报,文件下载失败了,只有我们公司文件下载成功了,听到这消息,我嘴角不由得上翘,漏出了得意的笑容。

并行写同文件

遇到另一个印象深刻的问题就是某客户反馈文件错误的问题。客户反馈说他们在我们CDN上的文件播到一半卡死了,查了文件之后,发现文件大小正常,但是文件只有前半截有数据,后半截全是0,空的。当时,老板CTO领导全站在我工位后面,讨论这个问题,我也是第一时间加回了MD5校验,因为之前在我同事的要求下(同事是老员工、他负责任务调度、我负责几个agent,相当于他为主),去掉了上传前MD5校验,以节省任务时间。当时非常紧张,但是一时间也查不出原因,只能先加MD5校验,然后开发专用校验程序,将半年前开始的所有上传文件重新拉下来做MD5校验,查出异常文件重新上传。后来在一个偶然的机会下,发现了故障的原因,居然是调度测下发任务之后,等待N分钟之后agent还没上报成功,于是又重新下发了这个任务,然后刚好是同一个节点的agent收到该任务,这样一来,同一个节点上2个相同的任务在处理,文件路径一致,也就是说,有2个线程在从源站拉文件写到本地磁盘的同一个路径,一先一后,先处理完的线程开始上传了,后处理的还在写文件,后处理的写文件之后就会将该文件后半截变为空(这里与文件打开模式有关),这样前一个线程在上传的时候,文件其实变成半截了,再加上没有MD5校验,所以,故障就此发生了。

所以之后我就给每个任务的本地路径前加了一个uuid目录,这样一来,不管任务是否同一个文件路径,都不会相互干扰。每个任务都有自己的独立路径了。原程序是我接手别人的,虽说这问题不是我引入,但是,平心而论换做当时的我,也是想不到这一点的。不过,吃一堑长一智,现在只要涉及到本地资源存储,我基本上都会给路径前添加uuid前缀。