为go程序的协程绑核
Tag golang, 绑核, on by view 1079

最近在公司的日志处理程序上做性能优化,用到了绑核的情况。背景是这样的,nginx进行http转发,产生日志,然后我们的程序读取日志,用lexer分词器对日志分隔字段,并且对字段进行统计聚合上报,生成监控。日志处理程序最开始是在单个goroutine里进行读取并且解析操作了,但是在核数比较多的大机器上,发现日志生成太快,解析程序处理不过来,在日志rotate的过程中会发生丢失日志的情况。于是针对这个情况进行了优化。

用pprof发现,性能消耗最大的部分是lexer,lexer其实就是个分词器,编译器中常用的技术,逐字符读取每行日志,然后基于状态机状态标记对日志的字段进行分割,中间涉及到的状态也不算太多,主要是双引号(“”)作为定界符提取字符串字段,方括号([])作为定界符提取字符串字段,空字符(空格、\t)和竖线符(|)作为分隔符分隔字段,转义符()对字符串中的字符串定界符(”[])进行转义,总体来说,状态不算复杂,其中也针对lexer优化过尽量减少变量分配和杜绝变量逃逸,lexer实在是已经无可优化了。

于是只好从其他方面下手,首先就是cpu切换的性能损耗。众所周知golang中没有线程的,golang中只有协程(goroutine),而防止cpu切换的性能损耗只有绑核这个方法,具体就是讲指定的核绑定到某个线程上,这样这个线程就会只在这个指定的核上运行,不会被系统切换到其他核上,这样也就不会产生切换的损耗了。但是golang程序中只有goroutine,不能直接操作线程。其实我们是有办法对goroutine进行绑核的。

首先,使用go里面的runtime.LockOSThread()将当前goroutine绑定到它所在的M线程,这样,这个goroutine就不会在M线程之间切换了;然后,我们可以使用cgo,调用pthread_self获取当前协程所在M线程的线程ID,并调用CPU_SET对这个线程ID设置cpuid绑定。具体如下

package affinity

/*
#define _GNU_SOURCE
#include <sched.h>
#include <pthread.h>

int lock_thread(int cpuid) {
  pthread_t tid;
  cpu_set_t cpuset;

  tid = pthread_self();
  CPU_ZERO(&cpuset);
  CPU_SET(cpuid, &cpuset);
  return  pthread_setaffinity_np(tid, sizeof(cpu_set_t), &cpuset);
}

pthread_t current_thread_id() {
  pthread_t tid;

  tid = pthread_self();

  return tid;
}
*/
import "C"

import (
	"fmt"
	"runtime"
)

// SetAffinity 设置CPU绑定
func SetAffinity(cpuID int) (uint64, error) {
	runtime.LockOSThread()
	ret := C.lock_thread(C.int(cpuID))
	tid := uint64(C.ulong(C.current_thread_id()))
	if ret > 0 {
		return 0, fmt.Errorf("set cpu core affinity failed with return code %d", ret)
	}
	return tid, nil
}

这样一来,我们只需要在goroutine中调用SetAffinity就可以将指定的cpuid和当前goroutine进行绑定。这样就实现了goroutine的绑定。

我将日志处理程序改为在主协程中读取文件并且通过channel分发日志行,然后在2个goroutine执行最占cpu的lexer及后续处理,并且在这两个goroutine中绑定cpuid为1,2。

qdx9l7z5

图中可以看到,两个处理日志的goroutine绑定了1,2两个cpu,并且不会切为其他cpu,这两个cpu都在处理日志,所以cpu占用都比较高,相当于把原来一个核处理的任务分担到2个核上了。


初学rust,给线程绑核
Tag 绑核, 线程, on by view 667

rust中,可以通过core_affinity这个crate对线程进行核绑定。但是绑核过程中发现一个问题。针对主线程绑核,若是主线程绑核后,创建子线程,在该子线程种尝试绑核会发现只有一个核可以。所以,使用这个库如果需要对主线程进行绑核,需要在所有子线程创建完毕之后进行。

绑核的函数如下

static CORE_IDS: Lazy<Vec<CoreId>> =
    Lazy::new(|| core_affinity::get_core_ids().unwrap()); // 尝试过给CORE_IDS 加锁,也是一样

