参考资料
https://datatracker.ietf.org/doc/html/rfc7540
https://www.nghttp2.org/documentation/
h2的优势:
- 性能更强:HTTP/2的多路复用技术,不仅可以减少连接次数,同时使用非阻塞I/O技术可以在一次握手过程中提高传输性能。
- 更强大的头部压缩:HTTP/2使用HPACK技术对头部信息进行压缩,可以有效减少传输的头部信息大小,从而节省带宽消耗。
- 服务端推送:服务器可提前将客户端可能需要的资源进行推送,从而可以减少客户端的重复请求,提升页面访问速度。
- 请求优先级:HTTP/2支持浏览器控制请求和响应的优先级,从而可以减少资源的加载时间。
- 更安全:HTTP/2默认使用HTTPS,可以有效防止中间人攻击,保障用户信息传输的安全性。
nghttp2 demo使用方法
demo是使用的libevent框架进行网络收发的,不需要关注具体的网络细节处理,只需要处理好相对应的回调函数即可。
nghttp2 demo编译
Makefile文件1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16./configure
make -j5 && make install
# https://www.nghttp2.org/documentation/tutorial-server.html
# https://www.nghttp2.org/documentation/tutorial-client.html
# 目录结构如下
$ tree
.
├── client.c
├── Makefile #见上方附件
├── server.c
├── url_parser.c
└── url_parser.h
# 编译server、client
$ make
1 | #测试方法 |
nghttp2 http2Server实现方案(参考)
http2具体架构
http2具体实现
nghttp2使用框架
nghttp2的api参考https://www.nghttp2.org/documentation/apiref.html, 所有api使用是单线程环境,即同一个session下的流调用api时需要保证是串行的,多线程使用时需要加锁,否则会出现一些不可预料的问题。
官方提供的httpserver例程是一个session放在一个线程里面执行,这样可以避免加锁的问题。
node由于stream会被httpObj接管,而httpObj是在多线程环境下使用的,所以调用nghttp2的api时需要加锁,同一个session下的stream共用sessionLock。
h2的多个stream收发数据其实还是共用一个fd,这个fd统一由session管理,也就是多个stream是同一个tcp连接,只是说其中一个stream的数据如果阻塞了,不影响别的stream数据的收发。
如果同一个session下的stream都是在同一个线程中进行处理,即类似于官方提供的httpserver例程,那么实际数据收发的fd都交由该线程的epoll管理,有收发事件时触发对应的回调函数即可,这种逻辑比较容易处理。
但是如果同一个session下的stream都是在不同的线程中进行处理,但我们实际收发数据只有一个fd,虽然fd可以被多个线程同时使用,所以收发的数据变成了一个临界资源,处理临界资源时需要加sessionLock才行。
而且实际nghttp2_session_mem_send序列化数据时是序列化整个session下的所有stream,不是单单对应那个线程的stream,序列化完后再通过session下的fd进行发送,所以可能当前线程的stream的数据会被其他线程发送完,此时当前线程只需要检测是否发完,或者等待其他获取sessionLock的线程发完释放锁后尝试获取sessionLock再进行发送,相比官方提供的那种单线程reactor模型,这种多线程模型处理起来要复杂一些。
node为了结合之前的网络框架,使用的是这种多线程模型。
node里面使用nghttp2的难点在于数据的发送,因为接收只是一些http请求而已,不会特别大,哪怕一次没有接收完,那先放入缓存,下次再进行接收即可,而发送数据是大文件数据,一次性基本是发不完的,可能sendbuf满需要触发下次的epollout继续重发等等,另外文件是分批次发送的,发完当前批次再发送下一批次,所以没有办法直接调用官方推荐的nghttp2_submit_response进行处理,目前是采用的nghttp2_submit_headers+nghttp2_submit_data的方式。
数据(header和body)都是需要先经过submit,然后通过nghttp2_session_mem_send序列化后发送出去,如果发送过程中sendbuf满了,那下次重发时不需要进行submit了,接着之前发送的序列化后的数据继续发即可。
H2建连
设置ssl握手回调函数,是否要走h2协议握手阶段进行的,所以这里可以加是否开启h2的开关。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42static unsigned char next_proto_list[256];
static size_t next_proto_list_len;
int NextProtoCb(SSL *ssl, const unsigned char **data,
unsigned int *len, void *arg)
{
(void)ssl;
(void)arg;
*data = next_proto_list;
*len = (unsigned int)next_proto_list_len;
return SSL_TLSEXT_ERR_OK;
}
int AlpnSelectProtoCb(SSL *ssl, const unsigned char **out,
unsigned char *outlen, const unsigned char *in,
unsigned int inlen, void *arg)
{
int rv;
(void)ssl;
(void)arg;
rv = nghttp2_select_next_protocol((unsigned char **)out, outlen, in, inlen);
if (rv != 1)
{
return SSL_TLSEXT_ERR_NOACK;
}
return SSL_TLSEXT_ERR_OK;
}
SSL_CTX_set_next_protos_advertised_cb(ctx, NextProtoCb, NULL);
SSL_CTX_set_alpn_select_cb(ctx, AlpnSelectProtoCb, NULL);
h2c升级:接收的数据里面有HTTP2-Settings的话,先回复响应头,再nghttp2_submit_settings,最后解析HTTP2-Settings进行nghttp2_session_upgrade2,就有id为1的流了。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132string::iterator decode(string &settings, string::iterator d_first)
{
static int INDEX_TABLE[] = {
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, 62, -1, -1, -1, 63, 52, 53, 54, 55, 56, 57,
58, 59, 60, 61, -1, -1, -1, -1, -1, -1, -1, 0, 1, 2, 3, 4, 5, 6,
7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24,
25, -1, -1, -1, -1, -1, -1, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36,
37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1};
assert(settings.size() % 4 == 0);
string::iterator p = d_first;
string::iterator last = settings.end();
string::iterator first = settings.begin();
for (; first != last;)
{
uint32_t n = 0;
for (int i = 1; i <= 4; ++i, ++first)
{
int idx = INDEX_TABLE[(size_t)(*first)];
if (idx == -1)
{
if (i <= 2)
{
return d_first;
}
if (i == 3)
{
if (*first == '=' && *(first + 1) == '=' && first + 2 == last)
{
*p++ = n >> 16;
return p;
}
return d_first;
}
if (*first == '=' && first + 1 == last)
{
*p++ = n >> 16;
*p++ = n >> 8 & 0xffu;
return p;
}
return d_first;
}
n += idx << (24 - i * 6);
}
*p++ = n >> 16;
*p++ = n >> 8 & 0xffu;
*p++ = n & 0xffu;
}
return p;
}
std::string Http2SettingsDecode(string &settings)
{
size_t len = settings.size();
if (len % 4 != 0)
{
return "";
}
std::string res;
res.resize(len / 4 * 3);
res.erase(decode(settings, res.begin()), res.end());
return res;
}
char *http2Settings = strstr((char *)buf, "HTTP2-Settings:");
if (http2Settings) {
// 设置回调
nghttp2_session_callbacks *callbacks;
nghttp2_session_callbacks_new(&callbacks);
nghttp2_session_callbacks_set_send_callback(callbacks, SendCallback);
nghttp2_session_callbacks_set_on_frame_send_callback(callbacks, OnFrameSendCallback);
nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, OnFrameRecvCallback);
nghttp2_session_callbacks_set_on_stream_close_callback(callbacks, OnStreamCloseCallback);
nghttp2_session_callbacks_set_on_header_callback(callbacks, OnHeaderCallback);
nghttp2_session_callbacks_set_on_begin_headers_callback(callbacks, OnBeginHeadersCallback);
nghttp2_session_callbacks_set_send_data_callback(callbacks, SendDataCallback);
nghttp2_option *option;
nghttp2_option_new(&option);
nghttp2_session_server_new2(&session, callbacks, sessionData, option);
nghttp2_option_del(option);
nghttp2_session_callbacks_del(callbacks);
string upgradeResponse = "HTTP/1.1 101 Switching Protocols\r\n"
"Connection: Upgrade\r\n"
"Upgrade: " NGHTTP2_CLEARTEXT_PROTO_VERSION_ID "\r\n"
"\r\n";
int upgradeRet = -1;
while ((upgradeRet = send(sock, upgradeResponse.c_str(), upgradeResponse.size(), 0)) == -1 && errno == EINTR);
if (upgradeRet < 0)
{
return -1;
}
nghttp2_settings_entry iv[3] = {
{NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 100},
{NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, 1048576},
{NGHTTP2_SETTINGS_HEADER_TABLE_SIZE, 8192}};
int rv = nghttp2_submit_settings(session, NGHTTP2_FLAG_NONE, iv, ARRLEN(iv));
if (rv != 0)
{
return -1;
}
int rv = nghttp2_session_send(session);
if (rv != 0)
{
return -1;
}
std::string settings = string(http2Settings + sizeof("HTTP2-Settings:"));
string settings_payload = Http2SettingsDecode(settings);
rv = -1;
if ((rv = nghttp2_session_upgrade2(session, (uint8_t *)settings_payload.c_str(), settings_payload.size(), 0, NULL)) != 0)
{
return -1;
}
nghttp2_stream *stream = nghttp2_session_find_stream(session, 1); // 此时有流id为1的流了
assert(stream != NULL);
}
如果是h2协议:设置回调后nghttp2_submit_settings,并nghttp2_session_send即可。
数据发送方法
使用nghttp2_submit_data进行数据提交,提交需要nghttp2_data_provider类型的数据,包括nghttp2_data_source,这是一个union对象,一个是fd、一个是ptr,一般提供ptr地址即可,另外还需要nghttp2_data_source_read_callback回调方法,也就是说告诉了地址还要告诉它每次读多少数据,是否需要设置标志,如流结束NGHTTP2_FLAG_END_STREAM,怎么从这个地址去读数据参考nghttp2_session_callbacks_set_send_data_callback(callbacks, SendDataCallback),提交是先提交到队列里面,调用nghttp2_session_mem_send时,从队列里面取数据才真正开始读数据,读出来的数据通过拷贝的方式先放sessionData->writeBuffer里面,这个过程是需要拷贝的,如果这个buffer满了就触发NGHTTP2_ERR_WOULDBLOCK, writeBuffer里面记录了这个session当前发送的位置pos,以及当前缓冲区最后的位置last,这个缓冲区相当于一个消息队列,当io检测到连接的发送缓冲区sendbuf可以发送数据时,就会从这个writeBuffer缓冲区取数据,取完多少数据pos就要加多少,数据取完后的标志为pos==last,此时要把pos和last都置为0,重新开始,这里放数据和取数据的过程session都是加锁的,因为这个writeBuffer相当于这个session的临界资源了,取到的数据通过SSL_write发送即可。如果发送数据时设置的不是NGHTTP2_DATA_FLAG_NO_COPY,那么nghttp2_session_mem_send接口还会返回数据,这些数据是序列化后的数据,这些数据也是要放入sessionData->writeBuffer里面等待网络发送的。如果序列化后的数据放在buffer中放不下(writeBuffer是有大小限制的),需要先pending起来,下次buffer里面的数据发完了再优先从pending里面取。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71// 提交数据header
uint8_t flag = NGHTTP2_FLAG_NONE;
canSendSize = hdrs->size();
if (IsNobodyReqest())
{
flag = NGHTTP2_FLAG_END_STREAM; // 无数据体的,需要发送endstream
}
int error = nghttp2_submit_headers(session,
flag, streamId, NULL,
hdrs->data(), hdrs->size(), NULL);
// 提交数据body
uint8_t flag = NGHTTP2_FLAG_NONE;
if (httpObj->fileBeginPos + canSendSize >= httpObj->fileEndPos)
{
flag = NGHTTP2_FLAG_END_STREAM; // 设置nghttp2_frame_hd,发完了,结束stream,进入半关闭或者关闭状态
}
data_prd.source.ptr = dataBuf;
data_prd.read_callback = FileReadCallback;
nghttp2_submit_data(session, flag, streamId, &data_prd);
// 发送数据
ssize_t datalen = nghttp2_session_mem_send(session, &data); // 调用send_data_callback
// FileReadCallback
int64_t nread = std::min(static_cast<int64_t>(streamData->dataLength - streamData->dataOffset),
static_cast<int64_t>(length));
*dataFlags |= NGHTTP2_DATA_FLAG_NO_COPY; // 提示后面调用send_data_callback即SendDataCallback
if (nread == 0 || streamData->bodyLength == streamData->bodyOffset + nread)
{
*dataFlags |= NGHTTP2_DATA_FLAG_EOF; // nghttp2会设置nghttp2_frame_hd的NGHTTP2_FLAG_END_STREAM,nghttp2_stream_shutdown(stream, NGHTTP2_SHUT_WR);主动关闭写端
}
// SendDataCallback,nghttp2_session_mem_send后调用,用于准备序列化的数据
if (writeBuffer.wleft() < 9 + length + padlen)
{
return NGHTTP2_ERR_WOULDBLOCK;
}
char *ptr = (char *)source->ptr;
uint8_t *p = writeBuffer.last;
p = ngCopyN(framehd, 9, p); // 固定9各字节
if (padlen) *p++ = padlen - 1;
memcpy(p, ptr + streamData->dataOffset, length);
streamData->bodyOffset += length;
streamData->dataOffset += length;
p += length;
if (padlen)
{
for (; p != p + padlen - 1; ++p)
{
*p = 0;
}
}
writeBuffer.last = p;
// OnFrameSendCallback
// 序列化完成后调用,之后就SSL_write了
case NGHTTP2_DATA:
streamData->dataSent += frame->data.hd.length;
if (DATA_PREPARED_FINISHED == streamData->status)
{
streamData->status = DATA_SEND_EOF; // 一次submit完全发完,允许下次nghttp2_submit_data
}
if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM)
{
if (nghttp2_session_get_stream_remote_close(session, frame->hd.stream_id) == 0) // 如果对端没有半关闭,主动关闭该流
{
nghttp2_submit_rst_stream(session, NGHTTP2_FLAG_NONE, frame->hd.stream_id, NGHTTP2_NO_ERROR);
}
}
Http2StreamSignal(streamData);
case NGHTTP2_HEADERS:
streamData->dataSent = frame->headers.nvlen;
streamData->status = DATA_SEND_EOF; // header走到这里肯定是发完了,允许下次nghttp2_submit_data
所有发送帧的格式如下,都是固定9个字节开头接数据,9字节里面包括的长度、类型、标志和流id。nghttp2里每一个数据帧都包含nghttp2_frame_hd.
帧有不同的类型,数据帧格式,可以设置标志END_STREAM (0x1)、PADDED (0x8).1
2
3
4
5
6
7
8typedef struct {
nghttp2_frame_hd hd;
/**
* The length of the padding in this frame. This includes PAD_HIGH
* and PAD_LOW.
*/
size_t padlen;
} nghttp2_data;
头帧:里面含有流权重优先级,头部请求头等数据,可以设置END_STREAM (0x1)、END_HEADERS (0x4)、PADDED (0x8)、PRIORITY (0x20)标志.1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27typedef struct {
/**
* The frame header.
*/
nghttp2_frame_hd hd;
/**
* The length of the padding in this frame. This includes PAD_HIGH
* and PAD_LOW.
*/
size_t padlen;
/**
* The priority specification
*/
nghttp2_priority_spec pri_spec;
/**
* The name/value pairs.
*/
nghttp2_nv *nva;
/**
* The number of name/value pairs in |nva|.
*/
size_t nvlen;
/**
* The category of this HEADERS frame.
*/
nghttp2_headers_category cat;
} nghttp2_headers;
优先级帧:可以在流的任何阶段使用,但不包含任何标志,默认是16,8位无符号整型,所以范围1 ~ 256,值越大分配的资源越多,优先级越高。1
2
3
4
5
6
7
8
9
10
11// It can be sent in any stream state, including idle or closed streams.
typedef struct {
/**
* The frame header.
*/
nghttp2_frame_hd hd;
/**
* The priority specification.
*/
nghttp2_priority_spec pri_spec;
} nghttp2_priority;
RST_STREAM帧: 32位的整数,不包含任何标志.1
2
3
4
5
6
7
8
9
10typedef struct {
/**
* The frame header.
*/
nghttp2_frame_hd hd;
/**
* The error code. See :type:`nghttp2_error_code`.
*/
uint32_t error_code;
} nghttp2_rst_stream;
当前流的数据被其它流的线程再发送时,当前流这个线程就要不断检测是否已经替它发完了,但是忙查询的话就费cpu,于是可以通过条件变量的方式来进行通知,没发完的时候阻塞等待。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35void Http2StreamSignal(SHttp2StreamData *streamData)
{
int res = 0;
pthread_mutex_lock(&streamData->mutex);
streamData->dataSync = true;
res = pthread_cond_signal(&streamData->condition);
pthread_mutex_unlock(&streamData->mutex);
}
void Http2StreamTimeWait(stream *streamData, int milliseconds)
{
int res = 0;
int mutexRes = 0;
struct timespec ts = {0};
struct timeval now;
gettimeofday(&now, NULL);
int64_t nsec = (int64_t)now.tv_usec * 1000 + (int64_t)milliseconds * 1000000;
ts.tv_sec = now.tv_sec + nsec / 1000000000;
ts.tv_nsec = nsec % 1000000000;
clock_gettime(CLOCK_MONOTONIC, &ts);
int64_t nsec = ((int64_t)ts.tv_sec) * 1000000000 + ((int64_t)ts.tv_nsec) + ((int64_t)milliseconds) * 1000000;
ts.tv_sec = nsec / 1000000000;
ts.tv_nsec = nsec % 1000000000;
mutexRes = pthread_mutex_lock(&streamData->mutex);
if (!streamData->dataSync) // 如果已经同步了,就不要等待了
{
res = pthread_cond_timedwait(&streamData->condition, &streamData->mutex, &ts);
}
streamData->dataSync = false; // 这个变量也是临界资源
mutexRes = pthread_mutex_unlock(&streamData->mutex);
}
数据接收
收到数据后调用nghttp2_session_mem_recv进行处理,具体的可以处理的逻辑都在回调函数里面。1
2
3
4
5
6
7
8
9
10
11
12// OnBeginHeadersCallback,创建流对象
streamData = CreateHttp2StreamData(sessionData, frame->hd.stream_id);
nghttp2_session_set_stream_user_data(session, frame->hd.stream_id, streamData);
// OnHeaderCallback,解析头部数据
streamData->headers.push_back(SHeader(std::string((const char *)name, namelen), std::string((const char *)value, valuelen)));
// OnFrameRecvCallback,请求完需要处理数据
case NGHTTP2_DATA:
case NGHTTP2_HEADERS:
if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM)
{
return OnRequestRecv(session, sessionData, streamData);
}
数据重发逻辑
客户端未及时接收导致sendbuf满、服务端限速需要间隔发送、stream的流控等都需要进行数据重发。
重发逻辑是一样的
sendbuf满后,需要将当前的stream保存下来,等待epollout时再重新触发数据发送
1 | // 发完的暂不做统计,触发epollout后接着发 |
服务端限速,需要将当前stream保存下来,并记录下次发送的时间,由定时事件触发下次发送
1 | if (limitSpeed > 0) |
stream的流控时,nghttp2_session_get_stream_remote_window_size/nghttp2_session_get_remote_window_size小于等于0,暂停当前发送,当epollin事件有窗口更新时即窗口大于0时,再继续发送
1 | // 更新remote_window_size后,对session下flowControlStreams的所有stream进行重发 |
主连接与流连接
主连接需要在所有流连接释放后才会得到释放,在新建流连接对象时,会接管父连接,使父连接的引用计数加一,这样只要该父连接下还有一个流连接没有释放,父连接对象的析构函数就不会触发。
共享指针原理:这里的每一个连接对象都是一个引用计数对象,都继承自这个引用计数类,该类有一个计数变量counter,初始化为1,也就是只有自己本身使用,如果有另外的变量依赖这个变量,比如引用了这个变量的指针,就需要把这个counter加一,依赖的变量不再依赖后,这个counter再减一,这里加一减一都需要原子操作sync_add_and_fetch/sync_sub_and_fetch,最后这个变量释放时counter减一后为0了,就能真正释放这个变量了。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22class RefCountedObject
{
public:
RefCountedObject();
void duplicate();
void release();
long _counter;
}
// 通过RAII技术来管理
AutoPtr& assign(const AutoPtr& ptr)
{
if (&ptr != this)
{
if (_ptr) _ptr->release();
_ptr = ptr._ptr;
if (_ptr) _ptr->duplicate();
}
return *this;
}