+-------------------------------------+
|               Server                |
| (single-threaded, event-driven etc) |
+-------------------------------------+
|             Processor               |
|        (compiler generated)         |
+-------------------------------------+
|             Protocol                |
|        (JSON, compact etc)          |
+-------------------------------------+
|             Transport               |
|        (raw TCP, HTTP etc)          |
+-------------------------------------+
Transport
Transport传输层为从网络读取/向网络写入提供了一个简单的抽象。这使 Thrift 能够将底层传输与系统的其余部分分离(例如,序列化/反序列化)
Transport接口:
- open
- close
- read
- write
- flush
除了上面的 Transport 接口之外,Thrift 还使用了一个 ServerTransport 接口,用于接受或创建原始传输对象。顾名思义,ServerTransport 主要用于服务器端,为传入连接创建新的传输对象
- open
- listen
- accept
- close
// transport的基本功能,open和close都需要实现,没有实现的话会抛异常
Protocol
Protocol协议抽象定义一种将内存中的数据结构映射到线路格式的机制。换句话说,协议指定了数据类型如何使用底层传输来编码/解码自身。因此,协议实现管理编码方案并负责(反)序列化过程。一些示例协议包括JSON、XML、纯文本、compact binary等。
Protocol接口:
writeMessageBegin(name, type, seq)
writeMessageEnd()
writeStructBegin(name)
writeStructEnd()
writeFieldBegin(name, type, id)
writeFieldEnd()
writeFieldStop()
writeMapBegin(ktype, vtype, size)
writeMapEnd()
writeListBegin(etype, size)
writeListEnd()
writeSetBegin(etype, size)
writeSetEnd()
writeBool(bool)
writeByte(byte)
writeI16(i16)
writeI32(i32)
writeI64(i64)
writeDouble(double)
writeString(string)
name, type, seq = readMessageBegin()
readMessageEnd()
name = readStructBegin()
readStructEnd()
name, type, id = readFieldBegin()
readFieldEnd()
k, v, size = readMapBegin()
readMapEnd()
etype, size = readListBegin()
readListEnd()
etype, size = readSetBegin()
readSetEnd()
bool = readBool()
byte = readByte()
i16 = readI16()
i32 = readI32()
i64 = readI64()
double = readDouble()
string = readString()
// 协议,对传输内容的封装
Processor
Processor封装了从输入流中读取数据并写入输出流的能力。输入和输出流由Protocol对象表示。Processor接口非常简单:
interface TProcessor {
bool process(TProtocol in, TProtocol out) throws TException
}
编译器会生成特定于服务的Processor实现。Processor从线路中读取数据(使用输入协议),将处理委托给处理程序(由用户实现),并通过线路向外部写入响应(使用输出协议)。
207class TProcessor {
public:
virtual ~TProcessor() = default;
virtual bool process(std::shared_ptr<protocol::TProtocol> in,
std::shared_ptr<protocol::TProtocol> out,
void* connectionContext) = 0; // 子类实现
bool process(std::shared_ptr<apache::thrift::protocol::TProtocol> io, void* connectionContext) { // 协议是由server传的
return process(io, io, connectionContext); // 输入输出是同一个协议
}
std::shared_ptr<TProcessorEventHandler> getEventHandler() const { return eventHandler_; }
void setEventHandler(std::shared_ptr<TProcessorEventHandler> eventHandler) {
eventHandler_ = eventHandler;
}
protected:
TProcessor() = default;
std::shared_ptr<TProcessorEventHandler> eventHandler_;
};
// Processor that demultiplexes requests: message names of the form
// "Service:method" are routed to the TProcessor registered for "Service".
class TMultiplexedProcessor : public TProcessor {
public:
  typedef std::map<std::string, std::shared_ptr<TProcessor> > services_t;

  // Register a processor under a service name; several services may share
  // one server.
  void registerProcessor(const std::string& serviceName, std::shared_ptr<TProcessor> processor) {
    services[serviceName] = processor;
  }

  // Fallback processor for plain (non-multiplexed) message names.
  void registerDefault(const std::shared_ptr<TProcessor>& processor) {
    defaultProcessor = processor;
  }

