+-------------------------------------------+
| Server                                    |
| (single-threaded, event-driven etc)       |
+-------------------------------------------+
| Processor                                 |
| (compiler generated)                      |
+-------------------------------------------+
| Protocol                                  |
| (JSON, compact etc)                       |
+-------------------------------------------+
| Transport                                 |
| (raw TCP, HTTP etc)                       |
+-------------------------------------------+
Transport
The Transport layer provides a simple abstraction for reading from and writing to the network. This lets Thrift decouple the underlying transport from the rest of the system (for example, from serialization/deserialization).
The Transport interface exposes:
- open
- close
- read
- write
- flush
In addition to the Transport interface above, Thrift also uses a ServerTransport interface for accepting or creating primitive transport objects. As the name suggests, ServerTransport is used mainly on the server side to create new Transport objects for incoming connections (see the sketch after the list below). Its interface is:
- open
- listen
- accept
- close
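A minimal sketch of the server side of this interface, assuming the stock TServerSocket implementation of TServerTransport (the port is a placeholder, and depending on the Thrift version the smart-pointer type may be stdcxx::shared_ptr rather than std::shared_ptr): listen, then accept raw client transports in a loop and hand each one off to be wrapped.
#include <memory>
#include <thrift/transport/TServerSocket.h>
using apache::thrift::transport::TServerSocket;
using apache::thrift::transport::TTransport;
void acceptLoop() {
  TServerSocket serverTransport(9090); // placeholder port
  serverTransport.listen();            // ServerTransport: listen
  for (;;) {
    // ServerTransport: accept -- blocks until a client connects and returns a raw Transport
    std::shared_ptr<TTransport> client = serverTransport.accept();
    // wrap `client` with a transport factory / protocol and hand it to a processor ...
  }
}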
// TTransport is the base class of every transport class. It declares a set of virtual functions that subclasses implement; it exposes the basic read/write interface but provides no real implementation itself and simply throws exceptions.
Thrift's Transport layer defines the basic interface and default behavior in the TTransport base class. Concrete subclasses such as TSocket and TFramedTransport implement the different transport mechanisms through inheritance and composition. The TVirtualTransport template class simplifies writing subclasses, so a subclass only has to implement the key read/write operations, and the Transport factory classes provide a flexible way to create Transport instances.
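As a concrete illustration of that composition, a typical client-side stack layers a TFramedTransport over a TSocket and hands the result to a protocol. This is only a sketch; the host, port and protocol choice are placeholders.
#include <memory>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TBufferTransports.h> // TFramedTransport
#include <thrift/protocol/TBinaryProtocol.h>
using namespace apache::thrift::transport;
using namespace apache::thrift::protocol;
int main() {
  std::shared_ptr<TTransport> socket(new TSocket("localhost", 9090));  // raw TCP transport
  std::shared_ptr<TTransport> transport(new TFramedTransport(socket)); // framing added by composition
  std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport)); // protocol encodes onto the transport
  transport->open();  // open/close/flush come from the TTransport interface
  // ... issue RPCs through a generated client bound to `protocol` ...
  transport->flush();
  transport->close();
  return 0;
}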
Protocol
The Protocol abstraction defines a mechanism for mapping in-memory data structures onto a wire format. In other words, a protocol specifies how data types use the underlying transport to encode and decode themselves. The protocol implementation therefore owns the encoding scheme and is responsible for (de)serialization. Example protocols include JSON, XML, plain text, compact binary, and so on. The interface is:
writeMessageBegin(name, type, seq)
writeMessageEnd()
writeStructBegin(name)
writeStructEnd()
writeFieldBegin(name, type, id)
writeFieldEnd()
writeFieldStop()
writeMapBegin(ktype, vtype, size)
writeMapEnd()
writeListBegin(etype, size)
writeListEnd()
writeSetBegin(etype, size)
writeSetEnd()
writeBool(bool)
writeByte(byte)
writeI16(i16)
writeI32(i32)
writeI64(i64)
writeDouble(double)
writeString(string)
name, type, seq = readMessageBegin()
readMessageEnd()
name = readStructBegin()
readStructEnd()
name, type, id = readFieldBegin()
readFieldEnd()
k, v, size = readMapBegin()
readMapEnd()
etype, size = readListBegin()
readListEnd()
etype, size = readSetBegin()
readSetEnd()
bool = readBool()
byte = readByte()
i16 = readI16()
i32 = readI32()
i64 = readI64()
double = readDouble()
string = readString()
// Protocol: an encoding layer over what the transport carries
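To make the write-side half of this interface concrete, here is a hand-rolled serialization routine for a hypothetical struct User { 1: i32 id, 2: string name } (the struct and its field ids are invented for illustration); it mirrors the sequence of calls a compiler-generated write() method emits:
#include <cstdint>
#include <string>
#include <thrift/protocol/TProtocol.h>
using apache::thrift::protocol::TProtocol;
using apache::thrift::protocol::T_I32;
using apache::thrift::protocol::T_STRING;
uint32_t writeUser(TProtocol* oprot, int32_t id, const std::string& name) {
  uint32_t xfer = 0;                              // bytes written, as in generated code
  xfer += oprot->writeStructBegin("User");
  xfer += oprot->writeFieldBegin("id", T_I32, 1); // field name, type, id
  xfer += oprot->writeI32(id);
  xfer += oprot->writeFieldEnd();
  xfer += oprot->writeFieldBegin("name", T_STRING, 2);
  xfer += oprot->writeString(name);
  xfer += oprot->writeFieldEnd();
  xfer += oprot->writeFieldStop();                // marks the end of the field list
  xfer += oprot->writeStructEnd();
  return xfer;
}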
Processor
A Processor encapsulates the ability to read data from an input stream and write data to an output stream. The input and output streams are represented by Protocol objects. The Processor interface is very simple:
interface TProcessor {
bool process(TProtocol in, TProtocol out) throws TException
}
The compiler generates a service-specific Processor implementation. The Processor reads data off the wire (using the input protocol), delegates the work to the handler (implemented by the user), and writes the response back out over the wire (using the output protocol). The relevant classes in the C++ library and in the generated code look like this:
class TProcessor {
public:
virtual ~TProcessor() = default;
virtual bool process(std::shared_ptr<protocol::TProtocol> in,
std::shared_ptr<protocol::TProtocol> out,
void* connectionContext) = 0; // implemented by subclasses
bool process(std::shared_ptr<apache::thrift::protocol::TProtocol> io, void* connectionContext) { // the protocol is handed in by the server
return process(io, io, connectionContext); // input and output share the same protocol
}
std::shared_ptr<TProcessorEventHandler> getEventHandler() const { return eventHandler_; }
void setEventHandler(std::shared_ptr<TProcessorEventHandler> eventHandler) {
eventHandler_ = eventHandler;
}
protected:
TProcessor() = default;
std::shared_ptr<TProcessorEventHandler> eventHandler_;
};
// The processor that actually works on the protocol objects
class TMultiplexedProcessor : public TProcessor {
public:
typedef std::map<std::string, std::shared_ptr<TProcessor> > services_t;
void registerProcessor(const std::string& serviceName, std::shared_ptr<TProcessor> processor) {
services[serviceName] = processor; // more than one processor may be registered
}
void registerDefault(const std::shared_ptr<TProcessor>& processor) {
defaultProcessor = processor;
}
TException protocol_error(std::shared_ptr<protocol::TProtocol> in,
std::shared_ptr<protocol::TProtocol> out,
const std::string& name,
int32_t seqid,
const std::string& msg) const {
in->skip(::apache::thrift::protocol::T_STRUCT);
in->readMessageEnd();
in->getTransport()->readEnd();
::apache::thrift::TApplicationException
x(::apache::thrift::TApplicationException::PROTOCOL_ERROR,
"TMultiplexedProcessor: " + msg);
out->writeMessageBegin(name, ::apache::thrift::protocol::T_EXCEPTION, seqid);
x.write(out.get());
out->writeMessageEnd();
out->getTransport()->writeEnd();
out->getTransport()->flush();
return TException(msg);
}
bool process(std::shared_ptr<protocol::TProtocol> in, // operates on protocols only
std::shared_ptr<protocol::TProtocol> out,
void* connectionContext) override {
std::string name;
protocol::TMessageType type;
int32_t seqid;
in->readMessageBegin(name, type, seqid); // read name, type and seqid off the wire
if (type != protocol::T_CALL && type != protocol::T_ONEWAY) {
throw protocol_error(in, out, name, seqid, "Unexpected message type");
}
boost::tokenizer<boost::char_separator<char> > tok(name, boost::char_separator<char>(":"));
std::vector<std::string> tokens;
std::copy(tok.begin(), tok.end(), std::back_inserter(tokens));
if (tokens.size() == 2) {
auto it = services.find(tokens[0]); // look up the processor registered for this service
if (it != services.end()) {
std::shared_ptr<TProcessor> processor = it->second; // processor found
return processor
->process(std::shared_ptr<protocol::TProtocol>(
new protocol::StoredMessageProtocol(in, tokens[1], type, seqid)),
out,
connectionContext); // the actual processing happens here
} else {
throw protocol_error(in, out, name, seqid,
"Unknown service: " + tokens[0] +
". Did you forget to call registerProcessor()?");
}
} else if (tokens.size() == 1) {
if (defaultProcessor) { // fall back to the default processor
return defaultProcessor
->process(std::shared_ptr<protocol::TProtocol>(
new protocol::StoredMessageProtocol(in, tokens[0], type, seqid)),
out,
connectionContext);
} else {
throw protocol_error(in, out, name, seqid,
"Non-multiplexed client request dropped. "
"Did you forget to call defaultProcessor()?");
}
} else {
throw protocol_error(in, out, name, seqid,
"Wrong number of tokens.");
}
}
private:
services_t services;
std::shared_ptr<TProcessor> defaultProcessor;
};
// This is the variant used here
class TDispatchProcessor : public TProcessor {
public:
bool process(std::shared_ptr<protocol::TProtocol> in,
std::shared_ptr<protocol::TProtocol> out,
void* connectionContext) override {
std::string fname;
protocol::TMessageType mtype;
int32_t seqid;
in->readMessageBegin(fname, mtype, seqid); // read fname, mtype and seqid
if (mtype != protocol::T_CALL && mtype != protocol::T_ONEWAY) {
GlobalOutput.printf("received invalid message type %d from client", mtype);
return false;
}
return dispatchCall(in.get(), out.get(), fname, seqid, connectionContext);
}
protected:
virtual bool dispatchCall(apache::thrift::protocol::TProtocol* in,
apache::thrift::protocol::TProtocol* out,
const std::string& fname,
int32_t seqid,
void* callContext) = 0; // implemented by the concrete, service-specific subclass
};
// The class below is generated by the thrift compiler; its base class is TDispatchProcessor
class SubscriberServiceProcessor : public ::apache::thrift::TDispatchProcessor {
protected:
::std::shared_ptr<SubscriberServiceIf> iface_;
virtual bool dispatchCall(::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, const std::string& fname, int32_t seqid, void* callContext);
private:
typedef void (SubscriberServiceProcessor::*ProcessFunction)(int32_t, ::apache::thrift::protocol::TProtocol*, ::apache::thrift::protocol::TProtocol*, void*);
typedef std::map<std::string, ProcessFunction> ProcessMap;
ProcessMap processMap_; // method name -> handler function, looked up per call (std::map, O(log n))
// one member function per RPC method
void process_getMaxUpSpeed(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext);
void process_getCurUpSpeed(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext);
public:
SubscriberServiceProcessor(::std::shared_ptr<SubscriberServiceIf> iface) :
iface_(iface) {
processMap_["getMaxUpSpeed"] = &SubscriberServiceProcessor::process_getMaxUpSpeed; // 存到map里面,查找时间O(1)
processMap_["getCurUpSpeed"] = &SubscriberServiceProcessor::process_getCurUpSpeed;
}
virtual ~SubscriberServiceProcessor() {}
};
bool SubscriberServiceProcessor::dispatchCall(::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, const std::string& fname, int32_t seqid, void* callContext) {
ProcessMap::iterator pfn;
pfn = processMap_.find(fname);
if (pfn == processMap_.end()) {
iprot->skip(::apache::thrift::protocol::T_STRUCT);
iprot->readMessageEnd();
iprot->getTransport()->readEnd();
::apache::thrift::TApplicationException x(::apache::thrift::TApplicationException::UNKNOWN_METHOD, "Invalid method name: '"+fname+"'");
oprot->writeMessageBegin(fname, ::apache::thrift::protocol::T_EXCEPTION, seqid);
x.write(oprot);
oprot->writeMessageEnd();
oprot->getTransport()->writeEnd();
oprot->getTransport()->flush();
return true;
}
(this->*(pfn->second))(seqid, iprot, oprot, callContext); // found the handler; invoke the member function directly
return true;
}
// Processors are also created through a factory
class TProcessorFactory {
public:
virtual ~TProcessorFactory() = default;
virtual std::shared_ptr<TProcessor> getProcessor(const TConnectionInfo& connInfo) = 0; // implemented by the concrete factory
};
// This concrete factory is also generated by the thrift compiler
class SubscriberServiceProcessorFactory : public ::apache::thrift::TProcessorFactory {
public:
SubscriberServiceProcessorFactory(const ::std::shared_ptr< SubscriberServiceIfFactory >& handlerFactory) :
handlerFactory_(handlerFactory) {}
::std::shared_ptr< ::apache::thrift::TProcessor > getProcessor(const ::apache::thrift::TConnectionInfo& connInfo);
protected:
::std::shared_ptr< SubscriberServiceIfFactory > handlerFactory_;
};
::std::shared_ptr< ::apache::thrift::TProcessor > SubscriberServiceProcessorFactory::getProcessor(const ::apache::thrift::TConnectionInfo& connInfo) {
::apache::thrift::ReleaseHandler< SubscriberServiceIfFactory > cleanup(handlerFactory_);
::std::shared_ptr< SubscriberServiceIf > handler(handlerFactory_->getHandler(connInfo), cleanup);
::std::shared_ptr< ::apache::thrift::TProcessor > processor(new SubscriberServiceProcessor(handler)); // ties back to the processor class shown above
return processor;
}
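The piece the user actually writes is the handler behind that factory. The real method signatures come from the IDL, which is not shown in this post, so the sketch below simply assumes two no-argument methods returning i32; it only illustrates how a handler implementation plugs into the generated processor.
// Hypothetical handler -- the real SubscriberServiceIf signatures depend on the IDL.
class SubscriberServiceHandler : public SubscriberServiceIf {
public:
  int32_t getMaxUpSpeed() override { return 1024; } // assumed signature
  int32_t getCurUpSpeed() override { return 512; }  // assumed signature
};
// Wire the handler into the generated dispatch processor shown above.
std::shared_ptr<SubscriberServiceIf> handler(new SubscriberServiceHandler());
std::shared_ptr<apache::thrift::TProcessor> processor(new SubscriberServiceProcessor(handler));
// `processor` is what a server (or a TProcessorFactory) ultimately calls process() on.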
Server
At the top of the stack sits the RPC server itself; TNonblockingServer is the one commonly used here.
class TServer : public concurrency::Runnable {
public:
~TServer() override = default;
virtual void serve() = 0; // subclasses must implement serve()
virtual void stop() {}
// Allows running the server as a Runnable thread
void run() override { serve(); } // the server is itself a Runnable, so it implements run()
std::shared_ptr<TProcessorFactory> getProcessorFactory() { return processorFactory_; }
std::shared_ptr<TServerTransport> getServerTransport() { return serverTransport_; }
std::shared_ptr<TTransportFactory> getInputTransportFactory() { return inputTransportFactory_; }
std::shared_ptr<TTransportFactory> getOutputTransportFactory() {
return outputTransportFactory_;
}
std::shared_ptr<TProtocolFactory> getInputProtocolFactory() { return inputProtocolFactory_; }
std::shared_ptr<TProtocolFactory> getOutputProtocolFactory() { return outputProtocolFactory_; }
std::shared_ptr<TServerEventHandler> getEventHandler() { return eventHandler_; }
protected:
TServer(const std::shared_ptr<TProcessorFactory>& processorFactory)
: processorFactory_(processorFactory) {
setInputTransportFactory(std::shared_ptr<TTransportFactory>(new TTransportFactory()));
setOutputTransportFactory(std::shared_ptr<TTransportFactory>(new TTransportFactory()));
setInputProtocolFactory(std::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
setOutputProtocolFactory(std::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
}
TServer(const std::shared_ptr<TProcessor>& processor)
: processorFactory_(new TSingletonProcessorFactory(processor)) {
setInputTransportFactory(std::shared_ptr<TTransportFactory>(new TTransportFactory()));
setOutputTransportFactory(std::shared_ptr<TTransportFactory>(new TTransportFactory()));
setInputProtocolFactory(std::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
setOutputProtocolFactory(std::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
}
TServer(const std::shared_ptr<TProcessorFactory>& processorFactory,
const std::shared_ptr<TServerTransport>& serverTransport)
: processorFactory_(processorFactory), serverTransport_(serverTransport) {
setInputTransportFactory(std::shared_ptr<TTransportFactory>(new TTransportFactory()));
setOutputTransportFactory(std::shared_ptr<TTransportFactory>(new TTransportFactory()));
setInputProtocolFactory(std::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
setOutputProtocolFactory(std::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
}
TServer(const std::shared_ptr<TProcessor>& processor,
const std::shared_ptr<TServerTransport>& serverTransport)
: processorFactory_(new TSingletonProcessorFactory(processor)),
serverTransport_(serverTransport) {
setInputTransportFactory(std::shared_ptr<TTransportFactory>(new TTransportFactory()));
setOutputTransportFactory(std::shared_ptr<TTransportFactory>(new TTransportFactory()));
setInputProtocolFactory(std::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
setOutputProtocolFactory(std::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
}
TServer(const std::shared_ptr<TProcessorFactory>& processorFactory,
const std::shared_ptr<TServerTransport>& serverTransport,
const std::shared_ptr<TTransportFactory>& transportFactory,
const std::shared_ptr<TProtocolFactory>& protocolFactory)
: processorFactory_(processorFactory),
serverTransport_(serverTransport),
inputTransportFactory_(transportFactory),
outputTransportFactory_(transportFactory),
inputProtocolFactory_(protocolFactory),
outputProtocolFactory_(protocolFactory) {}
TServer(const std::shared_ptr<TProcessor>& processor,
const std::shared_ptr<TServerTransport>& serverTransport,
const std::shared_ptr<TTransportFactory>& transportFactory,
const std::shared_ptr<TProtocolFactory>& protocolFactory)
: processorFactory_(new TSingletonProcessorFactory(processor)),
serverTransport_(serverTransport),
inputTransportFactory_(transportFactory),
outputTransportFactory_(transportFactory),
inputProtocolFactory_(protocolFactory),
outputProtocolFactory_(protocolFactory) {}
TServer(const std::shared_ptr<TProcessorFactory>& processorFactory,
const std::shared_ptr<TServerTransport>& serverTransport,
const std::shared_ptr<TTransportFactory>& inputTransportFactory,
const std::shared_ptr<TTransportFactory>& outputTransportFactory,
const std::shared_ptr<TProtocolFactory>& inputProtocolFactory,
const std::shared_ptr<TProtocolFactory>& outputProtocolFactory)
: processorFactory_(processorFactory),
serverTransport_(serverTransport),
inputTransportFactory_(inputTransportFactory),
outputTransportFactory_(outputTransportFactory),
inputProtocolFactory_(inputProtocolFactory),
outputProtocolFactory_(outputProtocolFactory) {}
TServer(const std::shared_ptr<TProcessor>& processor,
const std::shared_ptr<TServerTransport>& serverTransport,
const std::shared_ptr<TTransportFactory>& inputTransportFactory,
const std::shared_ptr<TTransportFactory>& outputTransportFactory,
const std::shared_ptr<TProtocolFactory>& inputProtocolFactory,
const std::shared_ptr<TProtocolFactory>& outputProtocolFactory)
: processorFactory_(new TSingletonProcessorFactory(processor)),
serverTransport_(serverTransport),
inputTransportFactory_(inputTransportFactory),
outputTransportFactory_(outputTransportFactory),
inputProtocolFactory_(inputProtocolFactory),
outputProtocolFactory_(outputProtocolFactory) {}
std::shared_ptr<TProcessor> getProcessor(std::shared_ptr<TProtocol> inputProtocol,
std::shared_ptr<TProtocol> outputProtocol,
std::shared_ptr<TTransport> transport) {
TConnectionInfo connInfo;
connInfo.input = inputProtocol;
connInfo.output = outputProtocol;
connInfo.transport = transport;
return processorFactory_->getProcessor(connInfo);
}
// Class variables
std::shared_ptr<TProcessorFactory> processorFactory_;
std::shared_ptr<TServerTransport> serverTransport_;
std::shared_ptr<TTransportFactory> inputTransportFactory_;
std::shared_ptr<TTransportFactory> outputTransportFactory_;
std::shared_ptr<TProtocolFactory> inputProtocolFactory_;
std::shared_ptr<TProtocolFactory> outputProtocolFactory_;
std::shared_ptr<TServerEventHandler> eventHandler_;
public:
void setInputTransportFactory(std::shared_ptr<TTransportFactory> inputTransportFactory) {
inputTransportFactory_ = inputTransportFactory;
}
void setOutputTransportFactory(std::shared_ptr<TTransportFactory> outputTransportFactory) {
outputTransportFactory_ = outputTransportFactory;
}
void setInputProtocolFactory(std::shared_ptr<TProtocolFactory> inputProtocolFactory) {
inputProtocolFactory_ = inputProtocolFactory;
}
void setOutputProtocolFactory(std::shared_ptr<TProtocolFactory> outputProtocolFactory) {
outputProtocolFactory_ = outputProtocolFactory;
}
void setServerEventHandler(std::shared_ptr<TServerEventHandler> eventHandler) {
eventHandler_ = eventHandler;
}
};
// The subclass implements serve()
void TNonblockingServer::serve() {
if (ioThreads_.empty())
registerEvents(NULL);
ioThreads_[0]->run();
for (uint32_t i = 0; i < ioThreads_.size(); ++i) { // there may be several IO threads
ioThreads_[i]->join();
GlobalOutput.printf("TNonblocking: join done for IO thread #%d", i);
}
}
// Server initialization
void TNonblockingServer::registerEvents(event_base* user_event_base) { // built on libevent
userEventBase_ = user_event_base;
// init listen socket
if (serverSocket_ == THRIFT_INVALID_SOCKET)
createAndListenOnSocket();
// set up the IO threads
assert(ioThreads_.empty());
if (!numIOThreads_) {
numIOThreads_ = DEFAULT_IO_THREADS; // static const int DEFAULT_IO_THREADS = 1;
}
// A user-provided event base doesn't work for multi-threaded servers
assert(numIOThreads_ == 1 || !userEventBase_);
for (uint32_t id = 0; id < numIOThreads_; ++id) {
// the first IO thread also does the listening on server socket
THRIFT_SOCKET listenFd = (id == 0 ? serverSocket_ : THRIFT_INVALID_SOCKET);
shared_ptr<TNonblockingIOThread> thread(
new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_)); // listenFd is passed in (only thread 0 gets the real one)
ioThreads_.push_back(thread); // collect the IO threads
}
// Notify handler of the preServe event
if (eventHandler_) {
eventHandler_->preServe();
}
// Start all of our helper IO threads. Note that the threads run forever,
// only terminating if stop() is called.
assert(ioThreads_.size() == numIOThreads_);
assert(ioThreads_.size() > 0);
GlobalOutput.printf("TNonblockingServer: Serving with %d io threads.",
ioThreads_.size());
// Launch all the secondary IO threads in separate threads
if (ioThreads_.size() > 1) {
ioThreadFactory_.reset(new PlatformThreadFactory(
#if !USE_BOOST_THREAD && !USE_STD_THREAD
PlatformThreadFactory::OTHER, // scheduler
PlatformThreadFactory::NORMAL, // priority
1, // stack size (MB)
#endif
false // detached
));
assert(ioThreadFactory_.get());
// intentionally starting at thread 1, not 0
for (uint32_t i = 1; i < ioThreads_.size(); ++i) {
shared_ptr<Thread> thread = ioThreadFactory_->newThread(ioThreads_[i]);
ioThreads_[i]->setThread(thread);
thread->start(); // start the thread; this ends up calling TNonblockingIOThread::run()
}
}
// Register the events for the primary (listener) IO thread
ioThreads_[0]->registerEvents(); // there is always at least one (listening) IO thread
}
void TNonblockingIOThread::run() {
if (eventBase_ == NULL) {
registerEvents(); // registered once when the thread starts; each thread has its own reactor
}
if (useHighPriority_) {
setCurrentThreadHighPriority(true);
}
if (eventBase_ != NULL)
{
GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...", number_);
// Run libevent engine, never returns, invokes calls to eventHandler
event_base_loop(eventBase_, 0); // blocks here running the event loop
if (useHighPriority_) {
setCurrentThreadHighPriority(false);
}
// cleans up our registered events
cleanupEvents();
}
GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!", number_);
}
/**
* Register the core libevent events onto the proper base.
*/
void TNonblockingIOThread::registerEvents() { // per-thread registration
threadId_ = Thread::get_current();
assert(eventBase_ == 0);
eventBase_ = getServer()->getUserEventBase();
if (eventBase_ == NULL) {
eventBase_ = event_base_new();
ownEventBase_ = true;
}
// Print some libevent stats
if (number_ == 0) {
GlobalOutput.printf("TNonblockingServer: using libevent %s method %s",
event_get_version(),
event_base_get_method(eventBase_));
}
if (listenSocket_ != THRIFT_INVALID_SOCKET) {
// Register the server event
event_set(&serverEvent_,
listenSocket_,
EV_READ | EV_PERSIST,
TNonblockingIOThread::listenHandler,
server_);
event_base_set(eventBase_, &serverEvent_); // register the accept event
// Add the event and start up the server
if (-1 == event_add(&serverEvent_, 0)) {
throw TException(
"TNonblockingServer::serve(): "
"event_add() failed on server listen event");
}
GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.", number_);
}
createNotificationPipe(); // create the notification pipe
// Create an event to be notified when a task finishes
event_set(¬ificationEvent_,
getNotificationRecvFD(),
EV_READ | EV_PERSIST,
TNonblockingIOThread::notifyHandler, // notifyHandler is called back when the pipe receives a notification
this);
// Attach to the base
event_base_set(eventBase_, ¬ificationEvent_);
// Add the event and start up the server
if (-1 == event_add(¬ificationEvent_, 0)) {
throw TException(
"TNonblockingServer::serve(): "
"event_add() failed on task-done notification event");
}
GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.", number_);
}
// The listen callback on the server socket:
// static void listenHandler(evutil_socket_t fd, short which, void* v) {
// ((TNonblockingServer*)v)->handleEvent(fd, which); // the listen callback ultimately calls handleEvent
// }
void TNonblockingServer::handleEvent(THRIFT_SOCKET fd, short which) { // runs on the listening IO thread
(void)which;
// Make sure that libevent didn't mess up the socket handles
assert(fd == serverSocket_);
// Going to accept a new client socket
stdcxx::shared_ptr<TSocket> clientSocket;
clientSocket = serverTransport_->accept();
if (clientSocket) {
// If we're overloaded, take action here
if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
Guard g(connMutex_);
nConnectionsDropped_++;
nTotalConnectionsDropped_++;
if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
clientSocket->close();
return;
} else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
if (!drainPendingTask()) {
// Nothing left to discard, so we drop connection instead.
clientSocket->close();
return;
}
}
}
// Create a new TConnection for this client socket.
TConnection* clientConnection = createConnection(clientSocket); // pick an IO thread to own this connection; it drives the state machine from here on
// Fail fast if we could not create a TConnection object
if (clientConnection == NULL) {
GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
clientSocket->close();
return;
}
if (clientConnection->getIOThreadNumber() == 0) {
clientConnection->transition(); // the listener thread owns it: run the state machine directly
} else {
if (!clientConnection->notifyIOThread()) { // another IO thread owns it: hand it over via notificationPipeFDs_, which fires notifyHandler there
GlobalOutput.perror("[ERROR] notifyIOThread failed on fresh connection, closing", errno);
clientConnection->close();
}
}
}
}
/**
* Creates a new connection either by reusing an object off the stack or
* by allocating a new one entirely
*/
TNonblockingServer::TConnection* TNonblockingServer::createConnection(stdcxx::shared_ptr<TSocket> socket) { // server-level function
// Check the stack
Guard g(connMutex_);
// pick an IO thread to handle this connection -- currently round robin
assert(nextIOThread_ < ioThreads_.size());
int selectedThreadIdx = nextIOThread_;
nextIOThread_ = static_cast<uint32_t>((nextIOThread_ + 1) % ioThreads_.size()); // round-robin IO thread selection
TNonblockingIOThread* ioThread = ioThreads_[selectedThreadIdx].get();
// Check the connection stack to see if we can re-use
TConnection* result = NULL;
if (connectionStack_.empty()) {
result = new TConnection(socket, ioThread);
++numTConnections_;
} else {
result = connectionStack_.top();
connectionStack_.pop();
result->setSocket(socket);
result->init(ioThread);
}
activeConnections_.push_back(result); // track the active connection
return result;
}
/* static */
void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* v) { // the owning IO thread receives the connection pointer here
TNonblockingIOThread* ioThread = (TNonblockingIOThread*)v;
assert(ioThread);
(void)which;
while (true) {
TNonblockingServer::TConnection* connection = 0;
const int kSize = sizeof(connection);
long nBytes = recv(fd, cast_sockopt(&connection), kSize, 0); // read the connection pointer off the pipe
if (nBytes == kSize) {
if (connection == NULL) {
// this is the command to stop our thread, exit the handler!
ioThread->breakLoop(false);
return;
}
connection->transition(); // resume the connection's state machine
} else if (nBytes > 0) {
// throw away these bytes and hope that next time we get a solid read
GlobalOutput.printf("notifyHandler: Bad read of %d bytes, wanted %d", nBytes, kSize);
ioThread->breakLoop(true);
return;
} else if (nBytes == 0) {
GlobalOutput.printf("notifyHandler: Notify socket closed!");
ioThread->breakLoop(false);
// exit the loop
break;
} else { // nBytes < 0
if (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK
&& THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN) {
GlobalOutput.perror("TNonblocking: notifyHandler read() failed: ", THRIFT_GET_SOCKET_ERROR);
ioThread->breakLoop(true);
return;
}
// exit the loop
break;
}
}
}
/**
* Five states for the nonblocking server:
* 1) initialize
* 2) read 4 byte frame size
* 3) read frame of data
* 4) send back data (if any)
* 5) force immediate connection close
*/
enum TAppState {
APP_INIT, // initial state: appState_ = APP_INIT;
APP_READ_FRAME_SIZE,
APP_READ_REQUEST,
APP_WAIT_TASK,
APP_SEND_RESULT,
APP_CLOSE_CONNECTION
};
static void eventHandler(evutil_socket_t fd, short /* which */, void* v) { // libevent callback for a TConnection
assert(fd == static_cast<evutil_socket_t>(((TConnection*)v)->getTSocket()->getSocketFD()));
((TConnection*)v)->workSocket();
}
// The connection state machine
void TNonblockingServer::TConnection::transition() {
// ensure this connection is active right now
assert(ioThread_);
assert(server_);
// Switch upon the state that we are currently in and move to a new state
switch (appState_) {
case APP_READ_REQUEST:
// We are done reading the request, package the read buffer into transport
// and get back some data from the dispatch function
if (server_->getHeaderTransport()) {
inputTransport_->resetBuffer(readBuffer_, readBufferPos_); // stdcxx::shared_ptr<TMemoryBuffer> inputTransport_;
outputTransport_->resetBuffer();
} else {
// We saved room for the framing size in case header transport needed it,
// but just skip it for the non-header case
inputTransport_->resetBuffer(readBuffer_ + 4, readBufferPos_ - 4); // copy the frame into the TMemoryBuffer that backs inputProtocol_
outputTransport_->resetBuffer();
// Prepend four bytes of blank space to the buffer so we can
// write the frame size there later.
outputTransport_->getWritePtr(4);
outputTransport_->wroteBytes(4);
}
server_->incrementActiveProcessors(); // server-wide counter
if (server_->isThreadPoolProcessing()) {
// We are setting up a Task to do this work and we will wait on it
// Create task and dispatch to the thread manager
stdcxx::shared_ptr<Runnable> task = stdcxx::shared_ptr<Runnable>( // wrap the work into a Task
new Task(processor_, inputProtocol_, outputProtocol_, this)); // inputProtocol_ is built on top of inputTransport_
// get input/transports
// factoryInputTransport_ = server_->getInputTransportFactory()->getTransport(inputTransport_);
// factoryOutputTransport_ = server_->getOutputTransportFactory()->getTransport(outputTransport_);
// if (server_->getHeaderTransport()) {
// inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_,
// factoryOutputTransport_);
// outputProtocol_ = inputProtocol_;
// } else {
// inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
// outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
// }
// The application is now waiting on the task to finish
appState_ = APP_WAIT_TASK;
// Set this connection idle so that libevent doesn't process more
// data on it while we're still waiting for the threadmanager to
// finish this task
setIdle();
try {
server_->addTask(task); // threadManager_->add(task, 0LL, taskExpireTime_): hand the task to the thread manager, which pushes it onto the worker queue (tasks_.push_back(...))
} catch (IllegalStateException& ise) {
// The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
server_->decrementActiveProcessors();
close();
} catch (TimedOutException& to) {
GlobalOutput.printf("[ERROR] TimedOutException: Server::process() %s", to.what());
server_->decrementActiveProcessors();
close();
}
return;
} else {
try {
if (serverEventHandler_) {
serverEventHandler_->processContext(connectionContext_, getTSocket());
}
// Invoke the processor
processor_->process(inputProtocol_, outputProtocol_, connectionContext_); // no thread pool: run the processor inline
} catch (const TTransportException& ttx) {
GlobalOutput.printf(
"TNonblockingServer transport error in "
"process(): %s",
ttx.what());
server_->decrementActiveProcessors();
close();
return;
} catch (const std::exception& x) {
GlobalOutput.printf("Server::process() uncaught exception: %s: %s",
typeid(x).name(),
x.what());
server_->decrementActiveProcessors();
close();
return;
} catch (...) {
GlobalOutput.printf("Server::process() unknown exception");
server_->decrementActiveProcessors();
close();
return;
}
}
// Intentionally fall through here, the call to process has written into
// the writeBuffer_
case APP_WAIT_TASK: // a worker thread finished and notifyIOThread() brought us back here
// We have now finished processing a task and the result has been written
// into the outputTransport_, so we grab its contents and place them into
// the writeBuffer_ for actual writing by the libevent thread
server_->decrementActiveProcessors();
// Get the result of the operation
outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_); // how much data needs to be sent
// If the function call generated return data, then move into the send
// state and get going
// 4 bytes were reserved for frame size
if (writeBufferSize_ > 4) {
// Move into write state
writeBufferPos_ = 0;
socketState_ = SOCKET_SEND; // start sending
// Put the frame size into the write buffer
int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
memcpy(writeBuffer_, &frameSize, 4); // write the frame size into the header
// Socket into write mode
appState_ = APP_SEND_RESULT; // send the result
setWrite(); // arm the write event
// Try to work the socket immediately
// workSocket();
return;
}
// In this case, the request was oneway and we should fall through
// right back into the read frame header state
goto LABEL_APP_INIT;
case APP_SEND_RESULT:
// it's now safe to perform buffer size housekeeping.
if (writeBufferSize_ > largestWriteBufferSize_) {
largestWriteBufferSize_ = writeBufferSize_;
}
if (server_->getResizeBufferEveryN() > 0
&& ++callsForResize_ >= server_->getResizeBufferEveryN()) {
checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
server_->getIdleWriteBufferLimit());
callsForResize_ = 0;
}
// N.B.: We also intentionally fall through here into the INIT state!
LABEL_APP_INIT:
case APP_INIT:
// Clear write buffer variables
writeBuffer_ = NULL;
writeBufferPos_ = 0;
writeBufferSize_ = 0;
// Into read4 state we go
socketState_ = SOCKET_RECV_FRAMING; // start by receiving the frame header
appState_ = APP_READ_FRAME_SIZE; // move on to the next state
readBufferPos_ = 0;
// Register read event
setRead(); // arm the read event
// Try to work the socket right away
// workSocket();
return;
case APP_READ_FRAME_SIZE:
readWant_ += 4;
// We just read the request length
// Double the buffer size until it is big enough
if (readWant_ > readBufferSize_) { // grow the read buffer if needed
if (readBufferSize_ == 0) {
readBufferSize_ = 1;
}
uint32_t newSize = readBufferSize_;
while (readWant_ > newSize) {
newSize *= 2; // keep doubling until it is big enough
}
uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
if (newBuffer == NULL) {
// nothing else to be done...
throw std::bad_alloc();
}
readBuffer_ = newBuffer;
readBufferSize_ = newSize;
}
readBufferPos_ = 4;
*((uint32_t*)readBuffer_) = htonl(readWant_ - 4); // keep the frame size at the front of the buffer
// Move into read request state
socketState_ = SOCKET_RECV; // start receiving the payload
appState_ = APP_READ_REQUEST; // read the request body
// Work the socket right away
workSocket();
return;
case APP_CLOSE_CONNECTION:
server_->decrementActiveProcessors();
close();
return;
default:
GlobalOutput.printf("Unexpected Application State %d", appState_);
assert(0);
}
}
enum TSocketState { SOCKET_RECV_FRAMING, SOCKET_RECV, SOCKET_SEND }; // initialized as socketState_ = SOCKET_RECV_FRAMING;
void TNonblockingServer::TConnection::workSocket() {
int got = 0, left = 0, sent = 0;
uint32_t fetch = 0;
switch (socketState_) {
case SOCKET_RECV_FRAMING: // initial socket state
union {
uint8_t buf[sizeof(uint32_t)];
uint32_t size;
} framing; // TFramedTransport framing
// if we've already received some bytes we kept them here
framing.size = readWant_;
// determine size of this frame
try {
// Read from the socket
fetch = tSocket_->read(&framing.buf[readBufferPos_], // stdcxx::shared_ptr<TSocket> tSocket_;
uint32_t(sizeof(framing.size) - readBufferPos_));
if (fetch == 0) {
// Whenever we get here it means a remote disconnect
close();
return;
}
readBufferPos_ += fetch;
} catch (TTransportException& te) {
//In Nonblocking SSLSocket some operations need to be retried again.
//Current approach is parsing exception message, but a better solution needs to be investigated.
if(!strstr(te.what(), "retry")) {
GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
close();
return;
}
}
if (readBufferPos_ < sizeof(framing.size)) { // need the full frame-size header first
// more needed before frame size is known -- save what we have so far
readWant_ = framing.size;
return;
}
readWant_ = ntohl(framing.size); // now we know how many bytes to expect
if (readWant_ > server_->getMaxFrameSize()) {
// Don't allow giant frame sizes. This prevents bad clients from
// causing us to try and allocate a giant buffer.
GlobalOutput.printf(
"TNonblockingServer: frame size too large "
"(%" PRIu32 " > %" PRIu64
") from client %s. "
"Remote side not using TFramedTransport?",
readWant_,
(uint64_t)server_->getMaxFrameSize(),
tSocket_->getSocketInfo().c_str());
close();
return;
}
// size known; now get the rest of the frame
transition();
return;
case SOCKET_RECV:
// It is an error to be in this state if we already have all the data
assert(readBufferPos_ < readWant_);
try {
// Read from the socket
fetch = readWant_ - readBufferPos_;
got = tSocket_->read(readBuffer_ + readBufferPos_, fetch); // read the payload
} catch (TTransportException& te) {
//In Nonblocking SSLSocket some operations need to be retried again.
//Current approach is parsing exception message, but a better solution needs to be investigated.
if(!strstr(te.what(), "retry")) {
GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
close();
}
return;
}
if (got > 0) {
// Move along in the buffer
readBufferPos_ += got; // how much of the frame has been buffered so far
// Check that we did not overdo it
assert(readBufferPos_ <= readWant_);
// We are done reading, move onto the next state
if (readBufferPos_ == readWant_) {
transition(); // the full request has arrived
}
return;
}
// Whenever we get down here it means a remote disconnect
close();
return;
case SOCKET_SEND:
// Should never have position past size
assert(writeBufferPos_ <= writeBufferSize_);
// If there is no data to send, then let us move on
if (writeBufferPos_ == writeBufferSize_) {
GlobalOutput("WARNING: Send state with no data to send");
transition();
return;
}
try {
left = writeBufferSize_ - writeBufferPos_;
sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
} catch (TTransportException& te) {
GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
close();
return;
}
writeBufferPos_ += sent;
// Did we overdo it?
assert(writeBufferPos_ <= writeBufferSize_);
// We are done!
if (writeBufferPos_ == writeBufferSize_) {
transition();
}
return;
default:
GlobalOutput.printf("Unexpected Socket State %d", socketState_);
assert(0);
}
}
// The work for a connection is wrapped into a Task
class TNonblockingServer::TConnection::Task : public Runnable {
public:
Task(stdcxx::shared_ptr<TProcessor> processor,
stdcxx::shared_ptr<TProtocol> input,
stdcxx::shared_ptr<TProtocol> output,
TConnection* connection)
: processor_(processor),
input_(input),
output_(output),
connection_(connection),
serverEventHandler_(connection_->getServerEventHandler()),
connectionContext_(connection_->getConnectionContext()) {}
void run() {
try {
for (;;) {
if (serverEventHandler_) {
serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
}
if (!processor_->process(input_, output_, connectionContext_) // the task runs the processor's process()
|| !input_->getTransport()->peek()) {
break;
}
}
} catch (const TTransportException& ttx) {
GlobalOutput.printf("TNonblockingServer: client died: %s", ttx.what());
} catch (const std::bad_alloc&) {
GlobalOutput("TNonblockingServer: caught bad_alloc exception.");
exit(1);
} catch (const std::exception& x) {
GlobalOutput.printf("TNonblockingServer: process() exception: %s: %s",
typeid(x).name(),
x.what());
} catch (...) {
GlobalOutput.printf("TNonblockingServer: unknown exception while processing.");
}
// Signal completion back to the libevent thread via a pipe
if (!connection_->notifyIOThread()) { // task done: notify the IO thread through the pipe so it asynchronously runs transition()
GlobalOutput.printf("TNonblockingServer: failed to notifyIOThread, closing.");
connection_->server_->decrementActiveProcessors();
connection_->close();
throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
}
}
TConnection* getTConnection() { return connection_; }
private:
stdcxx::shared_ptr<TProcessor> processor_;
stdcxx::shared_ptr<TProtocol> input_;
stdcxx::shared_ptr<TProtocol> output_;
TConnection* connection_;
stdcxx::shared_ptr<TServerEventHandler> serverEventHandler_;
void* connectionContext_;
};
// Takeaway: a pipe can pass new connections from the listener thread to the other IO threads and trigger their state machines; once a connection has read a complete request it is handed to the thread pool, and when a worker finishes it uses the pipe again to asynchronously resume the state machine and send the response. So a pipe can both carry data between threads and trigger asynchronous work, and it lets a given fd's reads and writes stay bound to one specific thread.
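// A minimal, standalone sketch (not Thrift code) of that notification-pipe pattern: a worker
// thread writes a connection pointer into a pipe when its task is done, and the IO thread that
// owns the connection wakes up on the read end and resumes the state machine. The types and
// names here are invented for illustration; Thrift drives the read end through libevent instead
// of a blocking read.
#include <unistd.h>
#include <cstdio>
#include <thread>
struct Connection { int id; };
int main() {
  int fds[2];
  if (pipe(fds) != 0) return 1;            // fds[0]: read end (IO thread), fds[1]: write end (workers)
  std::thread worker([w = fds[1]] {
    Connection* done = new Connection{42}; // pretend this connection's task just finished
    (void)write(w, &done, sizeof(done));   // notify the IO thread by passing the pointer through the pipe
  });
  Connection* conn = nullptr;
  if (read(fds[0], &conn, sizeof(conn)) == (ssize_t)sizeof(conn)) { // the IO thread blocks here
    std::printf("resume state machine for connection %d\n", conn->id);
    delete conn;
  }
  worker.join();
  close(fds[0]);
  close(fds[1]);
  return 0;
}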
// Main server: how it is wired up in practice
std::shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount); // worker thread pool
std::shared_ptr<TNonblockingServerSocket> socket = std::make_shared<TNonblockingServerSocket>(path); // unix domain socket (the transport layer)
std::shared_ptr<TNonblockingServer> server = std::make_shared<TNonblockingServer>(processorFactory, // processor
std::make_shared<TBinaryProtocolFactory>(), // protocol
socket,
threadManager);
server->serve();
// Client monitoring thread: started on a dedicated thread because it blocks
shared_ptr<Runner> runner(new Runner(rss, processorFactory, name, port, workerCnt));
shared_ptr<ThreadFactory> threadFactory(
new ThreadFactory(true));
shared_ptr<apache::thrift::concurrency::Thread> thread = threadFactory->newThread(runner);
thread->start();
runner->readyBarrier();