1 | https://www.boost.org/doc/libs/1_71_0/doc/html/boost_asio/reference.html |
知识点
1 | # https://sourceforge.net/projects/boost/ |
ibevent是基于Reactor(反应器模式)。boost.asio是基于Proactor(主动器模式)。
io_context类似于reactor或iocp,有两个重要的命名空间boost::asio和boost::asio::ip
boost::asio下的io函数有connect、accept、read_some、write_some,异步io加个async即可。
boost::asio::ip主要有ip地址ip::address,端点ip::tcp::endpoint\ip::udp::address,sockect:ip::tcp::socket\ip::udp::socket.
套接字控制:set_option\get_option\io_control1
2
3// TCP套接字可以重用地址
ip::tcp::socket::reuse_address ra(true);
sock.set_option(ra);
cmake构建boost asio,boost::system、boost::regex、boost::read_until、boost::async_read_until,其中system在async_read_some(buffer(data, length), )中有用到1
2
3
4
5
6
7
8set(BOOST_ROOT $ENV{BOOST_ROOT})
set(BOOST_INCLUDE_DIR ${BOOST_ROOT})
set(BOOST_LIBRARY_DIR ${BOOST_ROOT}/lib64-msvc-14.2)
find_package(Boost 1.82 REQUIRED system regex)
link_directories(${BOOST_LIBRARY_DIR})
add_executable(asio "asio.cpp" "asio.h")
target_link_libraries(asio PUBLIC Boost::system Boost::regex)
target_include_directories(asio PUBLIC ${BOOST_INCLUDE_DIR})
asio::async_read:只有读满512字节或出错才会调用readHandler,是全局函数。async_read()内部是由多个asyn_read_some()函数来实现。
socket.async_read_some:只要有消息或出错就会调用readHandler,是类成员函数,socket.async_receive底层都是调用的async_receive。
read_until和write_until用于读取直到某个条件满足为止,它接受的参数不再是buffers,而是boost::asio:: streambuf。
read_at既可以接收buffer对象又可以接收streambuffer对象。*_at方法,这些方法用来在一个流上面做随机存取操作。
值得注意的是async_read_until(socket, streambuf, condition, handler)函数,给我们处理数据分段带来了极大的方便。它内建了4种条件决定read到什么时候停止 :出现了某个字符、出现了某条字串、符合了某条 Regular Expression、符合了某条谓词的条件。注意:这里的”出现”指的是streambuf的input sequence中出现,也就是如果原本streambuf中的内容已经符合条件,则async_read_until将立即呼叫回调。
receive_from、send_to:从一个指定的端点获取数据并写入到给定的缓冲区、把缓冲区数据发送到一个指定的端点。
endpoint(protocol, port):这个方法通常用来创建可以接受新连接的服务器端socket。
endpoint(addr, port):这个方法创建了一个连接到某个地址和端口的端点。
local_endpoint():这个方法返回套接字本地连接的地址。
remote_endpoint():这个方法返回套接字连接到的远程地址。
ip::socket_type::socket::message_peek:这个标记只监测并返回某个消息,但是下一次读消息的调用会重新读取这个消息。sock.receive(buffer(buff), ip::tcp::socket::message_peek);1
2
3ip::tcp::socket s1(service), s2(service);
s1 = s2; // 编译时报错
ip::tcp::socket s3(s1); // 编译时报错
1 | // 阻塞方式 |
有时候你会想让一些异步处理方法顺序执行。比如,你去一个餐馆(go_to_restaurant),下单(order),然后吃(eat)。你需要先去餐馆,然后下单,最后吃。这样的话,你需要用到io_service::strand,这个方法会让你的异步方法被顺序调用。1
2
3
4
5io_service::strand strand_one(service), strand_two(service);
for (int i = 0; i < 5; ++i)
service.post(strand_one.wrap(boost::bind(func, i))); // // 调用 wrapped_handler,会自动将被包装的 handler 传递给 strand 的 dispatch 函数
for (int i = 5; i < 10; ++i)
service.post(strand_two.wrap(boost::bind(func, i)));
尽管方法是顺序调用的,但是不意味着它们都在同一个线程执行,每个strand内部是顺序执行的,但不同strand之间是可以并行执行的1
2
3
4
5
6
7
8
9
10func called, i= 0/002A60C8
func called, i= 5/002A6138
func called, i= 6/002A6530
func called, i= 1/002A6138
func called, i= 7/002A6530
func called, i= 2/002A6138
func called, i= 8/002A6530
func called, i= 3/002A6138
func called, i= 9/002A6530
func called, i= 4/002A6138
dispatch()有可能会在返回之前, 在内部直接调用hanlder,而post()每次都立即返回了。
服务器开发流程
1 |
|
多线程模型
第一种方式,启动多个(std::thread::hardware_concurrency())线程,每个线程跑一个io_context,同一个socket的回调函数都是在同一个线程里,io层面的并发是安全的。不同io_context的多线程问题可以使用逻辑队列处理,单独的线程处理逻辑队列,所以逻辑队列的入队出队是要加锁的。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26using Work = boost::asio::io_context::work;
using WorkPtr = std::unique_ptr<Work>;
AsioServicePool::AsioServicePool(int size) {
for (int i = 0; i < size; ++i) {
works_[i] = std::unique_ptr<Work>(new Work(ioServices_[i]));
threads_.emplace_back([i, this]() {
ioServices_[i].run();
})
}
}
void AsioServicePool::stop() {
for (auto &work : works_) {
work.reset(); // 让线程退出
}
for (auto &thread : threads_) {
thread.join(); // 等待线程退出
}
}
void Server::startAccept() {
auto &io_context = AsioServicePool::getInstance()->getIoService(); // io采用其他线程处理
std::shared_ptr<Session> session = std::make_shared<Session>(io_context, this);
accept_.async_accept(session->getSocket(), std::bind(&Server::handleAccept, this, session, placeholders::_1));
}
另一种方式是,同一个io_context同时跑在多个线程里,这种回调函数会被并发调用,io层面并不是线程安全的。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31AsioThreadPool::AsioThreadPool(int threadNum) :
work_(new boost::asio::io_context::work(service_)) {
for (int i = 0; i < threadNum; i++) {
threads_.emplace_back([this]() {
service_.run();
});
}
}
socket_.async_read_some(boost::asio::buffer(data_, MAX_LENGTH),
boost:asio::bind_executor(strand, std::bind(&Session::handleRead, this, std::placeholders::_1, std::placeholders::_2)));
void main(int argc, char **argv)
{
auto pool = AsioThreadPool::getInstance();
boost::asio::io_context io_context;
boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
signals.async_wait([&io_context](auto, auto) {
io_context.stop();
pool->stop();
std::unique_lock<std::mutex> lock(mutex_quit); // 可能在别的线程触发
bstop = true;
cond_quit.notify_one();
});
Server s(pool->getIoService(), 10086);
{
std::unique_lock<std::mutex> lock(mutex_quit);
while (!bstop) {
cond_quit.wait(lock); // 防止主线程退出
}
}
}
协程使用
c++20才支持协程1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
using boost::asio::ip::tcp;
using boost::asio::awaitable;
using boost::asio:co_spawn;
using boost::asio::detached;
using boost::asio::use_awaitable;
namespace this_coro = boost::asio::this_coro;
awaitable<void> echo(tcp::socket socket) {
try {
char data[1024];
for (;;) {
std::size_t n = co_await socket.async_read_some(boost::asio::buffer(data), use_awaitable); // 不再需要阻塞等待,可以以同步的方式来实现异步的代码
co_await async_write(socket, boost::asio::buffer(data, n), use_awaitable);
}
} catch (std::exception &e) {
std::cout << "Error: " << e.what() << std::endl;
}
}
awaitable<void> listener() {
auto executor = co_await this_coro::executor;
tcp::acceptor acceptor(executor, {tcp::v4(), 10086});
for (;;) {
tcp::socket socket = co_await acceptor.async_accept(use_awaitable);
co_spawn(executor, echo(std::move(socket), detached));
}
}
int main() {
try {
boost::asio::io_context io_context(1);
boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
signals.async_wait([&](auto, auto) {
io_context.stop();
});
co_spawn(io_context, listener(), detached);
io_context.run();
} catch (std::exception &e) {
std::cerr << e.what() << std::endl;
}
}
// 客户端实现
char request[MAX_LENGTH];
std::cin.getline(request, MAX_LENGTH);
size_t request_len = strlen(request);
boost::asio::write(sock, boost::asio::buffer(request, request_len));
char reply[MAX_LENGTH];
size_t reply_len = boost::asio::read(sock, boost::asio::buffer(reply, request_len));
客户端例子
1 | void TestHttps(const char *host) { |