  // Drain the offending request from `in`, reply on `out` with a
  // T_EXCEPTION carrying `msg`, and return a TException for the caller to
  // throw.
  TException protocol_error(std::shared_ptr<protocol::TProtocol> in,
                            std::shared_ptr<protocol::TProtocol> out,
                            const std::string& name,
                            int32_t seqid,
                            const std::string& msg) const {
    // Consume the rest of the bad request so the transport stays in sync.
    in->skip(::apache::thrift::protocol::T_STRUCT);
    in->readMessageEnd();
    in->getTransport()->readEnd();
    ::apache::thrift::TApplicationException
      x(::apache::thrift::TApplicationException::PROTOCOL_ERROR,
        "TMultiplexedProcessor: " + msg);
    out->writeMessageBegin(name, ::apache::thrift::protocol::T_EXCEPTION, seqid);
    x.write(out.get());
    out->writeMessageEnd();
    out->getTransport()->writeEnd();
    out->getTransport()->flush();
    return TException(msg);
  }

  // Parse the message header, split the name on ':' and forward the call to
  // the matching (or default) processor.
  bool process(std::shared_ptr<protocol::TProtocol> in,
               std::shared_ptr<protocol::TProtocol> out,
               void* connectionContext) override {
    std::string name;
    protocol::TMessageType type;
    int32_t seqid;
    // Read the message header: name, type and sequence id.
    in->readMessageBegin(name, type, seqid);
    if (type != protocol::T_CALL && type != protocol::T_ONEWAY) {
      throw protocol_error(in, out, name, seqid, "Unexpected message type");
    }
    // Split "Service:method" into tokens.
    boost::tokenizer<boost::char_separator<char> > tok(name, boost::char_separator<char>(":"));
    std::vector<std::string> tokens;
    std::copy(tok.begin(), tok.end(), std::back_inserter(tokens));
    if (tokens.size() == 2) {
      auto it = services.find(tokens[0]); // look up the service's processor
      if (it != services.end()) {
        std::shared_ptr<TProcessor> processor = it->second;
        // StoredMessageProtocol replays the already-consumed message header
        // (with the bare method name) for the inner processor.
        return processor
            ->process(std::shared_ptr<protocol::TProtocol>(
                          new protocol::StoredMessageProtocol(in, tokens[1], type, seqid)),
                      out,
                      connectionContext); // actual dispatch happens here
      } else {
        throw protocol_error(in, out, name, seqid,
                             "Unknown service: " + tokens[0] +
                             ". Did you forget to call registerProcessor()?");
      }
    } else if (tokens.size() == 1) {
      // No service prefix: fall back to the default processor if present.
      if (defaultProcessor) {
        return defaultProcessor
            ->process(std::shared_ptr<protocol::TProtocol>(
                          new protocol::StoredMessageProtocol(in, tokens[0], type, seqid)),
                      out,
                      connectionContext);
      } else {
        throw protocol_error(in, out, name, seqid,
                             "Non-multiplexed client request dropped. "
                             "Did you forget to call defaultProcessor()?");
      }
    } else {
      throw protocol_error(in, out, name, seqid,
                           "Wrong number of tokens.");
    }
  }

private:
  services_t services;                        // service name -> processor
  std::shared_ptr<TProcessor> defaultProcessor;
};
// 目前使用的这个
class TDispatchProcessor : public TProcessor {
public:
bool process(std::shared_ptr<protocol::TProtocol> in,
std::shared_ptr<protocol::TProtocol> out,
void* connectionContext) override {
std::string fname;
protocol::TMessageType mtype;
int32_t seqid;
in->readMessageBegin(fname, mtype, seqid); // 读取fname, mtype, seqid
if (mtype != protocol::T_CALL && mtype != protocol::T_ONEWAY) {
GlobalOutput.printf("received invalid message type %d from client", mtype);
return false;
}
return dispatchCall(in.get(), out.get(), fname, seqid, connectionContext);
}
protected:
virtual bool dispatchCall(apache::thrift::protocol::TProtocol* in,
apache::thrift::protocol::TProtocol* out,
const std::string& fname,
int32_t seqid,
void* callContext) = 0; // 还要由具体的服务子类来实现
};
// Generated by the Thrift compiler; derives from TDispatchProcessor.
class SubscriberServiceProcessor : public ::apache::thrift::TDispatchProcessor {
protected:
  // User-implemented handler carrying the actual service logic.
  ::std::shared_ptr<SubscriberServiceIf> iface_;
  virtual bool dispatchCall(::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, const std::string& fname, int32_t seqid, void* callContext);
private:
  typedef void (SubscriberServiceProcessor::*ProcessFunction)(int32_t, ::apache::thrift::protocol::TProtocol*, ::apache::thrift::protocol::TProtocol*, void*);
  typedef std::map<std::string, ProcessFunction> ProcessMap;
  // Method name -> member-function pointer (std::map, so O(log n) lookup).
  ProcessMap processMap_;
  // One process_* member per RPC method in the .thrift service definition.
  void process_getMaxUpSpeed(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext);
  void process_getCurUpSpeed(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext);
public:
  SubscriberServiceProcessor(::std::shared_ptr<SubscriberServiceIf> iface) :
    iface_(iface) {
    // Populate the dispatch table at construction time.
    processMap_["getMaxUpSpeed"] = &SubscriberServiceProcessor::process_getMaxUpSpeed;
    processMap_["getCurUpSpeed"] = &SubscriberServiceProcessor::process_getCurUpSpeed;
  }
  virtual ~SubscriberServiceProcessor() {}
};
// Look up the requested method in processMap_ and invoke it; unknown method
// names are answered with a T_EXCEPTION reply.
bool SubscriberServiceProcessor::dispatchCall(::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, const std::string& fname, int32_t seqid, void* callContext) {
  ProcessMap::iterator entry = processMap_.find(fname);
  if (entry != processMap_.end()) {
    // Found the handler: call through the member-function pointer.
    (this->*(entry->second))(seqid, iprot, oprot, callContext);
    return true;
  }

  // Unknown method: drain the request and send back an application exception.
  iprot->skip(::apache::thrift::protocol::T_STRUCT);
  iprot->readMessageEnd();
  iprot->getTransport()->readEnd();
  ::apache::thrift::TApplicationException x(::apache::thrift::TApplicationException::UNKNOWN_METHOD, "Invalid method name: '"+fname+"'");
  oprot->writeMessageBegin(fname, ::apache::thrift::protocol::T_EXCEPTION, seqid);
  x.write(oprot);
  oprot->writeMessageEnd();
  oprot->getTransport()->writeEnd();
  oprot->getTransport()->flush();
  return true;
}
// Processors are also produced through a factory (the server asks for one
// per connection).
class TProcessorFactory {
public:
  virtual ~TProcessorFactory() = default;

