协程解决了网络编程的哪些痛点

网络编程的痛点

reactor是基于异步事件的网络模型,但io操作都是同步的:非阻塞io,io多路复用,拷贝数据时还是需要等待。
同步和异步的区别是结果是否在发起调用处返回。阻塞与非阻塞的区别是io没有就绪的情况下是否立刻返回。
reactor的事件处理是异步的,回调太多会影响业务逻辑的编写,一个业务逻辑在不同的回调中完成,需要保存和传递上下文。需要通过协程捏合回调。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
void on_login(struct bufferevent *bev, struct Arg *args) {
if (args->eles < 2) {
reply_error(bev, "on_login 登陆参数个数不对\n");
return;
}
Arg *params = args->next;
const char *name = params->str;
size_t name_len = params->len;

params = params->next;
const char *password = params->str;
size_t password_len = params->len;

replyContext *rc = (replyContext *)malloc(sizeof(replyContext));
rc->bev = bev;
rc->args = args;

redisAsyncCommand(redis_conn, redis_login_hget, rc, "hgetall roll: %s", name); // redis_login_hget为回调函数,需要传递上下文
}

// redisAsyncCommand实现思路
// 1. 准备socket
// 2. connect
// 3. 准备好协议
// 4. send
// 5. fd、callback加入epoll,使用yield,跑到下方的2.read部分

// callback处理,这是另外一个线程(协程要想办法改为同一个线程)
// 1. epoll_wait
// 2. read fd
// 3. 解析对应的数据,使用resume,接着跑上面的第五步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
function CMD.login(fd, name, password)
if not name or not password then
socket.write(fd, "没有设置账号或密码\n")
return
end
if clients[fd] then
socket.write(fd, ("%s 用户已登陆\n"):format(name))
return
end
local ok, err = rds:exists("role:"..name)
if ok == 0 then
rds:hmset("role:"..name,
"name", name,
"password", password) -- 同步处理
local cli = {}
cli.fd = fd
cli.name = name
cli.password = password
clients[fd] = cli
socket.write(fd, ("%s 注册成功\n"):format(name))
return
end
end

协程

参考:libgo/libco/ntyco
轻量级线程/用户态线程,可以在用户层自主调度使用协层,不满足条件让协程休眠,满足条件唤醒协程,比如read到’\n’才唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
-- yield/resume
local co = coroutine.create(function (arg1) -- 与条件变量不同的是,可以传递参数
-- 创建的协程
local ret1 = arg1 + 1
-- 是否满足条件
local arg2 = coroutine.yield(ret1) -- 2. 子协程让出
local ret2 = arg2 + 2
return ret2
end)

local co1 = coroutine.running()
local arg1 = 1
local ok, ret1, ret2
ok, ret1 = coroutine.resume(co, arg1) -- 1. 主协程 让出 唤醒 co
print(co1, ok, ret1)
ok, ret2 = coroutine.resume(co, ret1) -- 3. 主协程 让出 唤醒 co
print(co1, ok, ret2)

reactor怎么处理连接建立的流程?

  1. connect返回-1,errno为EINPROGRESS,告诉我们连接建立中
  2. 注册写事件
  3. io多路复用检测这个写事件
  4. 写事件触发,这个时候说明连接建立成功,这里是异步的,协程可以fd = socket.connect(ip, port)就行了

reactor怎么处理数据到达问题?

  1. 连接建立成功后
  2. 注册这条连接的读事件
  3. io多路复用会去检测读事件触发(读缓冲区的数据变动)
  4. 读事件触发,这个时候会回调,接着从读缓冲区读数据
  5. 当满足一个完整的数据包,解析数据包,开始业务逻辑

一个完整的数据包可能有多次读事件触发才满足,可能涉及多次回调!通过协程read可以直接读取一个完整的数据包(特殊字符串分割、特定字节长度),协程才开始运行处理业务逻辑,消除异步事件的回调。

协程怎么实现的?
同步:检测io和读写io在同一个流程里面,理解为同一个线程里面。
异步:检测io和读写io不在同一个流程里面,理解为多线程工作队列。
怎么做到同步的编程的方式异步的性能?采用协程,让io等待的时间去做已经准备好的事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 读写io放在工作队列中处理
void worker_cb(void *arg)
{
recv(buffer);
parser();
send();
}