fn bind_core(id: usize) {
    let mut selected_id = CORE_IDS[0];
    for core_id in CORE_IDS.clone() {
        println!("core {:?}, bind to: {:?}", selected_id, id);
        if core_id.id == id {
            selected_id = core_id;
            break;
        }
    }

    core_affinity::set_for_current(selected_id);
}

在主线程中绑核,且主线程的绑核在创建子线程之前,发现,第一次遍历核,核有8个,成功绑定到指定的7号核;第二次子线程中绑核,遍历核,核有1个,为7号核,所以只能绑定到7号核,这样就与主线程同核了。

core_id num: 8, cores: [CoreId { id: 0 }, CoreId { id: 1 }, CoreId { id: 2 }, CoreId { id: 3 }, CoreId { id: 4 }, CoreId { id: 5 }, CoreId { id: 6 }, CoreId { id: 7 }]

print core_id: CoreId { id: 0 }, bind id: 7
print core_id: CoreId { id: 1 }, bind id: 7
print core_id: CoreId { id: 2 }, bind id: 7
print core_id: CoreId { id: 3 }, bind id: 7
print core_id: CoreId { id: 4 }, bind id: 7
print core_id: CoreId { id: 5 }, bind id: 7
print core_id: CoreId { id: 6 }, bind id: 7
print core_id: CoreId { id: 7 }, bind id: 7
core_id num: 1, cores: [CoreId { id: 7 }]

print core_id: CoreId { id: 7 }, bind id: 6

改为,主线程绑核在创建子线程之后,如下

fn main() {
    let cfg = config::cfg::get_config();
    let filename = cfg.las.as_ref().unwrap().access_log.as_ref().unwrap();
    let mut watcher = LogWatcher::register(filename.to_string()).unwrap();
    let poly: Poly = Poly::new(); // 此处调用会创建子线程
    config::affinity::bind_core_follow_config(0); // 绑核

    watcher.watch(&mut |line: String| {
        poly.clone().push(line);
        LogWatcherAction::None
    })
}

发现,成功的选中指定的主线程7号核,子线程6号核,输出如下

core_id num: 8, cores: [CoreId { id: 0 }, CoreId { id: 1 }, CoreId { id: 2 }, CoreId { id: 3 }, CoreId { id: 4 }, CoreId { id: 5 }, CoreId { id: 6 }, CoreId { id: 7 }]

print core_id: CoreId { id: 0 }, bind id: 7
print core_id: CoreId { id: 1 }, bind id: 7
print core_id: CoreId { id: 2 }, bind id: 7
print core_id: CoreId { id: 3 }, bind id: 7
print core_id: CoreId { id: 4 }, bind id: 7
print core_id: CoreId { id: 5 }, bind id: 7
print core_id: CoreId { id: 6 }, bind id: 7
print core_id: CoreId { id: 7 }, bind id: 7
core_id num: 8, cores: [CoreId { id: 0 }, CoreId { id: 1 }, CoreId { id: 2 }, CoreId { id: 3 }, CoreId { id: 4 }, CoreId { id: 5 }, CoreId { id: 6 }, CoreId { id: 7 }]

print core_id: CoreId { id: 0 }, bind id: 6
print core_id: CoreId { id: 1 }, bind id: 6
print core_id: CoreId { id: 2 }, bind id: 6
print core_id: CoreId { id: 3 }, bind id: 6
print core_id: CoreId { id: 4 }, bind id: 6
print core_id: CoreId { id: 5 }, bind id: 6
print core_id: CoreId { id: 6 }, bind id: 6

其中两次选核过程中,也都能够正常打印出所有核。

那么我试一下主线程中先创建A线程,在A线程中绑核(7),然后在主线程中绑核(6),最后创建worker线程,在worker线程中绑核(5),代码如下

fn main() {
    let cfg = config::cfg::get_config();
    let filename = cfg.las.as_ref().unwrap().access_log.as_ref().unwrap();
    let mut watcher = LogWatcher::register(filename.to_string()).unwrap();

    thread::Builder::new()
        .name("A".into())
        .spawn(|| {
            config::affinity::bind_core_follow_config(2);
            loop {
                sleep(Duration::from_secs(1));
            }
        })
        .unwrap();

    config::affinity::bind_core_follow_config(0);
    sleep(Duration::from_secs(2));

    let poly: Poly = Poly::new(); // worker

    watcher.watch(&mut |line: String| {
        poly.clone().push(line);
        LogWatcherAction::None
    })
}

