线程池设计与实现


线程池在创建的时候启动一定数量的线程,这些线程所做的事情就是不断从任务队列中获取任务来执行,当已启动的线程全部并行执行任务,即所有已有线程都处于繁忙状态,且任务队列满了的时候,管理线程可以再启动一些新的线程来执行任务,已启动的线程数不能超过线程池设置的总大小,当繁忙态线程低于一定比例时,可以考虑释放掉一部分线程,但最少会有多少的线程会一直存在,这个多少的值可以在启动线程池时设置。所以有了线程池就不用一直启动再释放线程,同时可以让少于任务数个的线程来服务比较多的任务,比如线程池大小为16,任务数为50,这时线程池会先取16个任务执行,此时再来新的任务会被存到任务队列中,相当于一个缓存池的作用,可以看出用16个线程就可以服务50多个任务了。

这里面的任务队列会被线程池里面的线程读取,看是否有任务存在,所以任务队列对于线程池来说是一种临界资源,访问的时候就要加锁了。

线程是怎么判断任务队列是否有任务到来呢?如果用while循环判断任务队列的大小,就会造成cpu的浪费,所以这里一般用条件变量来实现,条件变量要结合互斥锁使用,wait之前上锁,wait时自动解锁,被唤醒后如果条件满足会继续持有锁,这里wait用来等待信号,比如有新的任务到来时可以发起一个信号,wait时会进入阻塞状态,不会消耗cpu,被唤醒后再检查条件,如果确实有任务了,线程就可以取任务进行执行了,这里取任务是在上面的互斥锁保护下进行的,所以不会有多取的情况。

这里线程池的线程从任务队列里面消费任务,而用户则可以向任务队列里面生产任务,这就是一个生产者消费者模型,java里面一般将这个任务队列设计成阻塞队列,没有任务但是线程去取的时候就会阻塞,如果阻塞队列还是有大小限制的,如是用数组实现的,则队列满了的时候用户就阻塞了,放不进去任务,当然可以设计成别的策略,如直接抛弃想要加进去的任务,这样线程池和阻塞任务队列隔离实现了,不再耦合在一起,也就是将条件变量的wait和signal放在任务队列里面去实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#include <iostream>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <chrono>

using namespace std;

queue<int> q;
mutex mtx;
condition_variable cv;

void Produce() {
for (int i = 0; i < 10; ++i) {
unique_lock<mutex> ul(mtx);
q.push(i);
cout << "Producer produced " << i << endl;
ul.unlock();
cv.notify_all();
this_thread::sleep_for(chrono::seconds(1));
}
}

void Consume() {
int data = 0;
while (data != 9) {
unique_lock<mutex> ul(mtx);
cv.wait(ul, [] { return !q.empty(); });
data = q.front();
q.pop();
cout << "Consumer consumed " << data << endl;
ul.unlock();
this_thread::sleep_for(chrono::seconds(2));
}
}

int main() {
thread t1(Produce);
thread t2(Consume);
t1.join();
t2.join();
return 0;
}

一个项目里面可以使用多个线程池,一个线程池对应一个任务队列,比如io密集型的任务放在同一个线程池里,cpu密集型的任务放在另外一个线程池里,这样即使io密集型的任务全部阻塞,也还可以有cpu密集型的任务得到运行,起到隔离的作用。

当任务的执行时间比较短,线程池对应的任务队列里面的条件变量互斥锁就可能成为性能瓶颈,因为会有多线程高并发读写任务队列,这个时候可以考虑用cas进行无锁优化。另外任务队列也可以进一步优化,加入任务优先级的功能,优先级高的任务优先得到取出,优先让线程池里面的线程处理。

开发中用多线程还是多进程?
考虑健壮性,可以使用多进程,例如nginx分为一个master进程和多个worker进程,多个进程间是相互独立的。如果多个模块间需要交互通信,采用多线程,如redis主线程和io线程之间需要协作,采用多io线程。线程崩溃,整个进程都会崩溃,所以考虑健壮性,一般加一个守护进程。

单线程的协程没法充分发挥多核的优势,一般偏网络io的服务器可以考虑使用协程,协程的调度是在用户态完成的,切换速度快。另外还有阻塞问题,某个用户线被阻塞,导致进程同样被阻塞。

C++11实现一个线程池?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
bool ThreadPool::start()
{
std::unique_lock<std::mutex> lock(_mutex);
if (!threads.empty()) return false;
for (size_t i = 0; i < _threadNum; i++)
{
_threads.push_back(new thread(&ThreadPool::run, this)); // 将this传进去
}
return true;
}

void ThreadPool::stop()
{
{
std::unique_lock<std::mutex> lock(_mutex);
_bTerminate = true
_condition.notify_all(); // 通知线程退出
}
for (size_t i = 0; i < _threadNum; i++)
{
if (_threads[i]->joinable())
_thread[i].join();
delete _thread[i];
_thread[i] = nullptr;
}_
std::unique_lock<std::mutex> lock(_mutex);
_threads.clear();
}

任务怎么封装,支持多参数传递,可以获取任务返回值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
template<class F, class ...Args>
auto exec(F&& f, Args&&... arg) -> std::future<decltype(f(arg...))>
{
using RetType = decltype(f(arg...));
// 使用智能指针,没地方使用了会自动释放
auto task = std::make_shared<std::packaged_task<RetType>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
TaskFunPtr tPtr = std::make_shared<TaskFunc>();
tPtr->_func = [task]() {
printf("do task\n");
(*task)();
}
std::unique_lock<std::mutex> lock(_mutex);
tasks.push(tPtr); // 存入的是指针
contition.notify_one(); // 有任务进行通知

return task->get_future();
}

读取任务后,任务运行前后需要使用原子变量计数,有的任务会执行比较久。
`c++
void ThreadPool::get(TaskFunPtr &task)
{
std::unique_ptrstd::mutex lock(mutex);
while (_tasks.empty()) // 预防虚假唤醒
{
_condition.wait(lock, this {
return _bTerminate || !_tasks.empty();
})
}
task = std::move(_tasks.front()); // 资源直接转移
tasks.pop();
}

void ThreadPool::run()
{
while(!_bTerminate)
{
TaskFunPtr task;
get(task);
++_atomic;
try {
task->_func();
} catch(…) {
}
–_atomic;
std::unique_lockstd::mutex lock(_mutex);
if (_atomic == 0 && _tasks.empty()) // 要确保任务已经执行完了
{
_condition.notify_all();
}
}
}

bool ThreadPool::waitForAllDone()
{
std::unique_lockstd::mutex lock(_mutex);
if (_tasks.empty())
return true;
_condition.wait(lock, [this] { return _tasks.empty(); });
return true;
}
`;

nephen wechat
欢迎您扫一扫上面的微信公众号,订阅我的博客!
坚持原创技术分享,您的支持将鼓励我继续创作!