while (1) {
int nready = epoll_wait(); // 检测io
for (int i = 0; i < nready; i++) {
#if 1
push_to_workerqueue(worker_cb); // 异步处理
#else
recv(buffer); // 同步处理:逻辑清晰,效率不高
parser();
send();
#endif
}
}

如何实现yield/resume?

  1. longjmp/setjmp
  2. ucontext
  3. 汇编自己实现(ntyco)

协程实现:

  • yield = switch(a, b),resume = switch(b, a),将寄存器里面的值保存到结构体里面

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    typedef struct _nty_cpu_ctx {
    void *esp;
    void *ebp;
    void *eip;
    void *edi;
    void *esi;
    void *ebx;
    void *r1;
    void *r2;
    void *r3;
    void *r4;
    void *r5;
    } nty_cpu_ctx;
    int _switch(nty_cpu_ctx *new_ctx, nty_cpu_ctx *cur_ctx);
    __asm__ (
    " .text \n"
    " .p2align 2,,3 \n"
    ".globl _switch \n"
    "_switch: \n"
    "__switch: \n"
    "movl 8(%esp), %edx # fs->%edx \n"
    "movl %esp, 0(%edx) # save esp \n"
    "movl %ebp, 4(%edx) # save ebp \n"
    "movl (%esp), %eax # save eip \n"
    "movl %eax, 8(%edx) \n"
    "movl %ebx, 12(%edx) # save ebx,esi,edi \n"
    "movl %esi, 16(%edx) \n"
    "movl %edi, 20(%edx) \n"
    "movl 4(%esp), %edx # ts->%edx \n" // 交换
    "movl 20(%edx), %edi # restore ebx,esi,edi \n"
    "movl 16(%edx), %esi \n"
    "movl 12(%edx), %ebx \n"
    "movl 0(%edx), %esp # restore esp \n"
    "movl 4(%edx), %ebp # restore ebp \n"
    "movl 8(%edx), %eax # restore eip \n"
    "movl %eax, (%esp) \n"
    "ret \n"
    );
    void nty_coroutine_yield(nty_coroutine *co) {
    co->ops = 0;
    _switch(&co->sched->ctx, &co->ctx); // 切换到调度器
    }
    int nty_coroutine_resume(nty_coroutine *co) { // 首先是从调度器运行的resume
    if (co->status & BIT(NTY_COROUTINE_STATUS_NEW)) {
    nty_coroutine_init(co); // 从ready中取出新创建的协程,先要初始化
    }
    nty_schedule *sched = nty_coroutine_get_sched();
    sched->curr_thread = co;
    _switch(&co->ctx, &co->sched->ctx); // 实际运行协程
    nty_coroutine_madvise(co);
    sched->curr_thread = NULL;
    return 0;
    }
  • 协程的入口函数:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
       static void _exec(void *lt) {
    nty_coroutine *co = (nty_coroutine*)lt;
    co->func(co->arg); // 运行协程指定的函数
    co->status |= (BIT(NTY_COROUTINE_STATUS_EXITED) | BIT(NTY_COROUTINE_STATUS_FDEOF) | BIT(NTY_COROUTINE_STATUS_DETACH)); // 运行完成
    nty_coroutine_yield(co);
    }
    static void nty_coroutine_init(nty_coroutine *co)
    {
    void **stack = (void **)(co->stack + co->stack_size);
    stack[-3] = NULL;
    stack[-2] = (void *)co; // 参数压栈

    co->ctx.esp = (void *)stack - (4 * sizeof(void *)); // esp指向stack
    co->ctx.ebp = (void *)stack - (3 * sizeof(void *));
    co->ctx.eip = (void *)_exec; // eip指向func
    co->status = BIT(NTY_COROUTINE_STATUS_READY);
    }
    int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg) {

    nty_schedule *sched = nty_coroutine_get_sched();

    if (sched == NULL) {
    nty_schedule_create(0);

    sched = nty_coroutine_get_sched();
    if (sched == NULL) {
    printf("Failed to create scheduler\n");
    return -1;
    }
    }

    nty_coroutine *co = calloc(1, sizeof(nty_coroutine));
    if (co == NULL) {
    printf("Failed to allocate memory for new coroutine\n");
    return -2;
    }

    int ret = posix_memalign(&co->stack, getpagesize(), sched->stack_size);
    if (ret) {
    printf("Failed to allocate stack for new coroutine\n");
    free(co);
    return -3;
    }
    co->stack_size = sched->stack_size;
    co->sched = sched;
    co->status = BIT(NTY_COROUTINE_STATUS_NEW); // 协程初始状态
    co->id = sched->spawned_coroutines ++;
    co->func = func; // 指定协程需要运行的函数
    co->fd = -1;
    co->events = 0;
    co->arg = arg;
    co->birth = nty_coroutine_usec_now();
    *new_co = co;

    TAILQ_INSERT_TAIL(&co->sched->ready, co, ready_next); // 放入ready中,此时还没有跑协程
    return 0;
    }
    // 主流程
    nty_coroutine *co = NULL;
    nty_coroutine_create(&co, func, NULL); // 创建协程,func里面可以用同步思维来写
    nty_schedule_run(); // 调度器运行
  • 协程定义需要哪些:
    context, stack, size, func, arg, [wait, sleep, ready]rb_tree, status

  • 调度器如何实现?
    yield之前先加入到调度器的epoll里面,yield后,由调度器去检测事件的到来再resume,yield是让给调度器,调度器不处理业务代码。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
      // 调度器,取出准备就绪的协程进行resume
    while (1) {
    // 1. expired --> sleep rbtree
    while ((expired = nty_schedule_expired(sched)) != NULL) {
    nty_coroutine_resume(expired); // 到期了的协程取出运行
    }
    // 2. ready queue // 主要是NTY_COROUTINE_STATUS_NEW的协程
    nty_coroutine *co = TAILQ_FIRST(&sched->ready); // 从ready中取出协程运行
    nty_coroutine_resume(co);
    // 3. wait rbtree,主要都集中在这里
    nty_schedule_epoll(sched); // 阻塞等待就绪
    nty_coroutine *co = nty_schedule_search_wait(fd); // 根据就绪的fd取出对应的协程
    nty_coroutine_resume(co); // 恢复协程运行
    }
    // 协程
    struct epoll_event ev;
    ev.events = nty_pollevent_2epoll(fds[i].events);
    ev.data.fd = fds[i].fd;
    epoll_ctl(sched->poller_fd, EPOLL_CTL_ADD, fds[i].fd, &ev); // 加入epoll

    co->events = fds[i].events;
    nty_schedule_sched_wait(co, fds[i].fd, fds[i].events, timeout); // 放入wait队列

    nty_coroutine_yield(co); // 让出

    struct epoll_event ev;
    ev.events = nty_pollevent_2epoll(fds[i].events);
    ev.data.fd = fds[i].fd;
    epoll_ctl(sched->poller_fd, EPOLL_CTL_DEL, fds[i].fd, &ev); // 准备就绪了,再移出epoll

    nty_schedule_desched_wait(fds[i].fd); // 移出wait队列
  • 协程的接口封装

    • nty_coroutine_create
    • nty_schedule_run
    • read、write、accept、connect,如read可以做成fd先加入epoll,yield,再read
    • hook技术实现:
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      #define _GNU_SOURCE
      #include <dlfcn.h>

      typedef ssize_t(*send_t)(int sockfd, const void *buf, size_t len, int flags);
      extern send_t send_f;
      ssize_t send(int fd, const void *buf, size_t len, int flags) {
      printf("my send\n");
      return send_f(fd, buf, len, flags);
      }
      static int init_hook() {
      send_f = (send_t)dlsym(RTLD_NEXT, "send");
      }

      int main() {
      init_hook();
      send(...);
      return 0;
      }
  • 协程的多核

    • cpu亲和性
    • 多线程里面需要对调度器加锁
  • 无栈的协程意味着说共享栈,栈的管理麻烦,不推荐使用,栈可以设到1K-4K甚至1M-10M,参考NTY_CO_MAX_STACKSIZE
nephen wechat
欢迎您扫一扫上面的微信公众号,订阅我的博客!
坚持原创技术分享,您的支持将鼓励我继续创作!