Using Boost.Asio

https://www.boost.org/doc/libs/1_71_0/doc/html/boost_asio/reference.html

Key points

# https://sourceforge.net/projects/boost/
# https://www.boost.org/users/history/
# boost 1.60
./bootstrap.sh --prefix=$(pwd)/app --with-libraries=atomic,system,filesystem,chrono,thread,date_time,exception,iostreams
vim project-config.jam
# using gcc : arm64 : /root/wxz_test/sdk_package/toolchain/bin/aarch64-openwrt-linux-musl-g++ ;
./b2 cxxflags="-std=c++0x -fPIC" cflags="-fPIC" -j7
./b2 install
# or
./b2 -a --with-program_options --with-atomic --with-date_time --with-system --with-thread --with-chrono --with-filesystem --with-serialization cxxflags="-std=c++0x -fPIC" cflags="-fPIC" -j7 install

# nm libboost_system.a | c++filt | grep "boost"

# On some platforms these defines are required, otherwise linking fails with "undefined reference to boost::system::system_category()"
# If the build succeeds without them, do not add them lightly: they can make boost asio async_connect keep failing with "Operation already in progress"
-DBOOST_ERROR_CODE_HEADER_ONLY -DBOOST_SYSTEM_NO_DEPRECATED -DBOOST_CHRONO_HEADER_ONLY

# /root/wxz_test/sdk_package/toolchain/bin/aarch64-openwrt-linux-g++ boosttest.cpp -o boosttest -std=c++11 -I/root/wxz_test/boost_1_60_0/app/include -L /root/wxz_test/boost_1_60_0/app/lib/ -l:libboost_system.a -l:libboost_thread.a -lpthread

libevent is based on the Reactor pattern; Boost.Asio is based on the Proactor pattern.
io_context plays a role similar to a reactor (or IOCP on Windows). Two important namespaces are boost::asio and boost::asio::ip.
The I/O functions under boost::asio include connect, accept, read_some, and write_some; prefix a name with async_ to get the asynchronous version.
boost::asio::ip mainly provides the address class ip::address, the endpoints ip::tcp::endpoint / ip::udp::endpoint, and the sockets ip::tcp::socket / ip::udp::socket.
Socket control: set_option / get_option / io_control.

// enable address reuse on a TCP socket
ip::tcp::socket::reuse_address ra(true);
sock.set_option(ra);

Building a Boost.Asio program with CMake. The components involved here are boost::system and boost::regex: regex backs the regular-expression overloads of read_until / async_read_until, and system supplies the error codes used by calls such as async_read_some(buffer(data, length), ...).

set(BOOST_ROOT $ENV{BOOST_ROOT})
set(BOOST_INCLUDE_DIR ${BOOST_ROOT})
set(BOOST_LIBRARY_DIR ${BOOST_ROOT}/lib64-msvc-14.2)
find_package(Boost 1.82 REQUIRED system regex)
link_directories(${BOOST_LIBRARY_DIR})
add_executable(asio "asio.cpp" "asio.h")
target_link_libraries(asio PUBLIC Boost::system Boost::regex)
target_include_directories(asio PUBLIC ${BOOST_INCLUDE_DIR})

asio::async_read: a free function; its readHandler is invoked only when the requested amount (say, 512 bytes) has been read in full or an error occurs. Internally, async_read() is implemented as a series of async_read_some() calls.
socket.async_read_some: a member function; its readHandler is invoked as soon as any data arrives or an error occurs. Underneath, async_read_some and socket.async_receive perform the same receive operation.
read_until reads until a given condition is satisfied; it takes a boost::asio::streambuf rather than plain buffers.
read_at accepts either a buffer or a streambuf. The *_at functions perform random-access operations on a stream.
async_read_until(socket, streambuf, condition, handler) deserves special mention: it makes splitting a data stream into segments very convenient. It has four built-in completion conditions: a specific character appears, a specific string appears, a regular expression matches, or a predicate is satisfied. Note that "appears" means appears in the streambuf's input sequence: if the existing contents of the streambuf already satisfy the condition, async_read_until invokes the handler immediately.
receive_from / send_to: receive data from a given endpoint into a buffer / send buffered data to a given endpoint.
endpoint(protocol, port): typically used to create a server-side socket that accepts new connections.
endpoint(addr, port): creates an endpoint for connecting to a given address and port.
local_endpoint(): returns the local address the socket is bound to.
remote_endpoint(): returns the remote address the socket is connected to.

