初学rust,无法在不同的线程中并行读写websocket
Tag rust, 线程, websocket, 读写分离, on by view 279

最近发现一个问题,rust的线程安全机制导致无法实现socket读写分离到两个不同的线程。

先说一下程序的背景,程序是将本地终端pty(cli)拉起,并且将pty的输入输出通过channel对接,并将cli输出的数据经过channel写入到服务端socket,将从服务端socket收取到的数据经另一个channel写入到cli的输入。从而实现远程连接pty。

按照rust的写法,读线程中,在读socket之前需要先锁socket,然后读取,再释放锁;同样,在写线程中,也需要先锁socket,然后写入,再释放锁。这样一来代码应该如下:

连接与初始化代码如下

let (ws_stream, response) =
    connect(Url::parse("wss://ws.postman-echo.com/raw").unwrap()).expect("msg");

println!("Connected to the server");
println!("Response HTTP code: {}", response.status());
println!("Response contains the following headers:");
for (ref header, _value) in response.headers() {
    println!("* {}", header);
}

let socket = Arc::new(Mutex::new(ws_stream));

// init cli
self.pty_start();

let mut me = self.clone();
let skt = socket.clone();
thread::spawn(move || {
    me.watch_socket_read_in(skt);
});
println!("---> watch_socket_read_in");

self.watch_socket_write_out(socket.clone());
println!("---> watch_socket_write_out");

读取方法在一个新起的线程中watch_socket_read_in,如下

fn watch_socket_read_in(&mut self, socket: Arc<Mutex<WebSocket<MaybeTlsStream<TcpStream>>>>) {
    loop {
        let mut skt = socket.lock().unwrap();
        let msg = skt.read_message().unwrap();
        println!("Socket Received: {:?}", msg);
        drop(skt);
        self.tx_in
            .send(msg.clone())
            .expect("send msg into in channel failed");
        println!("send pipe in succ: {:?}", msg);
    }
}

可以看到,不停的从socket读取数据,读取前锁,读取后drop锁。

写入方法在初始化代码所在的主线程中watch_socket_write_out,如下

fn watch_socket_write_out(&mut self, socket: Arc<Mutex<WebSocket<MaybeTlsStream<TcpStream>>>>) {
    let rx = self.rx_out.lock().expect("lock rx out failed");

    for msg in rx.iter() {
        println!("msg from cli -> {:?}", msg);
        let mut skt = socket.lock().unwrap();
        println!("Socket Send    : {:?}", msg);
        skt.write_message(msg).unwrap();
        drop(skt);
    }

    println!("out of socket write out block....")
}

可是,运行的结果却出乎我的意料,运行结果现象是这样的,先是只能够从socket读取到服务端的PING数据,而cli发出的数据经过channel读取出来之后,锁socket,准备发送,但是发现锁socket卡主死锁了,导致无法经socket发送,然后就卡了很久;但是过了一段时间,写socket获取的锁成功了,发了一大堆的数据,然后又轮到读socket卡主,稍后随机的时间后,读socket锁成功,又只能读到PING,如此反复。这种状态的读写,完全不能用,根本实现不了cli与服务端的实时通讯。

分析了一下,应该是socket网络读写是网络通讯,因此读写的锁定socket时长是不确定的且相对耗时算是比较长的,所以导致无法预料是读获取到锁还是写获取到锁,而且这种锁强行将读写串行化了,完全不符合并发读写的要求了。

几经查找,于是采用tokio-tungstenite这个crate替换了tungstenite,因为它可以将WebSocketStream通过split方法分隔为readerwriter,这样一来,读与写就分离开了,在不同的线程中无需对socket加锁。

let (ws_stream, response) =
    connect_async(Url::parse("wss://ws.postman-echo.com/raw").unwrap())
        .await
        .expect("msg");
let (ws_writer, ws_reader) = ws_stream.split();

这样一来,读socketws_reader,写socketws_writer

// socket read
let me = self.clone();
tokio::spawn(async move {
    let mut incoming = ws_reader.map(Result::unwrap);
    while let Some(msg) = incoming.next().await {
        if msg.is_text() {
            println!("Socket Received: {:?}", msg);
            me.tx_in.send(msg).expect("send msg into in channel failed");
        }
    }
});

