初学rust,tokio::spawn中loop-sleep无法使用ctrl+c终止进程的原因
Tag tokio, ctrl-c, 终止进程, on by view 8

在tokio::spawn中起了一个新协程,然后在代码中执行sleep死循环,发现程序运行时,主线程中无法通过ctrl+c终止进程。代码如下

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    // 解析命令行参数
    let args = Arguments::parse();

    if args.version {
        print_ver();
        return Ok(());
    }

    // 初始化摄像头
    spawn(async move {
        info!("camera capture started");
        camera_capture().await;
    });

    let c = config::get_config();
    let listen = c.tlps.as_ref().unwrap().listen.as_ref().unwrap();
    println!("listen http://{}", listen);
    HttpServer::new(|| {
        App::new().service(camera_settings).service(
            SwaggerUi::new("/swagger-ui/{_:.*}").url("/api-docs/openapi.json", ApiDoc::openapi()),
        )
    })
    .bind(listen)?
    .run()
    .await
}

async fn camera_capture() {
    let tlps_cfg = config::get_config().tlps.as_ref().unwrap();

    loop {
        std::thread::sleep(std::time::Duration::from_secs(1));
        print_ver();
    }
}

现象,在终端中按下Ctrl+C无法退出进程,如下

hg6ndsox

最后发现原因是tokio拉起的线程(协程)中不能使用std::threadsleep方法进行休眠,而是应该使用tokiosleep方法休眠。我这里是actix-web,虽然它是基于tokio的,但是,他也有对应的sleep方法,应该改为如下方式

loop {
    actix_web::rt::time::sleep(std::time::Duration::from_secs(1)).await;
    print_ver();
}

再使用ctrl-c,能够正常终止进程了。


初学rust,tokio无法在spawn中使用MutexGuard
Tag rust, tokio, MutexGuard, spawn, on by view 146

最近再次在rust中尝试用tokio::spawn实现类似go语言中goroutine的用法,但是报错了。

#[tokio::main]
async fn main() {
    let cfg = cfg::get_config();
    let client_id = cfg::get_client_id();
    let ws_addr = cfg.agent.as_ref().unwrap().remote_ws_addr.as_ref().unwrap();

    let mut wsc = ws::WebSocketClient::new(ws_addr.clone(), client_id);
    tokio::spawn(wsc.start());

    let monitor_addr = cfg.agent.as_ref().unwrap().remote_ws_addr.as_ref().unwrap();
    let mut monitor_client = monitor::Monitor::new(monitor_addr.to_string());
    monitor_client.start().await;
}

报错位置在这一行 tokio::spawn(wsc.start());,报错内容如下

error: future cannot be sent between threads safely
   --> src/main.rs:19:18
    |
19  |     tokio::spawn(wsc.start());
    |                  ^^^^^^^^^^^ future returned by `start` is not `Send`
    |
    = help: the trait `Sync` is not implemented for `std::sync::mpsc::Receiver<Message>`
note: future is not `Send` as this value is used across an await
   --> src/ws.rs:117:47
    |
115 |                 if let Ok(msg) = rx.recv() {
    |                                  -- has type `&std::sync::mpsc::Receiver<Message>` which is not `Send`
116 |                     println!("Socket Send    : {:?}", msg);
117 |                     let rst = writer.send(msg).await;
    |                                               ^^^^^^ await occurs here, with `rx` maybe used later
...
126 |             }
    |             - `rx` is later dropped here
help: consider moving this into a `let` binding to create a shorter lived borrow

可以看到原因是,我在WebSocketClient中用到了MutexGuard,而MutexGuard不支持被tokio::spawn调用。解释参考这篇文档。实际上我是在另一个结构体Pty中使用了Arc<Mutex<Receiver<Message>>>类型,然后WebSocketClient中又实用了Pty结构体,所以在调用tokio::spawn时报错。

如何解决?别用标准库的Mutex了,用tokio的。而且,我同样发现了Sender,Receiver都在报类似的错误the trait std::marker::Send is not implemented for ...

use std::sync::mpsc::{channel, Receiver, Sender}

// 改为
use tokio::sync::mpsc::channel;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::Mutex;

channel,Receiver,Sender全部改为tokio版本的,就可以兼容tokio了。


初学rust,tokio的async与await
Tag rust, tokio, async, await, on by view 603

最近监控上报的agent里需要将数据上报到es,所以用了elasticsearch-rs这个包,这是es官方提供的rust版本sdk,看了一下版本号,目前的版本都处于alpha。

