ThreadManager实现

服务器TNonblockingServer

rpc请求生成task放入到任务队列中,这个任务是Runnable的,但没有依附于某个thread,等于是封装了个run函数,供线程池工作线程调用时运行的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
void addTask(stdcxx::shared_ptr<Runnable> task) {
threadManager_->add(task, 0LL, taskExpireTime_);
}

stdcxx::shared_ptr<Runnable> task = stdcxx::shared_ptr<Runnable>(
new Task(processor_, inputProtocol_, outputProtocol_, this));
server_->addTask(task);

// 真正的任务,其实也是一个run函数,这个Runnable是没有thread实体的
class TNonblockingServer::TConnection::Task : public Runnable {
public:
Task(stdcxx::shared_ptr<TProcessor> processor,
stdcxx::shared_ptr<TProtocol> input,
stdcxx::shared_ptr<TProtocol> output,
TConnection* connection)
: processor_(processor),
input_(input),
output_(output),
connection_(connection),
serverEventHandler_(connection_->getServerEventHandler()),
connectionContext_(connection_->getConnectionContext()) {}

void run() {
try {
for (;;) {
if (serverEventHandler_) {
serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
}
if (!processor_->process(input_, output_, connectionContext_)
|| !input_->getTransport()->peek()) {
break;
}
}
} catch (const TTransportException& ttx) {
GlobalOutput.printf("TNonblockingServer: client died: %s", ttx.what());
} catch (const std::bad_alloc&) {
GlobalOutput("TNonblockingServer: caught bad_alloc exception.");
exit(1);
} catch (const std::exception& x) {
GlobalOutput.printf("TNonblockingServer: process() exception: %s: %s",
typeid(x).name(),
x.what());
} catch (...) {
GlobalOutput.printf("TNonblockingServer: unknown exception while processing.");
}

// Signal completion back to the libevent thread via a pipe
if (!connection_->notifyIOThread()) {
GlobalOutput.printf("TNonblockingServer: failed to notifyIOThread, closing.");
connection_->server_->decrementActiveProcessors();
connection_->close();
throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
}
}

TConnection* getTConnection() { return connection_; }

private:
stdcxx::shared_ptr<TProcessor> processor_;
stdcxx::shared_ptr<TProtocol> input_;
stdcxx::shared_ptr<TProtocol> output_;
TConnection* connection_;
stdcxx::shared_ptr<TServerEventHandler> serverEventHandler_;
void* connectionContext_;
};

线程池管理ThreadManager

线程池的管理,包括增加工作线程worker,这个worker线程是继承自Runnable的,依附于具体的线程thread,还有就是增加任务,上面说到了server是怎么调用这个增加任务的接口的。注意worker和task都是继承自runnable,一个有依附的thread,一个没有,都要实现run函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
// 线程池里面的一个工作线程
class ThreadManager::Worker : public Runnable { // Worker是继承自Runnable的
enum STATE { UNINITIALIZED, STARTING, STARTED, STOPPING, STOPPED };

public:
Worker(ThreadManager::Impl* manager) : manager_(manager), state_(UNINITIALIZED) {} // 管理员也初始化进来了,可以做一些管理工作

~Worker() {}

private:
bool isActive() const {
return (manager_->workerCount_ <= manager_->workerMaxCount_)
|| (manager_->state_ == JOINING && !manager_->tasks_.empty());
}

public:

void run() {
Guard g(manager_->mutex_);
bool active = manager_->workerCount_ < manager_->workerMaxCount_;
if (active) {
if (++manager_->workerCount_ == manager_->workerMaxCount_) {
manager_->workerMonitor_.notify(); // 所有工作线程启动完成通知
}
}

while (active) {
active = isActive();

while (active && manager_->tasks_.empty()) { // 工作线程从任务队列里面取任务,没有任务的时候阻塞等待
manager_->idleCount_++;
manager_->monitor_.wait(); // 等待任务
active = isActive();
manager_->idleCount_--;
}

shared_ptr<ThreadManager::Task> task;

if (active) {
if (!manager_->tasks_.empty()) {
task = manager_->tasks_.front(); // 取到任务了
manager_->tasks_.pop_front();
if (task->state_ == ThreadManager::Task::WAITING) {
task->state_ =
(task->getExpireTime() && task->getExpireTime() < Util::currentTime()) ?
ThreadManager::Task::TIMEDOUT :
ThreadManager::Task::EXECUTING;
}
}

if (manager_->pendingTaskCountMax_ != 0
&& manager_->tasks_.size() <= manager_->pendingTaskCountMax_ - 1) {
manager_->maxMonitor_.notify();
}
}

if (task) {
if (task->state_ == ThreadManager::Task::EXECUTING) {
manager_->mutex_.unlock(); // 任务执行期间可以不用加任务队列的锁

try {
task->run();
} catch (const std::exception& e) {
GlobalOutput.printf("[ERROR] task->run() raised an exception: %s", e.what());
} catch (...) {
GlobalOutput.printf("[ERROR] task->run() raised an unknown exception");
}

manager_->mutex_.lock();

} else if (manager_->expireCallback_) {
manager_->expireCallback_(task->getRunnable());
manager_->expiredCount_++;
}
}
}

manager_->deadWorkers_.insert(this->thread());
if (--manager_->workerCount_ == manager_->workerMaxCount_) {
manager_->workerMonitor_.notify();
}
}

private:
ThreadManager::Impl* manager_;
friend class ThreadManager::Impl;
STATE state_;
};