ip::tcp::socket::message_peek: this flag only peeks at a message and returns it; the next read call will read the same message again. sock.receive(buffer(buff), ip::tcp::socket::message_peek);

ip::tcp::socket s1(service), s2(service);
s1 = s2; // compile error: sockets are not copy-assignable
ip::tcp::socket s3(s1); // compile error: sockets are not copy-constructible

// blocking
service.run(); // or
while (!service.stopped()) service.run_one(); // runs at most one handler per call
// poll() runs all ready handlers without blocking.
service.poll(); // or
while (service.poll_one()) ; // runs one ready handler per call

Sometimes you want a set of asynchronous handlers to run in order. For example, you go to a restaurant (go_to_restaurant), order (order), then eat (eat): you must go first, then order, and only then eat. For this you use io_service::strand, which serializes the invocation of your asynchronous handlers.

io_service::strand strand_one(service), strand_two(service);
for (int i = 0; i < 5; ++i)
service.post(strand_one.wrap(boost::bind(func, i))); // wrap() returns a handler that automatically passes the wrapped handler to the strand's dispatch function
for (int i = 5; i < 10; ++i)
service.post(strand_two.wrap(boost::bind(func, i)));

Although the handlers are invoked sequentially, this does not mean they all run on the same thread: handlers within one strand execute in order, but different strands may execute in parallel.

func called, i= 0/002A60C8
func called, i= 5/002A6138
func called, i= 6/002A6530
func called, i= 1/002A6138
func called, i= 7/002A6530
func called, i= 2/002A6138
func called, i= 8/002A6530
func called, i= 3/002A6138
func called, i= 9/002A6530
func called, i= 4/002A6138

dispatch() may invoke the handler internally before it returns, whereas post() always returns immediately and queues the handler.

Server development workflow

#include <boost/asio.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <cstring>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <queue>

using namespace std;
using namespace boost::asio;
using boost::asio::ip::tcp;

class Server; // forward declaration; Session holds a Server*

#define HEAD_LENGTH 2
#define MAX_LENGTH 2 * 1024

class MsgNode
{
friend class Session; // Session needs access to the private members
public:
// Send-side constructor: prepends a HEAD_LENGTH-byte length header to msg
MsgNode(char * msg, short max_len):total_len_(max_len + HEAD_LENGTH), cur_len_(0){
data_ = new char[total_len_+1]();
memcpy(data_, &max_len, HEAD_LENGTH);
memcpy(data_ + HEAD_LENGTH, msg, max_len);
data_[total_len_] = '\0';
}
// Receive-side constructor: allocates an empty node to be filled in (sticky-packet handling)
MsgNode(short max_len):total_len_(max_len), cur_len_(0) {
data_ = new char[total_len_ +1]();
}
~MsgNode() {
delete[] data_;
}
void clear() {
::memset(data_, 0, total_len_);
cur_len_ = 0;
}
private:
short total_len_;
short cur_len_;
char* data_;
};