结果如下

core CoreId { id: 0 }, bind to: 7
core CoreId { id: 1 }, bind to: 7
core CoreId { id: 2 }, bind to: 7
core CoreId { id: 3 }, bind to: 7
core CoreId { id: 4 }, bind to: 7
core CoreId { id: 5 }, bind to: 7
core CoreId { id: 6 }, bind to: 7
core CoreId { id: 7 }, bind to: 7
>>> core CoreId { id: 7 }, bind to: Thread { id: ThreadId(1), name: Some("main"), .. }

core CoreId { id: 0 }, bind to: 5
core CoreId { id: 1 }, bind to: 5
core CoreId { id: 2 }, bind to: 5
core CoreId { id: 3 }, bind to: 5
core CoreId { id: 4 }, bind to: 5
core CoreId { id: 5 }, bind to: 5
>>> core CoreId { id: 5 }, bind to: Thread { id: ThreadId(2), name: Some("A"), .. }

core CoreId { id: 7 }, bind to: 6
>>> core CoreId { id: 7 }, bind to: Thread { id: ThreadId(3), name: Some("worker"), .. }

可以看到worker进程获取到的核数不正常。worker线程内部略微复杂,里面涉及到数据处理,tokio异步调用的上报等。可是我把worker线程换为简单的替换线程,却没有问题了。代码如下

fn main() {
    let cfg = config::cfg::get_config();
    let filename = cfg.las.as_ref().unwrap().access_log.as_ref().unwrap();
    let mut watcher = LogWatcher::register(filename.to_string()).unwrap();

    thread::Builder::new()
        .name("A".into())
        .spawn(|| {
            config::affinity::bind_core_follow_config(2);
            loop {
                sleep(Duration::from_secs(1));
            }
        })
        .unwrap();

    config::affinity::bind_core_follow_config(0);
    sleep(Duration::from_secs(2));

    // let poly: Poly = Poly::new(); // worker

    thread::Builder::new()
        .name("B worker".into())
        .spawn(|| {
            config::affinity::bind_core_follow_config(1);
            loop {
                sleep(Duration::from_secs(1));
            }
        })
        .unwrap();

    watcher.watch(&mut |line: String| {
        // poly.clone().push(line);
        LogWatcherAction::None
    })
}

其中B worker是替换worker的线程,结果如下

core CoreId { id: 0 }, bind to: 7
core CoreId { id: 1 }, bind to: 7
core CoreId { id: 2 }, bind to: 7
core CoreId { id: 3 }, bind to: 7
core CoreId { id: 4 }, bind to: 7
core CoreId { id: 5 }, bind to: 7
core CoreId { id: 6 }, bind to: 7
core CoreId { id: 7 }, bind to: 7
>>> core CoreId { id: 7 }, bind to: Thread { id: ThreadId(1), name: Some("main"), .. }

core CoreId { id: 0 }, bind to: 5
core CoreId { id: 1 }, bind to: 5
core CoreId { id: 2 }, bind to: 5
core CoreId { id: 3 }, bind to: 5
core CoreId { id: 4 }, bind to: 5
core CoreId { id: 5 }, bind to: 5
>>> core CoreId { id: 5 }, bind to: Thread { id: ThreadId(2), name: Some("A"), .. }

core CoreId { id: 0 }, bind to: 6
core CoreId { id: 1 }, bind to: 6
core CoreId { id: 2 }, bind to: 6
core CoreId { id: 3 }, bind to: 6
core CoreId { id: 4 }, bind to: 6
core CoreId { id: 5 }, bind to: 6
core CoreId { id: 6 }, bind to: 6
>>> core CoreId { id: 6 }, bind to: Thread { id: ThreadId(3), name: Some("B worker"), .. }

结论,core_affinity::get_core_ids获取到的核心信息具有不确定性。查阅作者仓库发现了一个issue与这个问题相关,提issue者同样反馈了这个问题,并质疑get_core_ids获取到的不是所有核,而是对于当前线程的可用核。