初学rust,tokio无法在spawn中使用MutexGuard
Tag rust, tokio, MutexGuard, spawn, on by view 115

最近再次在rust中尝试用tokio::spawn实现类似go语言中goroutine的用法,但是报错了。

#[tokio::main]
async fn main() {
    let cfg = cfg::get_config();
    let client_id = cfg::get_client_id();
    let ws_addr = cfg.agent.as_ref().unwrap().remote_ws_addr.as_ref().unwrap();

    let mut wsc = ws::WebSocketClient::new(ws_addr.clone(), client_id);
    tokio::spawn(wsc.start());

    let monitor_addr = cfg.agent.as_ref().unwrap().remote_ws_addr.as_ref().unwrap();
    let mut monitor_client = monitor::Monitor::new(monitor_addr.to_string());
    monitor_client.start().await;
}

报错位置在这一行 tokio::spawn(wsc.start());,报错内容如下

error: future cannot be sent between threads safely
   --> src/main.rs:19:18
    |
19  |     tokio::spawn(wsc.start());
    |                  ^^^^^^^^^^^ future returned by `start` is not `Send`
    |
    = help: the trait `Sync` is not implemented for `std::sync::mpsc::Receiver<Message>`
note: future is not `Send` as this value is used across an await
   --> src/ws.rs:117:47
    |
115 |                 if let Ok(msg) = rx.recv() {
    |                                  -- has type `&std::sync::mpsc::Receiver<Message>` which is not `Send`
116 |                     println!("Socket Send    : {:?}", msg);
117 |                     let rst = writer.send(msg).await;
    |                                               ^^^^^^ await occurs here, with `rx` maybe used later
...
126 |             }
    |             - `rx` is later dropped here
help: consider moving this into a `let` binding to create a shorter lived borrow

可以看到原因是,我在WebSocketClient中用到了MutexGuard,而MutexGuard不支持被tokio::spawn调用。解释参考这篇文档。实际上我是在另一个结构体Pty中使用了Arc<Mutex<Receiver<Message>>>类型,然后WebSocketClient中又实用了Pty结构体,所以在调用tokio::spawn时报错。

如何解决?别用标准库的Mutex了,用tokio的。而且,我同样发现了Sender,Receiver都在报类似的错误the trait std::marker::Send is not implemented for ...

use std::sync::mpsc::{channel, Receiver, Sender}

// 改为
use tokio::sync::mpsc::channel;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::Mutex;

channel,Receiver,Sender全部改为tokio版本的,就可以兼容tokio了。


初学rust,tokio的async与await
Tag rust, tokio, async, await, on by view 554

最近监控上报的agent里需要将数据上报到es,所以用了elasticsearch-rs这个包,这是es官方提供的rust版本sdk,看了一下版本号,目前的版本都处于alpha。

下面用一个简单实例讲述我遇到的问题。首先是,调用sdk发现需要用async标注函数。

use elasticsearch::{http::transport::Transport, BulkParts, Elasticsearch};
use serde_json::Value;

fn main() {
    println!("Hello, world!");
    send_es("hi".to_string());
    println!("hi");
}

pub async fn send_es(body: String) {
    let transport = Transport::single_node("http://xxx.xxx.net").unwrap();
    let client = Elasticsearch::new(transport);
    let mut bodies: Vec<String> = Vec::with_capacity(1);
    bodies.push(body);
    let response = client
        .bulk(BulkParts::Index("nginx"))
        .body(bodies)
        .send()
        .await
        .unwrap();
    let response_body = response.json::<Value>().await.unwrap();
    println!("{:?}", response_body);
}

运行后,发现有一个警告,并且send_es没有被调用

➜  tes git:(master) ✗ cargo run
   Compiling tes v0.1.0 (/root/code/las/tes)
warning: unused implementer of `Future` that must be used
 --> src/main.rs:6:5
  |
6 |     send_es("hi".to_string());
  |     ^^^^^^^^^^^^^^^^^^^^^^^^^^
  |
  = note: `#[warn(unused_must_use)]` on by default
  = note: futures do nothing unless you `.await` or poll them

warning: `tes` (bin "tes") generated 1 warning
    Finished dev [unoptimized + debuginfo] target(s) in 3.16s
     Running `target/debug/tes`
Hello, world!
hi

警告说futures do nothing,也就是send_es将会什么也不做。后面,我找到了一个方法block_on可以执行async方法,于是,变成了这样

use elasticsearch::{http::transport::Transport, BulkParts, Elasticsearch};
use futures::executor::block_on;
use serde_json::Value;

fn main() {
    println!("Hello, world!");
    block_on(send_es("hi".to_string()));
    println!("hi");
}

pub async fn send_es(body: String) {
    let transport = Transport::single_node("http://xxx.xxx.net").unwrap();
    let client = Elasticsearch::new(transport);
    let mut bodies: Vec<String> = Vec::with_capacity(1);
    bodies.push(body);
    let response = client
        .bulk(BulkParts::Index("nginx"))
        .body(bodies)
        .send()
        .await
        .unwrap();
    let response_body = response.json::<Value>().await.unwrap();
    println!("{:?}", response_body);
}

但是执行后发现报错如下

➜  tes git:(master) ✗ cargo run
   Compiling tes v0.1.0 (/root/code/las/tes)
    Finished dev [unoptimized + debuginfo] target(s) in 3.52s
     Running `target/debug/tes`
Hello, world!
thread 'main' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime', /root/.cargo/registry/src/github.com-1ecc6299db9ec823/hyper-0.14.20/src/client/connect/dns.rs:121:24
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

各种查找解决方案之后,也没能解决这个问题,说是tokio版本依赖的问题,两个不同的组件间接引用了不同版本的tokio,说是引入tokio = "*"就能解决依赖问题,但是实际上是无法解决的。所以我就用了上面的最小用例来调用elasticsearch sdk,只调用sdk,不引用任何其他依赖(原项目中还引用了reqwest包,这个包依赖了tokio)。发现这个最小用例也报错,说明根本不是依赖问题,但是可以确定问题出在tokio上。于是阅读了tokio官方文档,了解到运行async函数可以用#[tokio::main]标注,结合.await就可以了。于是重新修改后如下

use elasticsearch::{http::transport::Transport, BulkParts, Elasticsearch};
use serde_json::Value;

#[tokio::main]
async fn main() {
    println!("Hello, world!");
    send_es("hi".to_string()).await;
    println!("hi");
}

pub async fn send_es(body: String) {
    let transport = Transport::single_node("http://xxx.xxx.net").unwrap();
    let client = Elasticsearch::new(transport);
    let mut bodies: Vec<String> = Vec::with_capacity(1);
    bodies.push(body);
    let response = client
        .bulk(BulkParts::Index("nginx"))
        .body(bodies)
        .send()
        .await
        .unwrap();
    let response_body = response.json::<Value>().await.unwrap();
    println!("{:?}", response_body);
}

问题解决,终于调用成功了。

总结,tokio是rust中有名的异步调用的包。它定义了asyncawait这些关键词,而实现异步。同样他也定义了异步函数的调用方式,就是#[tokio::main]标注。