class Session : public enable_shared_from_this<Session> {
public:
Session(io_context &io_ctx, Server *server) : socket_(io_ctx), server_(server) {
boost::uuids::uuid a_uuid = boost::uuids::random_generator()();
uuid_ = boost::uuids::to_string(a_uuid);
memset(read_buffer_, 0, sizeof(read_buffer_));
}
Session(tcp::socket sock, Server *server) : socket_(std::move(sock)), server_(server) {
boost::uuids::uuid a_uuid = boost::uuids::random_generator()();
uuid_ = boost::uuids::to_string(a_uuid);
memset(read_buffer_, 0, sizeof(read_buffer_));
}
~Session() {
boost::system::error_code err;
socket_.close(err); // close with an error_code so the destructor never throws
}
void start() {
do_read();
}
tcp::socket& getsocket_() {
return socket_;
}
std::string get_session_id() {
return uuid_;
}
private:
void do_close() {
server_->clear_session(uuid_); // drops the map's reference; any callback still holding the shared_ptr keeps the object alive until it finishes
}
void do_read() {
auto self(shared_from_this()); // keep the object alive while the async operation is pending
socket_.async_read_some(buffer(read_buffer_, MAX_LENGTH), std::bind(&Session::handle_read, this, std::placeholders::_1, std::placeholders::_2, self));
}
// When testing sticky packets, make the server consume more slowly than the client sends
void handle_read(const boost::system::error_code& error, size_t bytes_transferred, std::shared_ptr<Session> self) {
if (!error) {
// number of bytes already consumed from read_buffer_
int copy_len = 0;
while (bytes_transferred > 0) {
if (!b_head_parse_) {
if (!recv_head_node_) recv_head_node_ = make_shared<MsgNode>(HEAD_LENGTH); // allocate once; a partial header may span reads
// not enough data yet for a full header
if (bytes_transferred + recv_head_node_->cur_len_ < HEAD_LENGTH) {
memcpy(recv_head_node_->data_ + recv_head_node_->cur_len_, read_buffer_+ copy_len, bytes_transferred);
recv_head_node_->cur_len_ += bytes_transferred;
::memset(read_buffer_, 0, MAX_LENGTH);
socket_.async_read_some(boost::asio::buffer(read_buffer_, MAX_LENGTH),
std::bind(&Session::handle_read, this, std::placeholders::_1, std::placeholders::_2, self));
return;
}
// more data than the header needs
// bytes of the header still to be copied
int head_remain = HEAD_LENGTH - recv_head_node_->cur_len_;
memcpy(recv_head_node_->data_ + recv_head_node_->cur_len_, read_buffer_ + copy_len, head_remain);
// advance the consumed offset and the remaining byte count
copy_len += head_remain;
bytes_transferred -= head_remain;
// extract the body length from the header
short data_len = 0;
memcpy(&data_len, recv_head_node_->data_, HEAD_LENGTH);
cout << "data_len is " << data_len << endl;
// invalid header length
if (data_len > MAX_LENGTH) {
std::cout << "invalid data length is " << data_len << endl;
do_close();
return;
}
recv_msg_node_ = make_shared<MsgNode>(data_len);
// header fully parsed
b_head_parse_ = true;
}
// header done: continue with the message body left over from the last read
// received data is still less than the remaining body
int remain_msg = recv_msg_node_->total_len_ - recv_msg_node_->cur_len_;
if (bytes_transferred < remain_msg) {
memcpy(recv_msg_node_->data_ + recv_msg_node_->cur_len_, read_buffer_ + copy_len, bytes_transferred);
recv_msg_node_->cur_len_ += bytes_transferred;
::memset(read_buffer_, 0, MAX_LENGTH);
socket_.async_read_some(boost::asio::buffer(read_buffer_, MAX_LENGTH),
std::bind(&Session::handle_read, this, std::placeholders::_1, std::placeholders::_2, self));
return;
}
// the body is complete
memcpy(recv_msg_node_->data_ + recv_msg_node_->cur_len_, read_buffer_ + copy_len, remain_msg);
recv_msg_node_->cur_len_ += remain_msg;
bytes_transferred -= remain_msg;
copy_len += remain_msg;
recv_msg_node_->data_[recv_msg_node_->total_len_] = '\0';
cout << "receive data is " << recv_msg_node_->data_ << endl;
// echo the message back as a test
do_write(recv_msg_node_->data_, recv_msg_node_->total_len_);
// loop again for any remaining unprocessed data
b_head_parse_ = false;
recv_head_node_->clear();
if (bytes_transferred <= 0) {
::memset(read_buffer_, 0, MAX_LENGTH);
socket_.async_read_some(boost::asio::buffer(read_buffer_, MAX_LENGTH),
std::bind(&Session::handle_read, this, std::placeholders::_1, std::placeholders::_2, self));
return;
}
continue;
}
} else {
std::cout << "handle read failed, error is " << error.message() << endl;
do_close();
}
}
void do_write(char* msg, int len) {
bool pending = false;
std::lock_guard<std::mutex> lock(send_lock_);
if (send_que_.size() > 0) {
pending = true;
}
send_que_.push(make_shared<MsgNode>(msg, len));
if (pending) { // a previous write is still in flight; handle_write will continue draining the queue
return;
}
auto self(shared_from_this()); // keep the object alive while the async operation is pending
auto &msgnode = send_que_.front(); // send the queued node: its buffer outlives the async_write, whereas msg itself may not
boost::asio::async_write(socket_, boost::asio::buffer(msgnode->data_, msgnode->total_len_),
std::bind(&Session::handle_write, this, std::placeholders::_1, self));
}
void handle_write(const boost::system::error_code& ec, std::shared_ptr<Session> self) {
if (ec) {
do_close();
return;
}
std::lock_guard<std::mutex> lock(send_lock_); // with multiple io threads the callback may run on a different thread, so lock
send_que_.pop(); // remove the node that has just been sent
if (!send_que_.empty()) {
auto &msgnode = send_que_.front();
boost::asio::async_write(socket_, boost::asio::buffer(msgnode->data_, msgnode->total_len_), std::bind(&Session::handle_write, this, std::placeholders::_1, self));
}
}
private:
tcp::socket socket_;
char read_buffer_[MAX_LENGTH];
std::string uuid_;
Server *server_;
bool b_head_parse_ = false;
std::queue<std::shared_ptr<MsgNode>> send_que_;
std::shared_ptr<MsgNode> recv_head_node_;
std::shared_ptr<MsgNode> recv_msg_node_;
std::mutex send_lock_;
};

