最近再次在rust中尝试用tokio::spawn实现类似go语言中goroutine的用法,但是报错了。
#[tokio::main]
async fn main() {
let cfg = cfg::get_config();
let client_id = cfg::get_client_id();
let ws_addr = cfg.agent.as_ref().unwrap().remote_ws_addr.as_ref().unwrap();
let mut wsc = ws::WebSocketClient::new(ws_addr.clone(), client_id);
tokio::spawn(wsc.start());
let monitor_addr = cfg.agent.as_ref().unwrap().remote_ws_addr.as_ref().unwrap();
let mut monitor_client = monitor::Monitor::new(monitor_addr.to_string());
monitor_client.start().await;
}
报错位置在这一行 tokio::spawn(wsc.start());
,报错内容如下
error: future cannot be sent between threads safely
--> src/main.rs:19:18
|
19 | tokio::spawn(wsc.start());
| ^^^^^^^^^^^ future returned by `start` is not `Send`
|
= help: the trait `Sync` is not implemented for `std::sync::mpsc::Receiver<Message>`
note: future is not `Send` as this value is used across an await
--> src/ws.rs:117:47
|
115 | if let Ok(msg) = rx.recv() {
| -- has type `&std::sync::mpsc::Receiver<Message>` which is not `Send`
116 | println!("Socket Send : {:?}", msg);
117 | let rst = writer.send(msg).await;
| ^^^^^^ await occurs here, with `rx` maybe used later
...
126 | }
| - `rx` is later dropped here
help: consider moving this into a `let` binding to create a shorter lived borrow
可以看到原因是,我在WebSocketClient
中用到了MutexGuard
,而MutexGuard
不支持被tokio::spawn
调用。解释参考这篇文档。实际上我是在另一个结构体Pty
中使用了Arc<Mutex<Receiver<Message>>>
类型,然后WebSocketClient
中又实用了Pty
结构体,所以在调用tokio::spawn
时报错。
如何解决?别用标准库的Mutex了,用tokio的。而且,我同样发现了Sender
,Receiver
都在报类似的错误the trait std::marker::Send is not implemented for ...
。
use std::sync::mpsc::{channel, Receiver, Sender}
// 改为
use tokio::sync::mpsc::channel;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::Mutex;
channel
,Receiver
,Sender
全部改为tokio版本的,就可以兼容tokio了。