网络编程的痛点
reactor是基于异步事件的网络模型,但io操作都是同步的:非阻塞io,io多路复用,拷贝数据时还是需要等待。
同步和异步的区别是结果是否在发起调用处返回。阻塞与非阻塞的区别是io没有就绪的情况下是否立刻返回。
reactor的事件处理是异步的,回调太多会影响业务逻辑的编写,一个业务逻辑在不同的回调中完成,需要保存和传递上下文。需要通过协程捏合回调。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31void on_login(struct bufferevent *bev, struct Arg *args) {
if (args->eles < 2) {
reply_error(bev, "on_login 登陆参数个数不对\n");
return;
}
Arg *params = args->next;
const char *name = params->str;
size_t name_len = params->len;
params = params->next;
const char *password = params->str;
size_t password_len = params->len;
replyContext *rc = (replyContext *)malloc(sizeof(replyContext));
rc->bev = bev;
rc->args = args;
redisAsyncCommand(redis_conn, redis_login_hget, rc, "hgetall roll: %s", name); // redis_login_hget为回调函数,需要传递上下文
}
// redisAsyncCommand实现思路
// 1. 准备socket
// 2. connect
// 3. 准备好协议
// 4. send
// 5. fd、callback加入epoll,使用yield,跑到下方的2.read部分
// callback处理,这是另外一个线程(协程要想办法改为同一个线程)
// 1. epoll_wait
// 2. read fd
// 3. 解析对应的数据,使用resume,接着跑上面的第五步
1 | function CMD.login(fd, name, password) |
协程
参考:libgo/libco/ntyco
轻量级线程/用户态线程,可以在用户层自主调度使用协层,不满足条件让协程休眠,满足条件唤醒协程
,比如read到’\n’才唤醒。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17-- yield/resume
local co = coroutine.create(function (arg1) -- 与条件变量不同的是,可以传递参数
-- 创建的协程
local ret1 = arg1 + 1
-- 是否满足条件
local arg2 = coroutine.yield(ret1) -- 2. 子协程让出
local ret2 = arg2 + 2
return ret2
end)
local co1 = coroutine.running()
local arg1 = 1
local ok, ret1, ret2
ok, ret1 = coroutine.resume(co, arg1) -- 1. 主协程 让出 唤醒 co
print(co1, ok, ret1)
ok, ret2 = coroutine.resume(co, ret1) -- 3. 主协程 让出 唤醒 co
print(co1, ok, ret2)
reactor怎么处理连接建立的流程?
- connect返回-1,errno为EINPROGRESS,告诉我们连接建立中
- 注册写事件
- io多路复用检测这个写事件
- 写事件触发,这个时候说明连接建立成功,这里是异步的,协程可以fd = socket.connect(ip, port)就行了
reactor怎么处理数据到达问题?
- 连接建立成功后
- 注册这条连接的读事件
- io多路复用会去检测读事件触发(读缓冲区的数据变动)
- 读事件触发,这个时候会回调,接着从读缓冲区读数据
- 当满足一个完整的数据包,解析数据包,开始业务逻辑
一个完整的数据包可能有多次读事件触发才满足,可能涉及多次回调!通过协程read可以直接读取一个完整的数据包(特殊字符串分割、特定字节长度),协程才开始运行处理业务逻辑,消除异步事件的回调。
协程怎么实现的?
同步:检测io和读写io在同一个流程里面,理解为同一个线程里面。
异步:检测io和读写io不在同一个流程里面,理解为多线程工作队列。
怎么做到同步的编程的方式异步的性能?采用协程,让io等待的时间去做已经准备好的事件。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20// 读写io放在工作队列中处理
void worker_cb(void *arg)
{
recv(buffer);
parser();
send();
}
while (1) {
int nready = epoll_wait(); // 检测io
for (int i = 0; i < nready; i++) {
push_to_workerqueue(worker_cb); // 异步处理
recv(buffer); // 同步处理:逻辑清晰,效率不高
parser();
send();
}
}
如何实现yield/resume?
- longjmp/setjmp
- ucontext
- 汇编自己实现(ntyco)
协程实现:
yield = switch(a, b),resume = switch(b, a),将寄存器里面的值保存到结构体里面
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53typedef struct _nty_cpu_ctx {
void *esp;
void *ebp;
void *eip;
void *edi;
void *esi;
void *ebx;
void *r1;
void *r2;
void *r3;
void *r4;
void *r5;
} nty_cpu_ctx;
int _switch(nty_cpu_ctx *new_ctx, nty_cpu_ctx *cur_ctx);
__asm__ (
" .text \n"
" .p2align 2,,3 \n"
".globl _switch \n"
"_switch: \n"
"__switch: \n"
"movl 8(%esp), %edx # fs->%edx \n"
"movl %esp, 0(%edx) # save esp \n"
"movl %ebp, 4(%edx) # save ebp \n"
"movl (%esp), %eax # save eip \n"
"movl %eax, 8(%edx) \n"
"movl %ebx, 12(%edx) # save ebx,esi,edi \n"
"movl %esi, 16(%edx) \n"
"movl %edi, 20(%edx) \n"
"movl 4(%esp), %edx # ts->%edx \n" // 交换
"movl 20(%edx), %edi # restore ebx,esi,edi \n"
"movl 16(%edx), %esi \n"
"movl 12(%edx), %ebx \n"
"movl 0(%edx), %esp # restore esp \n"
"movl 4(%edx), %ebp # restore ebp \n"
"movl 8(%edx), %eax # restore eip \n"
"movl %eax, (%esp) \n"
"ret \n"
);
void nty_coroutine_yield(nty_coroutine *co) {
co->ops = 0;
_switch(&co->sched->ctx, &co->ctx); // 切换到调度器
}
int nty_coroutine_resume(nty_coroutine *co) { // 首先是从调度器运行的resume
if (co->status & BIT(NTY_COROUTINE_STATUS_NEW)) {
nty_coroutine_init(co); // 从ready中取出新创建的协程,先要初始化
}
nty_schedule *sched = nty_coroutine_get_sched();
sched->curr_thread = co;
_switch(&co->ctx, &co->sched->ctx); // 实际运行协程
nty_coroutine_madvise(co);
sched->curr_thread = NULL;
return 0;
}协程的入口函数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61static void _exec(void *lt) {
nty_coroutine *co = (nty_coroutine*)lt;
co->func(co->arg); // 运行协程指定的函数
co->status |= (BIT(NTY_COROUTINE_STATUS_EXITED) | BIT(NTY_COROUTINE_STATUS_FDEOF) | BIT(NTY_COROUTINE_STATUS_DETACH)); // 运行完成
nty_coroutine_yield(co);
}
static void nty_coroutine_init(nty_coroutine *co)
{
void **stack = (void **)(co->stack + co->stack_size);
stack[-3] = NULL;
stack[-2] = (void *)co; // 参数压栈
co->ctx.esp = (void *)stack - (4 * sizeof(void *)); // esp指向stack
co->ctx.ebp = (void *)stack - (3 * sizeof(void *));
co->ctx.eip = (void *)_exec; // eip指向func
co->status = BIT(NTY_COROUTINE_STATUS_READY);
}
int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg) {
nty_schedule *sched = nty_coroutine_get_sched();
if (sched == NULL) {
nty_schedule_create(0);
sched = nty_coroutine_get_sched();
if (sched == NULL) {
printf("Failed to create scheduler\n");
return -1;
}
}
nty_coroutine *co = calloc(1, sizeof(nty_coroutine));
if (co == NULL) {
printf("Failed to allocate memory for new coroutine\n");
return -2;
}
int ret = posix_memalign(&co->stack, getpagesize(), sched->stack_size);
if (ret) {
printf("Failed to allocate stack for new coroutine\n");
free(co);
return -3;
}
co->stack_size = sched->stack_size;
co->sched = sched;
co->status = BIT(NTY_COROUTINE_STATUS_NEW); // 协程初始状态
co->id = sched->spawned_coroutines ++;
co->func = func; // 指定协程需要运行的函数
co->fd = -1;
co->events = 0;
co->arg = arg;
co->birth = nty_coroutine_usec_now();
*new_co = co;
TAILQ_INSERT_TAIL(&co->sched->ready, co, ready_next); // 放入ready中,此时还没有跑协程
return 0;
}
// 主流程
nty_coroutine *co = NULL;
nty_coroutine_create(&co, func, NULL); // 创建协程,func里面可以用同步思维来写
nty_schedule_run(); // 调度器运行协程定义需要哪些:
context, stack, size, func, arg, [wait, sleep, ready]rb_tree, status调度器如何实现?
yield之前先加入到调度器的epoll里面,yield后,由调度器去检测事件的到来再resume,yield是让给调度器,调度器不处理业务代码。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31// 调度器,取出准备就绪的协程进行resume
while (1) {
// 1. expired --> sleep rbtree
while ((expired = nty_schedule_expired(sched)) != NULL) {
nty_coroutine_resume(expired); // 到期了的协程取出运行
}
// 2. ready queue // 主要是NTY_COROUTINE_STATUS_NEW的协程
nty_coroutine *co = TAILQ_FIRST(&sched->ready); // 从ready中取出协程运行
nty_coroutine_resume(co);
// 3. wait rbtree,主要都集中在这里
nty_schedule_epoll(sched); // 阻塞等待就绪
nty_coroutine *co = nty_schedule_search_wait(fd); // 根据就绪的fd取出对应的协程
nty_coroutine_resume(co); // 恢复协程运行
}
// 协程
struct epoll_event ev;
ev.events = nty_pollevent_2epoll(fds[i].events);
ev.data.fd = fds[i].fd;
epoll_ctl(sched->poller_fd, EPOLL_CTL_ADD, fds[i].fd, &ev); // 加入epoll
co->events = fds[i].events;
nty_schedule_sched_wait(co, fds[i].fd, fds[i].events, timeout); // 放入wait队列
nty_coroutine_yield(co); // 让出
struct epoll_event ev;
ev.events = nty_pollevent_2epoll(fds[i].events);
ev.data.fd = fds[i].fd;
epoll_ctl(sched->poller_fd, EPOLL_CTL_DEL, fds[i].fd, &ev); // 准备就绪了,再移出epoll
nty_schedule_desched_wait(fds[i].fd); // 移出wait队列协程的接口封装
- nty_coroutine_create
- nty_schedule_run
- read、write、accept、connect,如read可以做成fd先加入epoll,yield,再read
- hook技术实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
typedef ssize_t(*send_t)(int sockfd, const void *buf, size_t len, int flags);
extern send_t send_f;
ssize_t send(int fd, const void *buf, size_t len, int flags) {
printf("my send\n");
return send_f(fd, buf, len, flags);
}
static int init_hook() {
send_f = (send_t)dlsym(RTLD_NEXT, "send");
}
int main() {
init_hook();
send(...);
return 0;
}
协程的多核
- cpu亲和性
- 多线程里面需要对调度器加锁
- 无栈的协程意味着说共享栈,栈的管理麻烦,不推荐使用,栈可以设到1K-4K甚至1M-10M,参考NTY_CO_MAX_STACKSIZE