class Server {
public:
Server(io_context &io_ctx, short port) : io_ctx_(io_ctx), accept_(io_ctx, tcp::endpoint(tcp::v4(), port)) {
std::cout << "server start successful at port " << port << std::endl;
do_accept();
}
void clear_session(const std::string &session_id) {
auto it = sessions_.find(session_id);
if (it != sessions_.end()) {
sessions_.erase(it);
}
}
private:
void do_accept() {
// accept_.async_accept([this](boost::system::error_code err, tcp::socket sock) {
// if (!err) {
// std::make_shared<Session>(sock, this)->start();
// }
// do_accept();
// })
// 或者下面的写法
auto sessionPtr = std::make_shared<Session>(io_ctx_, this);
accept_.async_accept(sessionPtr->getsocket_(), std::bind(&Server::handle_accept, this, sessionPtr, placeholders::_1));
}
void handle_accept(std::shared_ptr<Session> sessionPtr, const boost::system::error_code& ec) {
if (!ec) {
sessions_.insert(make_pair(sessionPtr->get_session_id(), sessionPtr)); // 第一层引用
sessionPtr->start();
}
do_accept(); // 继续处理accept
}
private:
tcp::acceptor accept_;
io_context &io_ctx_;
std::map<std::string, std::shared_ptr<Session>> sessions_;
};

Multithreading models

Approach 1: start several threads (e.g. std::thread::hardware_concurrency() of them), each running its own io_context. All callbacks for a given socket then run on one thread, so the I/O layer is concurrency-safe. Cross-io_context threading issues can be handled with a logic queue drained by a dedicated logic thread; enqueue and dequeue on that queue must therefore be locked.

using Work = boost::asio::io_context::work;
using WorkPtr = std::unique_ptr<Work>;

// ioServices_ and works_ are assumed to be vectors sized to `size` here
AsioServicePool::AsioServicePool(int size) : ioServices_(size), works_(size) {
for (int i = 0; i < size; ++i) {
works_[i] = std::unique_ptr<Work>(new Work(ioServices_[i])); // the work object keeps run() from returning while idle
threads_.emplace_back([i, this]() {
ioServices_[i].run();
});
}
}

void AsioServicePool::stop() {
for (auto &work : works_) {
work.reset(); // release the work guard so run() can return and the thread can exit
}
for (auto &thread : threads_) {
thread.join(); // wait for the threads to exit
}
}

void Server::startAccept() {
auto &io_context = AsioServicePool::getInstance()->getIoService(); // hand the new connection's I/O to one of the pool's contexts
std::shared_ptr<Session> session = std::make_shared<Session>(io_context, this);
accept_.async_accept(session->getSocket(), std::bind(&Server::handleAccept, this, session, std::placeholders::_1));
}

Approach 2: run the same io_context on several threads at once. Callbacks may then be invoked concurrently, so the I/O layer is not thread-safe by itself; use a strand to serialize the handlers of each session.

AsioThreadPool::AsioThreadPool(int threadNum) :
work_(new boost::asio::io_context::work(service_)) {
for (int i = 0; i < threadNum; i++) {
threads_.emplace_back([this]() {
service_.run();
});
}
}
socket_.async_read_some(boost::asio::buffer(data_, MAX_LENGTH),
boost::asio::bind_executor(strand, std::bind(&Session::handleRead, this, std::placeholders::_1, std::placeholders::_2)));

int main(int argc, char **argv)
{
auto pool = AsioThreadPool::getInstance();
boost::asio::io_context io_context; // note: something must run this io_context for the signal handler to fire
boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
signals.async_wait([&io_context, &pool](auto, auto) {
io_context.stop();
pool->stop();
std::unique_lock<std::mutex> lock(mutex_quit); // may fire on another thread
bstop = true;
cond_quit.notify_one();
});
Server s(pool->getIoService(), 10086);
{
std::unique_lock<std::mutex> lock(mutex_quit);
while (!bstop) {
cond_quit.wait(lock); // keep the main thread from exiting
}
}
return 0;
}