下面用一个简单实例讲述我遇到的问题。首先是,调用sdk发现需要用async标注函数。

use elasticsearch::{http::transport::Transport, BulkParts, Elasticsearch};
use serde_json::Value;

fn main() {
    println!("Hello, world!");
    send_es("hi".to_string());
    println!("hi");
}

pub async fn send_es(body: String) {
    let transport = Transport::single_node("http://xxx.xxx.net").unwrap();
    let client = Elasticsearch::new(transport);
    let mut bodies: Vec<String> = Vec::with_capacity(1);
    bodies.push(body);
    let response = client
        .bulk(BulkParts::Index("nginx"))
        .body(bodies)
        .send()
        .await
        .unwrap();
    let response_body = response.json::<Value>().await.unwrap();
    println!("{:?}", response_body);
}

运行后,发现有一个警告,并且send_es没有被调用

➜  tes git:(master) ✗ cargo run
   Compiling tes v0.1.0 (/root/code/las/tes)
warning: unused implementer of `Future` that must be used
 --> src/main.rs:6:5
  |
6 |     send_es("hi".to_string());
  |     ^^^^^^^^^^^^^^^^^^^^^^^^^^
  |
  = note: `#[warn(unused_must_use)]` on by default
  = note: futures do nothing unless you `.await` or poll them

warning: `tes` (bin "tes") generated 1 warning
    Finished dev [unoptimized + debuginfo] target(s) in 3.16s
     Running `target/debug/tes`
Hello, world!
hi

警告说futures do nothing,也就是send_es将会什么也不做。后面,我找到了一个方法block_on可以执行async方法,于是,变成了这样

use elasticsearch::{http::transport::Transport, BulkParts, Elasticsearch};
use futures::executor::block_on;
use serde_json::Value;

fn main() {
    println!("Hello, world!");
    block_on(send_es("hi".to_string()));
    println!("hi");
}

pub async fn send_es(body: String) {
    let transport = Transport::single_node("http://xxx.xxx.net").unwrap();
    let client = Elasticsearch::new(transport);
    let mut bodies: Vec<String> = Vec::with_capacity(1);
    bodies.push(body);
    let response = client
        .bulk(BulkParts::Index("nginx"))
        .body(bodies)
        .send()
        .await
        .unwrap();
    let response_body = response.json::<Value>().await.unwrap();
    println!("{:?}", response_body);
}

但是执行后发现报错如下

➜  tes git:(master) ✗ cargo run
   Compiling tes v0.1.0 (/root/code/las/tes)
    Finished dev [unoptimized + debuginfo] target(s) in 3.52s
     Running `target/debug/tes`
Hello, world!
thread 'main' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime', /root/.cargo/registry/src/github.com-1ecc6299db9ec823/hyper-0.14.20/src/client/connect/dns.rs:121:24
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

各种查找解决方案之后,也没能解决这个问题,说是tokio版本依赖的问题,两个不同的组件间接引用了不同版本的tokio,说是引入tokio = "*"就能解决依赖问题,但是实际上是无法解决的。所以我就用了上面的最小用例来调用elasticsearch sdk,只调用sdk,不引用任何其他依赖(原项目中还引用了reqwest包,这个包依赖了tokio)。发现这个最小用例也报错,说明根本不是依赖问题,但是可以确定问题出在tokio上。于是阅读了tokio官方文档,了解到运行async函数可以用#[tokio::main]标注,结合.await就可以了。于是重新修改后如下

use elasticsearch::{http::transport::Transport, BulkParts, Elasticsearch};
use serde_json::Value;

#[tokio::main]
async fn main() {
    println!("Hello, world!");
    send_es("hi".to_string()).await;
    println!("hi");
}

pub async fn send_es(body: String) {
    let transport = Transport::single_node("http://xxx.xxx.net").unwrap();
    let client = Elasticsearch::new(transport);
    let mut bodies: Vec<String> = Vec::with_capacity(1);
    bodies.push(body);
    let response = client
        .bulk(BulkParts::Index("nginx"))
        .body(bodies)
        .send()
        .await
        .unwrap();
    let response_body = response.json::<Value>().await.unwrap();
    println!("{:?}", response_body);
}

问题解决,终于调用成功了。

总结,tokio是rust中有名的异步调用的包。它定义了asyncawait这些关键词,而实现异步。同样他也定义了异步函数的调用方式,就是#[tokio::main]标注。