// socket write
self.watch_socket_write_out(ws_writer).await;
async fn watch_socket_write_out(
    &mut self,
    mut writer: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
) {
    let rx = self.rx_out.lock().expect("lock rx out failed");

    for msg in rx.iter() {
        println!("Socket Send    : {:?}", msg.to_text().unwrap());
        writer.send(msg).await.unwrap();
    }

    println!("out of socket write out block....")
}

可以看到,新线程中读socket,主线程中写socketws_readermap方法后,可以在死循环中阻塞调用next()不断的读取socket中的信息。写socket则从channel中读取到数据,按常规的方法send即可。

r56j2avc

接入tokio-tungstenite解决了这个问题,不过它是基于tokio的,tokio是一个协程库,有自己的运行时,用了tokio的程序起协程后,程序会自动启动若干个线程,类比goroutine,它也是有初始的资源消耗的,比如这个程序只需要4个线程,但是使用了tokio的程序,会有10个线程(如上图),内存占用会明显增多。


golang程序并发读写全局变量,导致空指针异常
Tag golang, 数据竞争, 线程安全, on by view 159

最近将一个golang程序由原本的单线程改为双线程处理日志解析,在生产环境节点上运行发现出现了一个空指针异常,异常信息如下:

[E] 2023/02/27 11:53:25 panic.go:838: parse error:runtime error: invalid memory address or nil pointer dereference,line:Feb 27 11:53:25 xxxxx.site nginx: [xxxxxxxxxxxxxxxxxxx] [27/Feb/2023:11:53:25 +0800] [https] [120.232.31.196:443] [xxxxx.map.xx.com] [39.144.41.41:37647] [200] [30.171.153.132:10000] [200] [3339597] [POST /tr?mllc HTTP/1.1] [621] [152] [xxxxxxx.map.xx.com] [Dalvik/2.1.0 (Linux; U; Android 10; HMA-AL00 Build/HUAWEIHMA-AL00)] [-] [0.008] [0.008] [0.004] [0.008] [46795] [311532943564] [1][1:46:0:0:0:0:0] [ECDHE-RSA-AES128-GCM-SHA256] [TLSv1.2] [r][-] [-] [n] [2358837] [-1] [1677470005.398|5|51|126|-1|-1|126|126|130|-1|-1|130|134|134|134|134#200|200|8|152|120.232.31.196|30.171.153.132:10000|0|0] [POST] [/tr] [mllc] [HTTP/1.1] [39.144.41.41] [-] [-] [-] [-] [-] [-] [-] [-] [-] [-] [169.254.213.29:50937] [0]

修改程序,打印出异常栈 x7s4uzea

发现异常发生在代码133行,代码如下 0zhhoxbp

很明显这里不太可能出现空指针,除非运行到这一行的时候sl对象被置为nil,但是我很确定这里不存在其他线程共享sl的情况,也就是不可能被其他线程置为nil,何况,我这里没有任何操作将sl置为nil,百思不得其解。最后发现key1key2这两个变量是全局变量,全局变量在多线程环境下会存在数据竞争问题。原本定义如下

var (
	uriSep *regexp.Regexp = nil

	key1 = ""
	key2 = ""
)

可以看到key1,key2被定义为全局变量,忘记修改了。修改之后,神奇的发现,空指针异常已经不再存在了。

我在发现这个问题之前,在本地开发服务器上尝试重现,但是一直未能重现出来,估计是我本地qps不够高,所以难以复现,生产环境qps是3w到4w左右,这个空指针异常呈现无规律的隔几秒钟出现一次。

golang中,双协程(绑定在双M和双cpu上)中同时读写一个共享变量导致空指针异常,这种情况我还是第一次遇见,以前遇到这种双协程读写一个共享变量的情况都是数据错乱,并没有空指针。据说这种线程安全、数据竞争导致的空指针异常在C++中也是常见的情况。所以,我在想,这里会不会是因为我将2个协程绑核了,所以在双线程绑双核的情况下更容易复现呢。


初学rust,如何在线程中调用成员方法
Tag rust, 线程, 方法, on by view 118

如何在线程中调用成员方法?

普通调用习惯写法

fn watch_receiver(mut self, rx: Receiver<String>) {
    thread::spawn(move || {
        for line in rx.iter() {
            self.push(line);
        }
    });
}

会报错

p.watch_receiver(rx);
    |           ------------------ `p` moved due to this method call
70  |         p.watch_poly();
    |         ^^^^^^^^^^^^^^ value borrowed here after move

这里需要把形参self改为指针&self,然后在方法体中克隆这个指针,就可以在方法中的线程里直接通过这个指针的克隆成员方法。

改为

fn watch_receiver(&self, rx: Receiver<String>) {
    let mut me = self.clone();
    thread::spawn(move || {
        for line in rx.iter() {
            me.push(line);
        }
    });
}

即可通过。

但是要注意,这里的clone,真的是克隆。所以clone前后的变量,即selfme是2个不同的变量,里面的成员也是在不同的内存空间上,修改self中的成员属性,me中对应的成员属性并不会跟着变。所以,如果里面的成员属性需要跟随变化,必须把成员属性定义为指针,这样修改指正所指的值,selfme中成员属性所指的值是相同的。


初学rust,给线程绑核
Tag 绑核, 线程, on by view 663

rust中,可以通过core_affinity这个crate对线程进行核绑定。但是绑核过程中发现一个问题。针对主线程绑核,若是主线程绑核后,创建子线程,在该子线程种尝试绑核会发现只有一个核可以。所以,使用这个库如果需要对主线程进行绑核,需要在所有子线程创建完毕之后进行。

绑核的函数如下

static CORE_IDS: Lazy<Vec<CoreId>> =
    Lazy::new(|| core_affinity::get_core_ids().unwrap()); // 尝试过给CORE_IDS 加锁,也是一样

fn bind_core(id: usize) {
    let mut selected_id = CORE_IDS[0];
    for core_id in CORE_IDS.clone() {
        println!("core {:?}, bind to: {:?}", selected_id, id);
        if core_id.id == id {
            selected_id = core_id;
            break;
        }
    }

    core_affinity::set_for_current(selected_id);
}

在主线程中绑核,且主线程的绑核在创建子线程之前,发现,第一次遍历核,核有8个,成功绑定到指定的7号核;第二次子线程中绑核,遍历核,核有1个,为7号核,所以只能绑定到7号核,这样就与主线程同核了。

core_id num: 8, cores: [CoreId { id: 0 }, CoreId { id: 1 }, CoreId { id: 2 }, CoreId { id: 3 }, CoreId { id: 4 }, CoreId { id: 5 }, CoreId { id: 6 }, CoreId { id: 7 }]

print core_id: CoreId { id: 0 }, bind id: 7
print core_id: CoreId { id: 1 }, bind id: 7
print core_id: CoreId { id: 2 }, bind id: 7
print core_id: CoreId { id: 3 }, bind id: 7
print core_id: CoreId { id: 4 }, bind id: 7
print core_id: CoreId { id: 5 }, bind id: 7
print core_id: CoreId { id: 6 }, bind id: 7
print core_id: CoreId { id: 7 }, bind id: 7
core_id num: 1, cores: [CoreId { id: 7 }]

print core_id: CoreId { id: 7 }, bind id: 6

改为,主线程绑核在创建子线程之后,如下

fn main() {
    let cfg = config::cfg::get_config();
    let filename = cfg.las.as_ref().unwrap().access_log.as_ref().unwrap();
    let mut watcher = LogWatcher::register(filename.to_string()).unwrap();
    let poly: Poly = Poly::new(); // 此处调用会创建子线程
    config::affinity::bind_core_follow_config(0); // 绑核

    watcher.watch(&mut |line: String| {
        poly.clone().push(line);
        LogWatcherAction::None
    })
}

发现,成功的选中指定的主线程7号核,子线程6号核,输出如下

core_id num: 8, cores: [CoreId { id: 0 }, CoreId { id: 1 }, CoreId { id: 2 }, CoreId { id: 3 }, CoreId { id: 4 }, CoreId { id: 5 }, CoreId { id: 6 }, CoreId { id: 7 }]

print core_id: CoreId { id: 0 }, bind id: 7
print core_id: CoreId { id: 1 }, bind id: 7
print core_id: CoreId { id: 2 }, bind id: 7
print core_id: CoreId { id: 3 }, bind id: 7
print core_id: CoreId { id: 4 }, bind id: 7
print core_id: CoreId { id: 5 }, bind id: 7
print core_id: CoreId { id: 6 }, bind id: 7
print core_id: CoreId { id: 7 }, bind id: 7
core_id num: 8, cores: [CoreId { id: 0 }, CoreId { id: 1 }, CoreId { id: 2 }, CoreId { id: 3 }, CoreId { id: 4 }, CoreId { id: 5 }, CoreId { id: 6 }, CoreId { id: 7 }]

print core_id: CoreId { id: 0 }, bind id: 6
print core_id: CoreId { id: 1 }, bind id: 6
print core_id: CoreId { id: 2 }, bind id: 6
print core_id: CoreId { id: 3 }, bind id: 6
print core_id: CoreId { id: 4 }, bind id: 6
print core_id: CoreId { id: 5 }, bind id: 6
print core_id: CoreId { id: 6 }, bind id: 6

其中两次选核过程中,也都能够正常打印出所有核。

那么我试一下主线程中先创建A线程,在A线程中绑核(7),然后在主线程中绑核(6),最后创建worker线程,在worker线程中绑核(5),代码如下

fn main() {
    let cfg = config::cfg::get_config();
    let filename = cfg.las.as_ref().unwrap().access_log.as_ref().unwrap();
    let mut watcher = LogWatcher::register(filename.to_string()).unwrap();

    thread::Builder::new()
        .name("A".into())
        .spawn(|| {
            config::affinity::bind_core_follow_config(2);
            loop {
                sleep(Duration::from_secs(1));
            }
        })
        .unwrap();

    config::affinity::bind_core_follow_config(0);
    sleep(Duration::from_secs(2));

    let poly: Poly = Poly::new(); // worker

    watcher.watch(&mut |line: String| {
        poly.clone().push(line);
        LogWatcherAction::None
    })
}

结果如下

core CoreId { id: 0 }, bind to: 7
core CoreId { id: 1 }, bind to: 7
core CoreId { id: 2 }, bind to: 7
core CoreId { id: 3 }, bind to: 7
core CoreId { id: 4 }, bind to: 7
core CoreId { id: 5 }, bind to: 7
core CoreId { id: 6 }, bind to: 7
core CoreId { id: 7 }, bind to: 7
>>> core CoreId { id: 7 }, bind to: Thread { id: ThreadId(1), name: Some("main"), .. }

core CoreId { id: 0 }, bind to: 5
core CoreId { id: 1 }, bind to: 5
core CoreId { id: 2 }, bind to: 5
core CoreId { id: 3 }, bind to: 5
core CoreId { id: 4 }, bind to: 5
core CoreId { id: 5 }, bind to: 5
>>> core CoreId { id: 5 }, bind to: Thread { id: ThreadId(2), name: Some("A"), .. }

core CoreId { id: 7 }, bind to: 6
>>> core CoreId { id: 7 }, bind to: Thread { id: ThreadId(3), name: Some("worker"), .. }

可以看到worker进程获取到的核数不正常。worker线程内部略微复杂,里面涉及到数据处理,tokio异步调用的上报等。可是我把worker线程换为简单的替换线程,却没有问题了。代码如下

fn main() {
    let cfg = config::cfg::get_config();
    let filename = cfg.las.as_ref().unwrap().access_log.as_ref().unwrap();
    let mut watcher = LogWatcher::register(filename.to_string()).unwrap();

    thread::Builder::new()
        .name("A".into())
        .spawn(|| {
            config::affinity::bind_core_follow_config(2);
            loop {
                sleep(Duration::from_secs(1));
            }
        })
        .unwrap();

    config::affinity::bind_core_follow_config(0);
    sleep(Duration::from_secs(2));

    // let poly: Poly = Poly::new(); // worker

    thread::Builder::new()
        .name("B worker".into())
        .spawn(|| {
            config::affinity::bind_core_follow_config(1);
            loop {
                sleep(Duration::from_secs(1));
            }
        })
        .unwrap();

    watcher.watch(&mut |line: String| {
        // poly.clone().push(line);
        LogWatcherAction::None
    })
}

其中B worker是替换worker的线程,结果如下

core CoreId { id: 0 }, bind to: 7
core CoreId { id: 1 }, bind to: 7
core CoreId { id: 2 }, bind to: 7
core CoreId { id: 3 }, bind to: 7
core CoreId { id: 4 }, bind to: 7
core CoreId { id: 5 }, bind to: 7
core CoreId { id: 6 }, bind to: 7
core CoreId { id: 7 }, bind to: 7
>>> core CoreId { id: 7 }, bind to: Thread { id: ThreadId(1), name: Some("main"), .. }

core CoreId { id: 0 }, bind to: 5
core CoreId { id: 1 }, bind to: 5
core CoreId { id: 2 }, bind to: 5
core CoreId { id: 3 }, bind to: 5
core CoreId { id: 4 }, bind to: 5
core CoreId { id: 5 }, bind to: 5
>>> core CoreId { id: 5 }, bind to: Thread { id: ThreadId(2), name: Some("A"), .. }

core CoreId { id: 0 }, bind to: 6
core CoreId { id: 1 }, bind to: 6
core CoreId { id: 2 }, bind to: 6
core CoreId { id: 3 }, bind to: 6
core CoreId { id: 4 }, bind to: 6
core CoreId { id: 5 }, bind to: 6
core CoreId { id: 6 }, bind to: 6
>>> core CoreId { id: 6 }, bind to: Thread { id: ThreadId(3), name: Some("B worker"), .. }

结论,core_affinity::get_core_ids获取到的核心信息具有不确定性。查阅作者仓库发现了一个issue与这个问题相关,提issue者同样反馈了这个问题,并质疑get_core_ids获取到的不是所有核,而是对于当前线程的可用核。


初学rust,多线程编程共享变量访问
Tag rust, 多线程, 共享变量, , 解锁, on by view 281

今天开始尝试使用rust的多线程编程,在不同的线程中访问共享变量。一个小项目,项目背景是这样的,我想用rust写一个高性能的nginx日志解析聚合上报监控的agent;其中,主线程中watch日志文件,并读取日志内容,有点类似tailf,然后有一个子线程,负责每隔10s钟统计主线程收集到的日志数据,聚合并上报。

所以,这里除了主线程之外,需要创建一个线程。并且主线程不断的往一个Vec<LineStat>容器变量中写入数据,另一个子线程没隔10s,从这个容器变量中读取所有数据,并累加统计上报,然后清空该容器中的数据,10s一个往复。这个容器变量就是关键的共享变量了。常规情况下,是存在数据竞争的,在golang里倒是很容易写出来,但是golang可不管这些,锁你爱加不加,不加锁也能跑,但是会出错,若是map还会panic。但是,今天发现在rust里,根本写不出来……

那么第一个问题就是,rust线程中如何使用共享变量。 我们先看下直接使用共享变量会怎么样。

fn testx() {
    let mut data = "hi".to_string();
    thread::spawn(move || loop {
        println!("{:?}", data);

        thread::sleep(Duration::from_secs(10));
    });

    println!("{:?}", data);
}

可以看到spawn生成的线程内部访问data,会被要求加上move关键词,它会强制获取被访问变量的所有权,也就是说在线程中访问了datadata的所有权就变为这个线程的了,只能在这个线程内访问。后续的println访问data就会报错。如下

error[E0382]: borrow of moved value: `data`
  --> src/main.rs:38:22
   |
31 |     let mut data = "hi".to_string();
   |         -------- move occurs because `data` has type `std::string::String`, which does not implement the `Copy` trait
32 |     thread::spawn(move || loop {
   |                   ------- value moved into closure here
33 |         println!("{:?}", data);
   |                          ---- variable moved due to use in closure
...
38 |     println!("{:?}", data);
   |                      ^^^^ value borrowed here after move
   |
   = note: this error originates in the macro `$crate::format_args_nl` (in Nightly builds, run with -Z macro-backtrace for more info)

可以看到报错value borrowed here after move,也就是访问了所有权被移动过的变量。

那么我们如何才能使data能够同时在主线程和子线程中被访问呢。用Arc<Mutex<...>>。代码改成如下

fn main() {
    let mut raw_data = "hi".to_string();
    let mut data: Arc<Mutex<String>> = Arc::new(Mutex::new(raw_data));
    let mut clone_data = data.clone();
    thread::spawn(move || loop {
        thread::sleep(Duration::from_secs(1));
        println!("th -> {:?}", clone_data.lock().unwrap());
        clone_data.lock().unwrap().push_str(", hello");
    });

    thread::sleep(Duration::from_secs(10));
    println!("final -> {:?}", data.lock().unwrap());
}

可以看到要访问的数据套上Arc<Mutex<...>>之后,在线程中访问该数据时,先在线程外clone一个副本,然后在线程中访问该数据,访问时先加锁lock(),然后读取到的数据就是raw_data,而在主线程中访问的数据也是先lock()加锁,读取到的也是raw_data。相当于在raw_data外部加了个套,虽然dataclone_data是两个加了套的数据,但是在套里面的数据是同一个,也就是raw_datalock()锁定“解套”后访问的是同一个数据。我们来看下运行结果

th -> "hi"
th -> "hi, hello"
th -> "hi, hello, hello"
th -> "hi, hello, hello, hello"
th -> "hi, hello, hello, hello, hello"
th -> "hi, hello, hello, hello, hello, hello"
th -> "hi, hello, hello, hello, hello, hello, hello"
th -> "hi, hello, hello, hello, hello, hello, hello, hello"
th -> "hi, hello, hello, hello, hello, hello, hello, hello, hello"
final -> "hi, hello, hello, hello, hello, hello, hello, hello, hello, hello"

可以看到线程内部字符串拼接了若干次,最后在主线程中打印出来的是子线程拼接后的数据。

那么rust是怎么解锁的呢。 我们把代码改成这样

fn main() {
    let mut raw_data = "hi".to_string();
    let mut data: Arc<Mutex<String>> = Arc::new(Mutex::new(raw_data));
    let mut clone_data = data.clone();
    thread::spawn(move || loop {
        thread::sleep(Duration::from_secs(1));
        let mut v = clone_data.lock().unwrap(); // v 不会立即释放,v 所在的锁也不会立即释放
        println!("th -> {:?}", v);

        appendCnt(clone_data.clone());
        // 线程结束后,v 被回收,v所在的锁才会释放
    });

    thread::sleep(Duration::from_secs(10));
    println!("final -> {:?}", data.lock().unwrap());
}

fn appendCnt(mut data: Arc<Mutex<String>>) {
    data.lock().unwrap().push_str(", hello"); // 这里再次尝试锁 data 里的值,发现前面已经锁定过,无法加锁,死锁
}

执行结果显示打印了第一个th -> "hi"之后,程序直接卡死。这里是因为发生了死锁。rust中目前没有显式的锁释放操作(实际上unlock方法在nightly版本中,参考,对应的issue),锁释放(unlock)发生在前面所说的“加了套的数据”内存释放的时候,也就是说clone_data.lock()v解锁发生在v释放的时候,也就是在loop所在块结束的时候。但是在这之前appenCnt调用了clone_data.clone()衍生的“加了套的数据”,并尝试在函数中加锁,这时候上一个锁还没释放呢,所以就死锁了。

那么如果这里我必需要提前解锁怎么办,用drop即可。如下

fn main() {
    let mut raw_data = "hi".to_string();
    let mut data: Arc<Mutex<String>> = Arc::new(Mutex::new(raw_data));
    let mut clone_data = data.clone();
    thread::spawn(move || loop {
        thread::sleep(Duration::from_secs(1));
        let mut v = clone_data.lock().unwrap(); // v 不会立即释放,v 所在的锁也不会立即释放
        println!("th -> {:?}", v);
        drop(v); // 提前释放 v,提前解锁

        appendCnt(clone_data.clone());
    });

    thread::sleep(Duration::from_secs(10));
    println!("final -> {:?}", data.lock().unwrap());
}

fn appendCnt(mut data: Arc<Mutex<String>>) {
    data.lock().unwrap().push_str(", hello"); // 这里再次尝试锁 data 里的值,没有被锁定,可以加锁
}

运行,发现现在能正常解锁了。


Java线程编程
Tag java, 线程, on by view 3860

最近学习了一下网络编程,因此就无可避免的学习了多线程编程,先后经过在C#、C以及Java环境下编程,有所斩获。以下是Java环境下线程编程的一些总结:

  1. 创建线程

  2. 实例化Thread类

    Receive rcv;
    Thread th1=new Thread(rcv);

    其中Receive rcv要通过override抽象类Runnable来实现

  3. 重载抽象类Runnable

  4. 主要是重载其中的run方法,因为线程启动的目标函数就是重载后的run()方法,示例如下:

     public class Receive implements Runnable{
      @Override
      public void run() {
        System.out.print("sdjfhsdjf");
      }
      public static Receive init(){
        return new Receive();
      }
  5. 启动线程

  6. th1.start();

    线程启动后将会执行rcv.run();