初学rust,tokio的async与await
Tag rust, tokio, async, await, on by view 571

最近监控上报的agent里需要将数据上报到es,所以用了elasticsearch-rs这个包,这是es官方提供的rust版本sdk,看了一下版本号,目前的版本都处于alpha。

下面用一个简单实例讲述我遇到的问题。首先是,调用sdk发现需要用async标注函数。

use elasticsearch::{http::transport::Transport, BulkParts, Elasticsearch};
use serde_json::Value;

fn main() {
    println!("Hello, world!");
    send_es("hi".to_string());
    println!("hi");
}

pub async fn send_es(body: String) {
    let transport = Transport::single_node("http://xxx.xxx.net").unwrap();
    let client = Elasticsearch::new(transport);
    let mut bodies: Vec<String> = Vec::with_capacity(1);
    bodies.push(body);
    let response = client
        .bulk(BulkParts::Index("nginx"))
        .body(bodies)
        .send()
        .await
        .unwrap();
    let response_body = response.json::<Value>().await.unwrap();
    println!("{:?}", response_body);
}

运行后,发现有一个警告,并且send_es没有被调用

➜  tes git:(master) ✗ cargo run
   Compiling tes v0.1.0 (/root/code/las/tes)
warning: unused implementer of `Future` that must be used
 --> src/main.rs:6:5
  |
6 |     send_es("hi".to_string());
  |     ^^^^^^^^^^^^^^^^^^^^^^^^^^
  |
  = note: `#[warn(unused_must_use)]` on by default
  = note: futures do nothing unless you `.await` or poll them

warning: `tes` (bin "tes") generated 1 warning
    Finished dev [unoptimized + debuginfo] target(s) in 3.16s
     Running `target/debug/tes`
Hello, world!
hi

警告说futures do nothing,也就是send_es将会什么也不做。后面,我找到了一个方法block_on可以执行async方法,于是,变成了这样

use elasticsearch::{http::transport::Transport, BulkParts, Elasticsearch};
use futures::executor::block_on;
use serde_json::Value;

fn main() {
    println!("Hello, world!");
    block_on(send_es("hi".to_string()));
    println!("hi");
}

pub async fn send_es(body: String) {
    let transport = Transport::single_node("http://xxx.xxx.net").unwrap();
    let client = Elasticsearch::new(transport);
    let mut bodies: Vec<String> = Vec::with_capacity(1);
    bodies.push(body);
    let response = client
        .bulk(BulkParts::Index("nginx"))
        .body(bodies)
        .send()
        .await
        .unwrap();
    let response_body = response.json::<Value>().await.unwrap();
    println!("{:?}", response_body);
}

但是执行后发现报错如下

➜  tes git:(master) ✗ cargo run
   Compiling tes v0.1.0 (/root/code/las/tes)
    Finished dev [unoptimized + debuginfo] target(s) in 3.52s
     Running `target/debug/tes`
Hello, world!
thread 'main' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime', /root/.cargo/registry/src/github.com-1ecc6299db9ec823/hyper-0.14.20/src/client/connect/dns.rs:121:24
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

各种查找解决方案之后,也没能解决这个问题,说是tokio版本依赖的问题,两个不同的组件间接引用了不同版本的tokio,说是引入tokio = "*"就能解决依赖问题,但是实际上是无法解决的。所以我就用了上面的最小用例来调用elasticsearch sdk,只调用sdk,不引用任何其他依赖(原项目中还引用了reqwest包,这个包依赖了tokio)。发现这个最小用例也报错,说明根本不是依赖问题,但是可以确定问题出在tokio上。于是阅读了tokio官方文档,了解到运行async函数可以用#[tokio::main]标注,结合.await就可以了。于是重新修改后如下

use elasticsearch::{http::transport::Transport, BulkParts, Elasticsearch};
use serde_json::Value;

#[tokio::main]
async fn main() {
    println!("Hello, world!");
    send_es("hi".to_string()).await;
    println!("hi");
}

pub async fn send_es(body: String) {
    let transport = Transport::single_node("http://xxx.xxx.net").unwrap();
    let client = Elasticsearch::new(transport);
    let mut bodies: Vec<String> = Vec::with_capacity(1);
    bodies.push(body);
    let response = client
        .bulk(BulkParts::Index("nginx"))
        .body(bodies)
        .send()
        .await
        .unwrap();
    let response_body = response.json::<Value>().await.unwrap();
    println!("{:?}", response_body);
}

问题解决,终于调用成功了。

总结,tokio是rust中有名的异步调用的包。它定义了asyncawait这些关键词,而实现异步。同样他也定义了异步函数的调用方式,就是#[tokio::main]标注。