// 工作线程里面的任务
class ThreadManager::Task : public Runnable { // Task也是继承自Runnable
public:
enum STATE { WAITING, EXECUTING, TIMEDOUT, COMPLETE };

Task(shared_ptr<Runnable> runnable, int64_t expiration = 0LL) // 装饰者模式
: runnable_(runnable),
state_(WAITING),
expireTime_(expiration != 0LL ? Util::currentTime() + expiration : 0LL) {}

~Task() {}

void run() {
if (state_ == EXECUTING) {
runnable_->run();
state_ = COMPLETE;
}
}

shared_ptr<Runnable> getRunnable() { return runnable_; }

int64_t getExpireTime() const { return expireTime_; }

private:
shared_ptr<Runnable> runnable_;
friend class ThreadManager::Worker;
STATE state_;
int64_t expireTime_;
};

// 添加工作线程
// class ThreadManager::Impl : public ThreadManager,ThreadManager是抽象基类,不能直接实例化,但提供了newSimpleThreadManager静态接口,具体由Impl实现。
void ThreadManager::Impl::addWorker(size_t value) {
std::set<shared_ptr<Thread> > newThreads;
for (size_t ix = 0; ix < value; ix++) {
shared_ptr<ThreadManager::Worker> worker
= shared_ptr<ThreadManager::Worker>(new ThreadManager::Worker(this));
newThreads.insert(threadFactory_->newThread(worker)); // 这里是工厂生产的线程,要传进去这个worker的runnable对象,这是真正的运行体
}

Guard g(mutex_);
workerMaxCount_ += value;
workers_.insert(newThreads.begin(), newThreads.end()); // 放到workers_中

for (std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end();
++ix) {
shared_ptr<ThreadManager::Worker> worker
= dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable());
worker->state_ = ThreadManager::Worker::STARTING;
(*ix)->start(); // 启动工作线程,start才会真正生成线程
idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >((*ix)->getId(), *ix));
}

while (workerCount_ != workerMaxCount_) {
workerMonitor_.wait(); // 等待所有工作线程初始化完成
}
}

// 添加任务
// class ThreadManager::Impl : public ThreadManager,ThreadManager是抽象基类,不能直接实例化,但提供了newSimpleThreadManager静态接口,具体由Impl实现。
void ThreadManager::Impl::add(shared_ptr<Runnable> value, int64_t timeout, int64_t expiration) {
Guard g(mutex_, timeout);

if (!g) {
throw TimedOutException();
}

if (state_ != ThreadManager::STARTED) {
throw IllegalStateException(
"ThreadManager::Impl::add ThreadManager "
"not started");
}

if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
removeExpired(true);
}

if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
if (canSleep() && timeout >= 0) {
while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) {
maxMonitor_.wait(timeout);
}
} else {
throw TooManyPendingTasksException();
}
}

tasks_.push_back(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value, expiration)));

if (idleCount_ > 0) {
monitor_.notify(); // 通知有任务加进来了
}
}

Thread类及工厂模式