Using coroutines

Coroutines require C++20.

#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/asio/write.hpp>
#include <iostream>

using boost::asio::ip::tcp;
using boost::asio::awaitable;
using boost::asio::co_spawn;
using boost::asio::detached;
using boost::asio::use_awaitable;
namespace this_coro = boost::asio::this_coro;

awaitable<void> echo(tcp::socket socket) {
try {
char data[1024];
for (;;) {
std::size_t n = co_await socket.async_read_some(boost::asio::buffer(data), use_awaitable); // no blocking wait: asynchronous code written in a synchronous style
co_await async_write(socket, boost::asio::buffer(data, n), use_awaitable);
}
} catch (std::exception &e) {
std::cout << "Error: " << e.what() << std::endl;
}
}

awaitable<void> listener() {
auto executor = co_await this_coro::executor;
tcp::acceptor acceptor(executor, {tcp::v4(), 10086});
for (;;) {
tcp::socket socket = co_await acceptor.async_accept(use_awaitable);
co_spawn(executor, echo(std::move(socket)), detached);
}
}

int main() {
try {
boost::asio::io_context io_context(1);
boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
signals.async_wait([&](auto, auto) {
io_context.stop();
});
co_spawn(io_context, listener(), detached);
io_context.run();
} catch (std::exception &e) {
std::cerr << e.what() << std::endl;
}
}

// Client side
char request[MAX_LENGTH];
std::cin.getline(request, MAX_LENGTH);
size_t request_len = strlen(request);
boost::asio::write(sock, boost::asio::buffer(request, request_len));

char reply[MAX_LENGTH];
size_t reply_len = boost::asio::read(sock, boost::asio::buffer(reply, request_len)); // blocks until exactly request_len bytes arrive

Client example

void TestHttps(const char *host) {
boost::asio::io_service io;
ip::tcp::resolver resolver(io); // performs the DNS lookup
ip::tcp::resolver::query query(host, "443");
boost::asio::ssl::context ctx(boost::asio::ssl::context::sslv23);
boost::asio::ssl::stream<boost::asio::ip::tcp::socket> sock(io, ctx);
boost::asio::streambuf response;
auto handleRcv = [&](const boost::system::error_code &ec5, std::size_t bytes_transferred) {
printf("recv http %s,bytes_transferred:%zu", ec5.message().c_str(), bytes_transferred);
if (bytes_transferred > 0) {
// streambuf::data() returns a buffer sequence, not a string; copy it out to print
std::string content(boost::asio::buffers_begin(response.data()), boost::asio::buffers_end(response.data()));
printf("recv http content:%s", content.c_str());
}
};
auto handleWrite = [&](const boost::system::error_code &ec4, std::size_t bytes_transferred) {
printf("send http %s,bytes_transferred:%zu", ec4.message().c_str(), bytes_transferred);
if (bytes_transferred > 0) {
// receive the http response
boost::asio::async_read(sock, response, handleRcv);
}
};
auto handleShake = [&](const boost::system::error_code &ec3) {
printf("ssl handshake %s", ec3.message().c_str());
if (!ec3) {
// send the http request
string req = "GET / HTTP/1.1\r\nAccept: */*\r\nHost: " + query.host_name() + "\r\n\r\n";
printf("send http request \n%s", req.c_str());
boost::asio::async_write(sock, boost::asio::buffer(req.c_str(), req.size()), handleWrite);
}
};
auto handleConnect = [&](const boost::system::error_code &ec2, ip::tcp::resolver::iterator iter2) {
printf("connect %s %s", iter2->endpoint().address().to_string().c_str(), ec2.message().c_str());
if (!ec2) {
// ssl handshake
sock.async_handshake(boost::asio::ssl::stream_base::client, handleShake);
}
};
response.prepare(100);
// If the resolver is cancelled after the completion event has already fired, the callback still sees the result in ec
resolver.async_resolve(query, [&](const boost::system::error_code &ec, ip::tcp::resolver::iterator iter) {
printf("resolve %s", ec.message().c_str());
if (iter != ip::tcp::resolver::iterator()) {
async_connect(sock.lowest_layer(), iter, handleConnect);
}
while (iter != ip::tcp::resolver::iterator()) {
printf("resolve %s\n", iter->endpoint().address().to_string().c_str());
++iter;
}
});
io.run();
printf("io_service release\n");
}