  // Return the TProcessor to use for a particular connection.  Implemented
  // by the concrete (generated) factory.
  virtual std::shared_ptr<TProcessor> getProcessor(const TConnectionInfo& connInfo) = 0;
};
// Concrete factory, also generated by the Thrift compiler.
class SubscriberServiceProcessorFactory : public ::apache::thrift::TProcessorFactory {
public:
  SubscriberServiceProcessorFactory(const ::std::shared_ptr< SubscriberServiceIfFactory >& handlerFactory) :
    handlerFactory_(handlerFactory) {}
  ::std::shared_ptr< ::apache::thrift::TProcessor > getProcessor(const ::apache::thrift::TConnectionInfo& connInfo);
protected:
  // Factory for the user-side handler objects.
  ::std::shared_ptr< SubscriberServiceIfFactory > handlerFactory_;
};

// Build a handler for this connection (released through ReleaseHandler when
// the shared_ptr dies) and wrap it in a SubscriberServiceProcessor.
::std::shared_ptr< ::apache::thrift::TProcessor > SubscriberServiceProcessorFactory::getProcessor(const ::apache::thrift::TConnectionInfo& connInfo) {
  ::apache::thrift::ReleaseHandler< SubscriberServiceIfFactory > cleanup(handlerFactory_);
  ::std::shared_ptr< SubscriberServiceIf > handler(handlerFactory_->getHandler(connInfo), cleanup);
  ::std::shared_ptr< ::apache::thrift::TProcessor > processor(new SubscriberServiceProcessor(handler));
  return processor;
}
Server
最上层就是rpc服务器了,目前常用TNonblockingServer。
563class TServer : public concurrency::Runnable {
public:
~TServer() override = default;
virtual void serve() = 0; // 子类需要实现serve
virtual void stop() {}
// Allows running the server as a Runnable thread
void run() override { serve(); } // server其实也是个线程,需要实现run函数
std::shared_ptr<TProcessorFactory> getProcessorFactory() { return processorFactory_; }
std::shared_ptr<TServerTransport> getServerTransport() { return serverTransport_; }
std::shared_ptr<TTransportFactory> getInputTransportFactory() { return inputTransportFactory_; }
std::shared_ptr<TTransportFactory> getOutputTransportFactory() {
return outputTransportFactory_;
}
std::shared_ptr<TProtocolFactory> getInputProtocolFactory() { return inputProtocolFactory_; }
std::shared_ptr<TProtocolFactory> getOutputProtocolFactory() { return outputProtocolFactory_; }
std::shared_ptr<TServerEventHandler> getEventHandler() { return eventHandler_; }
protected:
TServer(const std::shared_ptr<TProcessorFactory>& processorFactory)
: processorFactory_(processorFactory) {
setInputTransportFactory(std::shared_ptr<TTransportFactory>(new TTransportFactory()));
setOutputTransportFactory(std::shared_ptr<TTransportFactory>(new TTransportFactory()));
setInputProtocolFactory(std::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
setOutputProtocolFactory(std::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
}
TServer(const std::shared_ptr<TProcessor>& processor)
: processorFactory_(new TSingletonProcessorFactory(processor)) {
setInputTransportFactory(std::shared_ptr<TTransportFactory>(new TTransportFactory()));
setOutputTransportFactory(std::shared_ptr<TTransportFactory>(new TTransportFactory()));
setInputProtocolFactory(std::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
setOutputProtocolFactory(std::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
}
TServer(const std::shared_ptr<TProcessorFactory>& processorFactory,
const std::shared_ptr<TServerTransport>& serverTransport)
: processorFactory_(processorFactory), serverTransport_(serverTransport) {
setInputTransportFactory(std::shared_ptr<TTransportFactory>(new TTransportFactory()));
setOutputTransportFactory(std::shared_ptr<TTransportFactory>(new TTransportFactory()));
setInputProtocolFactory(std::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
setOutputProtocolFactory(std::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
}
TServer(const std::shared_ptr<TProcessor>& processor,
const std::shared_ptr<TServerTransport>& serverTransport)
: processorFactory_(new TSingletonProcessorFactory(processor)),
serverTransport_(serverTransport) {
setInputTransportFactory(std::shared_ptr<TTransportFactory>(new TTransportFactory()));
setOutputTransportFactory(std::shared_ptr<TTransportFactory>(new TTransportFactory()));
setInputProtocolFactory(std::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
setOutputProtocolFactory(std::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
}
TServer(const std::shared_ptr<TProcessorFactory>& processorFactory,
const std::shared_ptr<TServerTransport>& serverTransport,
const std::shared_ptr<TTransportFactory>& transportFactory,
const std::shared_ptr<TProtocolFactory>& protocolFactory)
: processorFactory_(processorFactory),
serverTransport_(serverTransport),
inputTransportFactory_(transportFactory),
outputTransportFactory_(transportFactory),
inputProtocolFactory_(protocolFactory),
outputProtocolFactory_(protocolFactory) {}
TServer(const std::shared_ptr<TProcessor>& processor,
const std::shared_ptr<TServerTransport>& serverTransport,
const std::shared_ptr<TTransportFactory>& transportFactory,
const std::shared_ptr<TProtocolFactory>& protocolFactory)
: processorFactory_(new TSingletonProcessorFactory(processor)),
serverTransport_(serverTransport),
inputTransportFactory_(transportFactory),
outputTransportFactory_(transportFactory),
inputProtocolFactory_(protocolFactory),
outputProtocolFactory_(protocolFactory) {}
TServer(const std::shared_ptr<TProcessorFactory>& processorFactory,
const std::shared_ptr<TServerTransport>& serverTransport,
const std::shared_ptr<TTransportFactory>& inputTransportFactory,
const std::shared_ptr<TTransportFactory>& outputTransportFactory,
const std::shared_ptr<TProtocolFactory>& inputProtocolFactory,
const std::shared_ptr<TProtocolFactory>& outputProtocolFactory)
: processorFactory_(processorFactory),
serverTransport_(serverTransport),
inputTransportFactory_(inputTransportFactory),
outputTransportFactory_(outputTransportFactory),
inputProtocolFactory_(inputProtocolFactory),
outputProtocolFactory_(outputProtocolFactory) {}
TServer(const std::shared_ptr<TProcessor>& processor,
const std::shared_ptr<TServerTransport>& serverTransport,
const std::shared_ptr<TTransportFactory>& inputTransportFactory,
const std::shared_ptr<TTransportFactory>& outputTransportFactory,
const std::shared_ptr<TProtocolFactory>& inputProtocolFactory,
const std::shared_ptr<TProtocolFactory>& outputProtocolFactory)
: processorFactory_(new TSingletonProcessorFactory(processor)),
serverTransport_(serverTransport),
inputTransportFactory_(inputTransportFactory),
outputTransportFactory_(outputTransportFactory),
inputProtocolFactory_(inputProtocolFactory),
outputProtocolFactory_(outputProtocolFactory) {}
std::shared_ptr<TProcessor> getProcessor(std::shared_ptr<TProtocol> inputProtocol,
std::shared_ptr<TProtocol> outputProtocol,
std::shared_ptr<TTransport> transport) {
TConnectionInfo connInfo;
connInfo.input = inputProtocol;
connInfo.output = outputProtocol;
connInfo.transport = transport;
return processorFactory_->getProcessor(connInfo);
}
// Class variables
std::shared_ptr<TProcessorFactory> processorFactory_;
std::shared_ptr<TServerTransport> serverTransport_;
std::shared_ptr<TTransportFactory> inputTransportFactory_;
std::shared_ptr<TTransportFactory> outputTransportFactory_;
std::shared_ptr<TProtocolFactory> inputProtocolFactory_;
std::shared_ptr<TProtocolFactory> outputProtocolFactory_;
std::shared_ptr<TServerEventHandler> eventHandler_;
public:
void setInputTransportFactory(std::shared_ptr<TTransportFactory> inputTransportFactory) {
inputTransportFactory_ = inputTransportFactory;
}
void setOutputTransportFactory(std::shared_ptr<TTransportFactory> outputTransportFactory) {
outputTransportFactory_ = outputTransportFactory;
}
void setInputProtocolFactory(std::shared_ptr<TProtocolFactory> inputProtocolFactory) {
inputProtocolFactory_ = inputProtocolFactory;
}
void setOutputProtocolFactory(std::shared_ptr<TProtocolFactory> outputProtocolFactory) {
outputProtocolFactory_ = outputProtocolFactory;
}
void setServerEventHandler(std::shared_ptr<TServerEventHandler> eventHandler) {
eventHandler_ = eventHandler;
}
};
// Concrete serve(): run the primary (listener) IO thread in the calling
// thread, then wait for every IO thread to finish.
void TNonblockingServer::serve() {
  // Lazy setup if registerEvents() has not been called explicitly.
  if (ioThreads_.empty())
    registerEvents(NULL);
  // IO thread #0 runs inline; this call blocks until the server stops.
  ioThreads_[0]->run();
  // Join the remaining IO threads (started inside registerEvents()).
  for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
    ioThreads_[i]->join();
    GlobalOutput.printf("TNonblocking: join done for IO thread #%d", i);
  }
}
// One-time initialization: create the listening socket, construct the IO
// threads, start the secondary ones, and register libevent events for the
// primary (listener) thread.
void TNonblockingServer::registerEvents(event_base* user_event_base) {
  userEventBase_ = user_event_base;
  // init listen socket
  if (serverSocket_ == THRIFT_INVALID_SOCKET)
    createAndListenOnSocket();
  // set up the IO threads
  assert(ioThreads_.empty());
  if (!numIOThreads_) {
    numIOThreads_ = DEFAULT_IO_THREADS;
  }
  // User-provided event-base doesn't works for multi-threaded servers
  assert(numIOThreads_ == 1 || !userEventBase_);
  for (uint32_t id = 0; id < numIOThreads_; ++id) {
    // the first IO thread also does the listening on server socket
    THRIFT_SOCKET listenFd = (id == 0 ? serverSocket_ : THRIFT_INVALID_SOCKET);
    shared_ptr<TNonblockingIOThread> thread(
        new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_));
    ioThreads_.push_back(thread);
  }
  // Notify handler of the preServe event
  if (eventHandler_) {
    eventHandler_->preServe();
  }
  // Start all of our helper IO threads. Note that the threads run forever,
  // only terminating if stop() is called.
  assert(ioThreads_.size() == numIOThreads_);
  assert(ioThreads_.size() > 0);
  GlobalOutput.printf("TNonblockingServer: Serving with %d io threads.",
                      ioThreads_.size());
  // Launch all the secondary IO threads in separate threads
  if (ioThreads_.size() > 1) {
    ioThreadFactory_.reset(new PlatformThreadFactory(
#if !USE_BOOST_THREAD && !USE_STD_THREAD
        PlatformThreadFactory::OTHER, // scheduler
        PlatformThreadFactory::NORMAL, // priority
        1, // stack size (MB)
#endif
        false // detached
        ));
    assert(ioThreadFactory_.get());
    // intentionally starting at thread 1, not 0 (thread 0 runs inline in serve())
    for (uint32_t i = 1; i < ioThreads_.size(); ++i) {
      shared_ptr<Thread> thread = ioThreadFactory_->newThread(ioThreads_[i]);
      ioThreads_[i]->setThread(thread);
      thread->start();
    }
  }
  // Register the events for the primary (listener) IO thread
  ioThreads_[0]->registerEvents();
}
// Wraps one connection's pending request as a Runnable task for the worker
// thread pool.
class TNonblockingServer::TConnection::Task : public Runnable {
public:
  Task(stdcxx::shared_ptr<TProcessor> processor,
       stdcxx::shared_ptr<TProtocol> input,
       stdcxx::shared_ptr<TProtocol> output,
       TConnection* connection)
    : processor_(processor),
      input_(input),
      output_(output),
      connection_(connection),
      serverEventHandler_(connection_->getServerEventHandler()),
      connectionContext_(connection_->getConnectionContext()) {}

  void run() {
    try {
      // Keep processing as long as process() succeeds and more request data
      // is already buffered (peek()).
      for (;;) {
        if (serverEventHandler_) {
          serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
        }
        if (!processor_->process(input_, output_, connectionContext_)
            || !input_->getTransport()->peek()) {
          break;
        }
      }
    } catch (const TTransportException& ttx) {
      GlobalOutput.printf("TNonblockingServer: client died: %s", ttx.what());
    } catch (const std::bad_alloc&) {
      // Out of memory is treated as fatal for the whole process.
      GlobalOutput("TNonblockingServer: caught bad_alloc exception.");
      exit(1);
    } catch (const std::exception& x) {
      GlobalOutput.printf("TNonblockingServer: process() exception: %s: %s",
                          typeid(x).name(),
                          x.what());
    } catch (...) {
      GlobalOutput.printf("TNonblockingServer: unknown exception while processing.");
    }
    // Signal completion back to the libevent thread via a pipe
    if (!connection_->notifyIOThread()) {
      GlobalOutput.printf("TNonblockingServer: failed to notifyIOThread, closing.");
      connection_->server_->decrementActiveProcessors();
      connection_->close();
      throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
    }
  }

  TConnection* getTConnection() { return connection_; }

private:
  stdcxx::shared_ptr<TProcessor> processor_;
  stdcxx::shared_ptr<TProtocol> input_;
  stdcxx::shared_ptr<TProtocol> output_;
  TConnection* connection_;       // non-owning back-pointer to the connection
  stdcxx::shared_ptr<TServerEventHandler> serverEventHandler_;
  void* connectionContext_;
};
// libevent callback on the listening socket: accept a client, apply
// overload policy, and hand the new connection to an IO thread.
void TNonblockingServer::handleEvent(THRIFT_SOCKET fd, short which) {
  (void)which;
  // Make sure that libevent didn't mess up the socket handles
  assert(fd == serverSocket_);
  // Going to accept a new client socket
  stdcxx::shared_ptr<TSocket> clientSocket;
  clientSocket = serverTransport_->accept();
  if (clientSocket) {
    // If we're overloaded, take action here
    if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
      Guard g(connMutex_);
      nConnectionsDropped_++;
      nTotalConnectionsDropped_++;
      if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
        clientSocket->close();
        return;
      } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
        if (!drainPendingTask()) {
          // Nothing left to discard, so we drop connection instead.
          clientSocket->close();
          return;
        }
      }
    }
    // Create a new TConnection (state machine) for this client socket.
    TConnection* clientConnection = createConnection(clientSocket);
    // Fail fast if we could not create a TConnection object
    if (clientConnection == NULL) {
      GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
      clientSocket->close();
      return;
    }
    if (clientConnection->getIOThreadNumber() == 0) {
      // Assigned to the listener's own IO thread: drive the state machine
      // directly.
      clientConnection->transition();
    } else {
      // Assigned to another IO thread: wake it up via its notify pipe.
      if (!clientConnection->notifyIOThread()) {
        GlobalOutput.perror("[ERROR] notifyIOThread failed on fresh connection, closing", errno);
        clientConnection->close();
      }
    }
  }
}
// Connection state machine.  Each call advances appState_; several cases
// intentionally fall through (or goto) into the next state.
void TNonblockingServer::TConnection::transition() {
  // ensure this connection is active right now
  assert(ioThread_);
  assert(server_);
  // Switch upon the state that we are currently in and move to a new state
  switch (appState_) {
  case APP_READ_REQUEST:
    // We are done reading the request, package the read buffer into transport
    // and get back some data from the dispatch function
    if (server_->getHeaderTransport()) {
      inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
      outputTransport_->resetBuffer();
    } else {
      // We saved room for the framing size in case header transport needed it,
      // but just skip it for the non-header case
      inputTransport_->resetBuffer(readBuffer_ + 4, readBufferPos_ - 4);
      outputTransport_->resetBuffer();
      // Prepend four bytes of blank space to the buffer so we can
      // write the frame size there later.
      outputTransport_->getWritePtr(4);
      outputTransport_->wroteBytes(4);
    }
    server_->incrementActiveProcessors();
    if (server_->isThreadPoolProcessing()) {
      // We are setting up a Task to do this work and we will wait on it
      // Create task and dispatch to the thread manager
      stdcxx::shared_ptr<Runnable> task = stdcxx::shared_ptr<Runnable>(
          new Task(processor_, inputProtocol_, outputProtocol_, this));
      // The application is now waiting on the task to finish
      appState_ = APP_WAIT_TASK;
      // Set this connection idle so that libevent doesn't process more
      // data on it while we're still waiting for the threadmanager to
      // finish this task
      setIdle();
      try {
        server_->addTask(task);
      } catch (IllegalStateException& ise) {
        // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
        GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
        server_->decrementActiveProcessors();
        close();
      } catch (TimedOutException& to) {
        GlobalOutput.printf("[ERROR] TimedOutException: Server::process() %s", to.what());
        server_->decrementActiveProcessors();
        close();
      }
      return;
    } else {
      try {
        if (serverEventHandler_) {
          serverEventHandler_->processContext(connectionContext_, getTSocket());
        }
        // Invoke the processor synchronously on this IO thread.
        processor_->process(inputProtocol_, outputProtocol_, connectionContext_);
      } catch (const TTransportException& ttx) {
        GlobalOutput.printf(
            "TNonblockingServer transport error in "
            "process(): %s",
            ttx.what());
        server_->decrementActiveProcessors();
        close();
        return;
      } catch (const std::exception& x) {
        GlobalOutput.printf("Server::process() uncaught exception: %s: %s",
                            typeid(x).name(),
                            x.what());
        server_->decrementActiveProcessors();
        close();
        return;
      } catch (...) {
        GlobalOutput.printf("Server::process() unknown exception");
        server_->decrementActiveProcessors();
        close();
        return;
      }
    }
    // Intentionally fall through here, the call to process has written into
    // the writeBuffer_
  case APP_WAIT_TASK:
    // We have now finished processing a task and the result has been written
    // into the outputTransport_, so we grab its contents and place them into
    // the writeBuffer_ for actual writing by the libevent thread
    server_->decrementActiveProcessors();
    // Get the result of the operation
    outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
    // If the function call generated return data, then move into the send
    // state and get going
    // 4 bytes were reserved for frame size
    if (writeBufferSize_ > 4) {
      // Move into write state
      writeBufferPos_ = 0;
      socketState_ = SOCKET_SEND;
      // Put the frame size into the write buffer
      int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
      memcpy(writeBuffer_, &frameSize, 4);
      // Socket into write mode
      appState_ = APP_SEND_RESULT;
      setWrite();
      // Try to work the socket immediately
      // workSocket();
      return;
    }
    // In this case, the request was oneway and we should fall through
    // right back into the read frame header state
    goto LABEL_APP_INIT;
  case APP_SEND_RESULT:
    // it's now safe to perform buffer size housekeeping.
    if (writeBufferSize_ > largestWriteBufferSize_) {
      largestWriteBufferSize_ = writeBufferSize_;
    }
    // Periodically shrink oversized idle buffers.
    if (server_->getResizeBufferEveryN() > 0
        && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
      checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
                              server_->getIdleWriteBufferLimit());
      callsForResize_ = 0;
    }
    // N.B.: We also intentionally fall through here into the INIT state!
  LABEL_APP_INIT:
  case APP_INIT:
    // Clear write buffer variables
    writeBuffer_ = NULL;
    writeBufferPos_ = 0;
    writeBufferSize_ = 0;
    // Into read4 state we go
    socketState_ = SOCKET_RECV_FRAMING;
    appState_ = APP_READ_FRAME_SIZE;
    readBufferPos_ = 0;
    // Register read event
    setRead();
    // Try to work the socket right away
    // workSocket();
    return;
  case APP_READ_FRAME_SIZE:
    readWant_ += 4;
    // We just read the request length
    // Double the buffer size until it is big enough
    if (readWant_ > readBufferSize_) {
      if (readBufferSize_ == 0) {
        readBufferSize_ = 1;
      }
      uint32_t newSize = readBufferSize_;
      while (readWant_ > newSize) {
        newSize *= 2;
      }
      uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
      if (newBuffer == NULL) {
        // nothing else to be done...
        throw std::bad_alloc();
      }
      readBuffer_ = newBuffer;
      readBufferSize_ = newSize;
    }
    // Re-serialize the frame size into the front of the buffer (network
    // byte order) so header transports can see it.
    readBufferPos_ = 4;
    *((uint32_t*)readBuffer_) = htonl(readWant_ - 4);
    // Move into read request state
    socketState_ = SOCKET_RECV;
    appState_ = APP_READ_REQUEST;
    // Work the socket right away
    workSocket();
    return;
  case APP_CLOSE_CONNECTION:
    server_->decrementActiveProcessors();
    close();
    return;
  default:
    GlobalOutput.printf("Unexpected Application State %d", appState_);
    assert(0);
  }
}
// Typical wiring of the four layers:
std::shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount); // worker thread pool
std::shared_ptr<TNonblockingServerSocket> socket = std::make_shared<TNonblockingServerSocket>(path); // unix domain socket (transport layer)
std::shared_ptr<TNonblockingServer> server = std::make_shared<TNonblockingServer>(processorFactory, // processor layer
    std::make_shared<TBinaryProtocolFactory>(), // protocol layer
    socket,
    threadManager);
server->serve();
// serve() blocks, so start the server on a dedicated thread.
shared_ptr<Runner> runner(new Runner(rss, processorFactory, name, port, workerCnt));
shared_ptr<ThreadFactory> threadFactory(
    new ThreadFactory(true));
shared_ptr<apache::thrift::concurrency::Thread> thread = threadFactory->newThread(runner);
thread->start();
// Wait until the runner signals it is ready to accept requests.
runner->readyBarrier();