我们再次从 Java 借用了线程和一个可运行的类。 线程是实际的可调度对象。 这Runnable 是在线程内执行的逻辑。 线程实现处理所有特定于平台的线程创建和销毁问题,而 Runnable 实现处理具有特定于应用程序的每线程逻辑。 这样做的好处方法是开发人员可以轻松地继承 Runnable 类无需引入特定于平台的超类。
这里,runnable弱引用了thread,因为runnable里面不一定有thread,但thread里面一定有runnable,也解决循环引用的问题。启动线程的时候,要等到真正运行到线程入口函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// Thread里面包装了Runnable
class Thread {
public:
typedef std::thread::id id_t;
static inline bool is_current(id_t t) { return t == std::this_thread::get_id(); }
static inline id_t get_current() { return std::this_thread::get_id(); }

virtual ~Thread(){};
virtual void start() = 0;
virtual void join() = 0;
virtual id_t getId() = 0;
virtual stdcxx::shared_ptr<Runnable> runnable() const { return _runnable; }

protected:
virtual void runnable(stdcxx::shared_ptr<Runnable> value) { _runnable = value; }

private:
stdcxx::shared_ptr<Runnable> _runnable;
};

// Runnable里面可以拿到Thread,这两个是互相引用,但是Runnable是弱引用,使用的是weak_ptr,可能返回空,也就是可能没有thread,比如task就是这样
class Runnable {
public:
virtual ~Runnable(){};
virtual void run() = 0;
virtual stdcxx::shared_ptr<Thread> thread() { return thread_.lock(); }
virtual void thread(stdcxx::shared_ptr<Thread> value) { thread_ = value; }

private:
stdcxx::weak_ptr<Thread> thread_;
};

// 由线程工厂来生产线程
stdcxx::shared_ptr<Thread> StdThreadFactory::newThread(stdcxx::shared_ptr<Runnable> runnable) const {
stdcxx::shared_ptr<StdThread> result = stdcxx::shared_ptr<StdThread>(new StdThread(isDetached(), runnable));
runnable->thread(result);
return result;
}

// 真正的线程在这里
class StdThread : public Thread, public stdcxx::enable_shared_from_this<StdThread> {
public:
enum STATE { uninitialized, starting, started, stopping, stopped };

static void threadMain(stdcxx::shared_ptr<StdThread> thread);

private:
std::unique_ptr<std::thread> thread_;
Monitor monitor_;
STATE state_;
bool detached_;

public:
StdThread(bool detached, stdcxx::shared_ptr<Runnable> runnable)
: state_(uninitialized), detached_(detached) {
this->Thread::runnable(runnable); // 设置Thread的Runnable
}

~StdThread() {
if (!detached_ && thread_->joinable()) {
try {
join();
} catch (...) {
// We're really hosed.
}
}
}

STATE getState() const
{
Synchronized sync(monitor_);
return state_;
}

void setState(STATE newState)
{
Synchronized sync(monitor_);
state_ = newState;

// unblock start() with the knowledge that the thread has actually
// started running, which avoids a race in detached threads.
if (newState == started) {
monitor_.notify();
}
}

void start() { // 调用这里才会真正生成线程
if (getState() != uninitialized) {
return;
}

stdcxx::shared_ptr<StdThread> selfRef = shared_from_this(); // 这里要获取自身的指针
setState(starting);

Synchronized sync(monitor_);
thread_ = std::unique_ptr<std::thread>(new std::thread(threadMain, selfRef)); // 真正的生成线程的地方

if (detached_)
thread_->detach(); // 如果是detach,后面就一定需要等待了

// Wait for the thread to start and get far enough to grab everything
// that it needs from the calling context, thus absolving the caller
// from being required to hold on to runnable indefinitely.
monitor_.wait(); // 等待线程真正运行起来
}

void join() {
if (!detached_ && state_ != uninitialized) {
thread_->join();
}
}

Thread::id_t getId() { return thread_.get() ? thread_->get_id() : std::thread::id(); }

stdcxx::shared_ptr<Runnable> runnable() const { return Thread::runnable(); }

void runnable(stdcxx::shared_ptr<Runnable> value) { Thread::runnable(value); }
};

// 线程的入口函数
void StdThread::threadMain(stdcxx::shared_ptr<StdThread> thread) {
#if GOOGLE_PERFTOOLS_REGISTER_THREAD
ProfilerRegisterThread();
#endif

thread->setState(started); // 线程真正运行起来了
thread->runnable()->run(); // 线程里面的运行实体,究竟是运行什么由runnable进行包装,一般是循环处理

if (thread->getState() != stopping && thread->getState() != stopped) { // 跑到这里说明循环完成了,线程也要结束了
thread->setState(stopping);
}
}

nephen wechat
欢迎您扫一扫上面的微信公众号,订阅我的博客!
坚持原创技术分享,您的支持将鼓励我继续创作!