// Key points
// send the header
boost::asio::async_write(*http->httpsSocket, boost::asio::buffer(http->httpParser->request), boost::asio::transfer_all(), boost::bind(&Asio::OnSend, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred, http));
// send the body
boost::asio::async_write(*evInfo->httpSocket, boost::asio::buffer(evInfo->httpParser->postData), boost::asio::transfer_all(), boost::bind(&Asio::OnSend, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred, evInfo));
// receive the response
boost::asio::async_read_until(*evInfo->httpSocket, evInfo->responseBuf, "\r\n\r\n", boost::bind(&Asio::OnRecvHeader, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred, evInfo)); // guarantees the full response header has arrived before the callback
// parse the response header
evInfo->httpParser->ParseResponseHeader(evInfo->responseBuf);
// receive data as soon as any arrives
boost::asio::async_read(*evInfo->httpSocket, evInfo->responseBuf, boost::asio::transfer_at_least(1),
boost::bind(&Asio::OnRecvBody, this, boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred, evInfo));
// auto &response = evInfo->httpParser->response;
// if (response.contentLength > 0 && response.contentPos >= response.contentLength) // condition for "body fully received"
if (STEP_FINISHED == evInfo->step || errorOrCancel ||
((evInfo->callbackTiming & CALLBACK_TIMING_BODY) != 0 && evInfo->responseBuf.size() > g_cloudCfg.http.minCallbackSize)) {
const int64_t size = evInfo->responseBuf.size();
std::istream bodyStream(&evInfo->responseBuf); // drain the buffered data
vector<char> vec;
vec.resize(size);
bodyStream.read(vec.data(), size);
// append the data to the body
if (response.body.empty()) {
response.body.swap(vec);
} else {
response.body.insert(response.body.end(), vec.begin(), vec.end());
}
printf("Asio::OnRecvBody {%lld} body size = %zu", evInfo->id, response.body.size());
}
if (evInfo->step != STEP_FINISHED) { // not finished and no error: keep receiving the body
boost::asio::async_read(*evInfo->httpSocket, evInfo->responseBuf, boost::asio::transfer_at_least(1),
boost::bind(&Asio::OnRecvBody, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred, evInfo));
}

// kcp-related
// a timer that periodically calls evInfo->KcpUpdate()
kcp->kcpTimer = unique_ptr<boost::asio::deadline_timer>(new boost::asio::deadline_timer(m_io, boost::posix_time::milliseconds(0)));
kcp->kcpTimer->async_wait(boost::bind(&Asio::OnKcpTimer, this, boost::asio::placeholders::error, kcp));

// send one byte to leave a hole-punching NAT mapping on the local router
evInfo->socket->async_send_to(boost::asio::buffer("0", 1), evInfo->ecnNode,
boost::bind(&OnNullHandler, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));

// send a hole-punching request to the traversal (STUN) server
evInfo->socket->async_send_to(boost::asio::buffer(buf, ntohs(evInfo->stunReq.head.len)),
evInfo->stunServer, boost::bind(&Asio::OnKcpSend, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred, evInfo));

// receive the response
kcp->socket->async_receive_from(boost::asio::buffer(kcp->recvBuf), kcp->recvEndpoint, // no completion condition here
boost::bind(&Asio::OnKcpRecv, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred, kcp));

if (evInfo->recvEndpoint == evInfo->stunServer) {
// response from the traversal server
} else if (1 == bytes_transferred) {
// got the peer node's 1-byte hole-punching packet; the kcp connection can start
} else if (bytes_transferred >= IKCP_MINHEAD) {
// kcp data
ikcp_input(ikcp, evInfo->recvBuf, bytes_transferred); // hand the data to the application-layer queue
const int status = ikcp_status(ikcp);
if (status == 1) {
// drive the http state machine: ikcp_send the header, then the body, then ikcp_recv the data
auto &response = evInfo->httpParser->response;
// parse the header
evInfo->httpParser->ParseResponseHeader(string(evInfo->recvBuf, ret));
// receive the body
response.contentPos += ret;
response.body.insert(response.body.end(), evInfo->recvBuf + bodyPos, evInfo->recvBuf + bodyPos + ret);
// done when: response.contentPos >= response.contentLength
}
}
// keep receiving data
evInfo->socket->async_receive_from(boost::asio::buffer(evInfo->recvBuf), evInfo->recvEndpoint,
boost::bind(&Asio::OnKcpRecv, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred, evInfo));

// Finally, queue the request on the asio wrapper
m_asio->AddEvent(nextHttp);