rust中,可以通过core_affinity
这个crate对线程进行核绑定。但是绑核过程中发现一个问题。针对主线程绑核,若是主线程绑核后,创建子线程,在该子线程种尝试绑核会发现只有一个核可以。所以,使用这个库如果需要对主线程进行绑核,需要在所有子线程创建完毕之后进行。
绑核的函数如下
static CORE_IDS: Lazy<Vec<CoreId>> =
Lazy::new(|| core_affinity::get_core_ids().unwrap()); // 尝试过给CORE_IDS 加锁,也是一样
fn bind_core(id: usize) {
let mut selected_id = CORE_IDS[0];
for core_id in CORE_IDS.clone() {
println!("core {:?}, bind to: {:?}", selected_id, id);
if core_id.id == id {
selected_id = core_id;
break;
}
}
core_affinity::set_for_current(selected_id);
}
在主线程中绑核,且主线程的绑核在创建子线程之前,发现,第一次遍历核,核有8个,成功绑定到指定的7号核;第二次子线程中绑核,遍历核,核有1个,为7号核,所以只能绑定到7号核,这样就与主线程同核了。
core_id num: 8, cores: [CoreId { id: 0 }, CoreId { id: 1 }, CoreId { id: 2 }, CoreId { id: 3 }, CoreId { id: 4 }, CoreId { id: 5 }, CoreId { id: 6 }, CoreId { id: 7 }]
print core_id: CoreId { id: 0 }, bind id: 7
print core_id: CoreId { id: 1 }, bind id: 7
print core_id: CoreId { id: 2 }, bind id: 7
print core_id: CoreId { id: 3 }, bind id: 7
print core_id: CoreId { id: 4 }, bind id: 7
print core_id: CoreId { id: 5 }, bind id: 7
print core_id: CoreId { id: 6 }, bind id: 7
print core_id: CoreId { id: 7 }, bind id: 7
core_id num: 1, cores: [CoreId { id: 7 }]
print core_id: CoreId { id: 7 }, bind id: 6
改为,主线程绑核在创建子线程之后,如下
fn main() {
let cfg = config::cfg::get_config();
let filename = cfg.las.as_ref().unwrap().access_log.as_ref().unwrap();
let mut watcher = LogWatcher::register(filename.to_string()).unwrap();
let poly: Poly = Poly::new(); // 此处调用会创建子线程
config::affinity::bind_core_follow_config(0); // 绑核
watcher.watch(&mut |line: String| {
poly.clone().push(line);
LogWatcherAction::None
})
}
发现,成功的选中指定的主线程7号核,子线程6号核,输出如下
core_id num: 8, cores: [CoreId { id: 0 }, CoreId { id: 1 }, CoreId { id: 2 }, CoreId { id: 3 }, CoreId { id: 4 }, CoreId { id: 5 }, CoreId { id: 6 }, CoreId { id: 7 }]
print core_id: CoreId { id: 0 }, bind id: 7
print core_id: CoreId { id: 1 }, bind id: 7
print core_id: CoreId { id: 2 }, bind id: 7
print core_id: CoreId { id: 3 }, bind id: 7
print core_id: CoreId { id: 4 }, bind id: 7
print core_id: CoreId { id: 5 }, bind id: 7
print core_id: CoreId { id: 6 }, bind id: 7
print core_id: CoreId { id: 7 }, bind id: 7
core_id num: 8, cores: [CoreId { id: 0 }, CoreId { id: 1 }, CoreId { id: 2 }, CoreId { id: 3 }, CoreId { id: 4 }, CoreId { id: 5 }, CoreId { id: 6 }, CoreId { id: 7 }]
print core_id: CoreId { id: 0 }, bind id: 6
print core_id: CoreId { id: 1 }, bind id: 6
print core_id: CoreId { id: 2 }, bind id: 6
print core_id: CoreId { id: 3 }, bind id: 6
print core_id: CoreId { id: 4 }, bind id: 6
print core_id: CoreId { id: 5 }, bind id: 6
print core_id: CoreId { id: 6 }, bind id: 6
其中两次选核过程中,也都能够正常打印出所有核。
那么我试一下主线程中先创建A线程,在A线程中绑核(7),然后在主线程中绑核(6),最后创建worker线程,在worker线程中绑核(5),代码如下
fn main() {
let cfg = config::cfg::get_config();
let filename = cfg.las.as_ref().unwrap().access_log.as_ref().unwrap();
let mut watcher = LogWatcher::register(filename.to_string()).unwrap();
thread::Builder::new()
.name("A".into())
.spawn(|| {
config::affinity::bind_core_follow_config(2);
loop {
sleep(Duration::from_secs(1));
}
})
.unwrap();
config::affinity::bind_core_follow_config(0);
sleep(Duration::from_secs(2));
let poly: Poly = Poly::new(); // worker
watcher.watch(&mut |line: String| {
poly.clone().push(line);
LogWatcherAction::None
})
}
结果如下
core CoreId { id: 0 }, bind to: 7
core CoreId { id: 1 }, bind to: 7
core CoreId { id: 2 }, bind to: 7
core CoreId { id: 3 }, bind to: 7
core CoreId { id: 4 }, bind to: 7
core CoreId { id: 5 }, bind to: 7
core CoreId { id: 6 }, bind to: 7
core CoreId { id: 7 }, bind to: 7
>>> core CoreId { id: 7 }, bind to: Thread { id: ThreadId(1), name: Some("main"), .. }
core CoreId { id: 0 }, bind to: 5
core CoreId { id: 1 }, bind to: 5
core CoreId { id: 2 }, bind to: 5
core CoreId { id: 3 }, bind to: 5
core CoreId { id: 4 }, bind to: 5
core CoreId { id: 5 }, bind to: 5
>>> core CoreId { id: 5 }, bind to: Thread { id: ThreadId(2), name: Some("A"), .. }
core CoreId { id: 7 }, bind to: 6
>>> core CoreId { id: 7 }, bind to: Thread { id: ThreadId(3), name: Some("worker"), .. }
可以看到worker进程获取到的核数不正常。worker线程内部略微复杂,里面涉及到数据处理,tokio异步调用的上报等。可是我把worker线程换为简单的替换线程,却没有问题了。代码如下
fn main() {
let cfg = config::cfg::get_config();
let filename = cfg.las.as_ref().unwrap().access_log.as_ref().unwrap();
let mut watcher = LogWatcher::register(filename.to_string()).unwrap();
thread::Builder::new()
.name("A".into())
.spawn(|| {
config::affinity::bind_core_follow_config(2);
loop {
sleep(Duration::from_secs(1));
}
})
.unwrap();
config::affinity::bind_core_follow_config(0);
sleep(Duration::from_secs(2));
// let poly: Poly = Poly::new(); // worker
thread::Builder::new()
.name("B worker".into())
.spawn(|| {
config::affinity::bind_core_follow_config(1);
loop {
sleep(Duration::from_secs(1));
}
})
.unwrap();
watcher.watch(&mut |line: String| {
// poly.clone().push(line);
LogWatcherAction::None
})
}
其中B worker是替换worker的线程,结果如下
core CoreId { id: 0 }, bind to: 7
core CoreId { id: 1 }, bind to: 7
core CoreId { id: 2 }, bind to: 7
core CoreId { id: 3 }, bind to: 7
core CoreId { id: 4 }, bind to: 7
core CoreId { id: 5 }, bind to: 7
core CoreId { id: 6 }, bind to: 7
core CoreId { id: 7 }, bind to: 7
>>> core CoreId { id: 7 }, bind to: Thread { id: ThreadId(1), name: Some("main"), .. }
core CoreId { id: 0 }, bind to: 5
core CoreId { id: 1 }, bind to: 5
core CoreId { id: 2 }, bind to: 5
core CoreId { id: 3 }, bind to: 5
core CoreId { id: 4 }, bind to: 5
core CoreId { id: 5 }, bind to: 5
>>> core CoreId { id: 5 }, bind to: Thread { id: ThreadId(2), name: Some("A"), .. }
core CoreId { id: 0 }, bind to: 6
core CoreId { id: 1 }, bind to: 6
core CoreId { id: 2 }, bind to: 6
core CoreId { id: 3 }, bind to: 6
core CoreId { id: 4 }, bind to: 6
core CoreId { id: 5 }, bind to: 6
core CoreId { id: 6 }, bind to: 6
>>> core CoreId { id: 6 }, bind to: Thread { id: ThreadId(3), name: Some("B worker"), .. }
结论,core_affinity::get_core_ids
获取到的核心信息具有不确定性。查阅作者仓库发现了一个issue与这个问题相关,提issue者同样反馈了这个问题,并质疑get_core_ids获取到的不是所有核,而是对于当前线程的可用核。