thrift源码分析

+——————————————-+
| Server |
| (single-threaded, event-driven etc) |
+——————————————-+
| Processor |
| (compiler generated) |
+——————————————-+
| Protocol |
| (JSON, compact etc) |
+——————————————-+
| Transport |
| (raw TCP, HTTP etc) |
+——————————————-+

Transport

Transport传输层为从网络读取/向网络写入提供了一个简单的抽象。这使 Thrift 能够将底层传输与系统的其余部分分离(例如,序列化/反序列化)
Transport接口:

  • open
  • close
  • read
  • write
  • flush

除了上面的 Transport 接口之外,Thrift 还使用了一个 ServerTransport 接口,用于接受或创建原始传输对象。顾名思义,ServerTransport 主要用于服务器端,为传入连接创建新的传输对象

  • open
  • listen
  • accept
  • close
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
// TTransport 类是所有传输类的基类,它定义了一系列虚函数,这些函数将在子类中实现。它包含基本的读写操作接口,但并未提供实际实现,而是抛出异常。
class TTransport {
public:
virtual ~TTransport() = default;

virtual bool isOpen() const { return false; }

virtual bool peek() { return isOpen(); }

virtual void open() {
throw TTransportException(TTransportException::NOT_OPEN, "Cannot open base TTransport.");
}

virtual void close() {
throw TTransportException(TTransportException::NOT_OPEN, "Cannot close base TTransport.");
}

uint32_t read(uint8_t* buf, uint32_t len) { // !这个不是虚函数
T_VIRTUAL_CALL();
return read_virt(buf, len);
}
virtual uint32_t read_virt(uint8_t* /* buf */, uint32_t /* len */) {
throw TTransportException(TTransportException::NOT_OPEN, "Base TTransport cannot read.");
}

uint32_t readAll(uint8_t* buf, uint32_t len) {
T_VIRTUAL_CALL();
return readAll_virt(buf, len);
}
virtual uint32_t readAll_virt(uint8_t* buf, uint32_t len) {
return apache::thrift::transport::readAll(*this, buf, len);
}

virtual uint32_t readEnd() {
// default behaviour is to do nothing
return 0;
}

void write(const uint8_t* buf, uint32_t len) {
T_VIRTUAL_CALL();
write_virt(buf, len);
}
virtual void write_virt(const uint8_t* /* buf */, uint32_t /* len */) {
throw TTransportException(TTransportException::NOT_OPEN, "Base TTransport cannot write.");
}

virtual uint32_t writeEnd() {
// default behaviour is to do nothing
return 0;
}

virtual void flush() {
// default behaviour is to do nothing
}

const uint8_t* borrow(uint8_t* buf, uint32_t* len) {
T_VIRTUAL_CALL();
return borrow_virt(buf, len);
}
virtual const uint8_t* borrow_virt(uint8_t* /* buf */, uint32_t* /* len */) { return nullptr; }

void consume(uint32_t len) {
T_VIRTUAL_CALL();
consume_virt(len);
}
virtual void consume_virt(uint32_t /* len */) {
throw TTransportException(TTransportException::NOT_OPEN, "Base TTransport cannot consume.");
}

virtual const std::string getOrigin() const { return "Unknown"; }

protected:
TTransport() = default;
};

// TTransportDefaults 类继承自 TTransport,并覆盖了基类中的读写函数,使其直接调用虚函数版本。这种设计模式确保所有读写操作都最终通过虚函数调用,以便子类重载这些操作。
class TTransportDefaults : public TTransport {
public:
uint32_t read(uint8_t* buf, uint32_t len) { return this->TTransport::read_virt(buf, len); } // 全改为read_virt
uint32_t readAll(uint8_t* buf, uint32_t len) { return this->TTransport::readAll_virt(buf, len); }
void write(const uint8_t* buf, uint32_t len) { this->TTransport::write_virt(buf, len); }
const uint8_t* borrow(uint8_t* buf, uint32_t* len) {
return this->TTransport::borrow_virt(buf, len);
}
void consume(uint32_t len) { this->TTransport::consume_virt(len); }

protected:
TTransportDefaults() = default;
};

// TVirtualTransport 类模板提供了一种灵活的方式来重载虚函数。通过静态转换,将调用委派给子类。这样,子类只需重载实际的读写操作,虚函数就会调用子类的实现。
// Transport_为子类的名字
template <class Transport_, class Super_ = TTransportDefaults>
class TVirtualTransport : public Super_ {
public:
uint32_t read_virt(uint8_t* buf, uint32_t len) override {
return static_cast<Transport_*>(this)->read(buf, len); // 调用子类的接口
}

uint32_t readAll_virt(uint8_t* buf, uint32_t len) override {
return static_cast<Transport_*>(this)->readAll(buf, len); // 调用子类的接口
}

void write_virt(const uint8_t* buf, uint32_t len) override {
static_cast<Transport_*>(this)->write(buf, len); // 调用子类的接口
}

const uint8_t* borrow_virt(uint8_t* buf, uint32_t* len) override {
return static_cast<Transport_*>(this)->borrow(buf, len); // 调用子类的接口
}

void consume_virt(uint32_t len) override {
static_cast<Transport_*>(this)->consume(len);
}
uint32_t readAll(uint8_t* buf, uint32_t len) {
auto* trans = static_cast<Transport_*>(this);
return ::apache::thrift::transport::readAll(*trans, buf, len);
}

protected:
TVirtualTransport() = default;

template <typename Arg_>
TVirtualTransport(Arg_ const& arg)
: Super_(arg) {}

template <typename Arg1_, typename Arg2_>
TVirtualTransport(Arg1_ const& a1, Arg2_ const& a2)
: Super_(a1, a2) {}
};

// TSocket 类是一个具体的传输实现,它继承自 TVirtualTransport<TSocket>,并实现了实际的网络读写操作。TSocket 类实现了打开、关闭、读写等操作,并且可以配置 socket 的各种参数。
// TSocket也是继承这里的,所以后面装饰TSocket就成了Transport
class TSocket : public TVirtualTransport<TSocket> {
public:
TSocket();
TSocket(const std::string& host, int port);
TSocket(const std::string& path);
~TSocket() override;
bool isOpen() const override;
bool peek() override;
void open() override;
void close() override;
virtual bool hasPendingDataToRead();
virtual uint32_t read(uint8_t* buf, uint32_t len);
virtual void write(const uint8_t* buf, uint32_t len);
virtual uint32_t write_partial(const uint8_t* buf, uint32_t len);

std::string getHost();
int getPort();
void setHost(std::string host);
void setPort(int port);
void setLinger(bool on, int linger);
void setNoDelay(bool noDelay);
void setConnTimeout(int ms);
void setRecvTimeout(int ms);
void setSendTimeout(int ms);
void setMaxRecvRetries(int maxRecvRetries);
void setKeepAlive(bool keepAlive);
std::string getSocketInfo() const;
std::string getPeerHost() const;
std::string getPeerAddress() const;
int getPeerPort() const;
THRIFT_SOCKET getSocketFD() { return socket_; }
void setSocketFD(THRIFT_SOCKET fd);
sockaddr* getCachedAddress(socklen_t* len) const;
static void setUseLowMinRto(bool useLowMinRto);
static bool getUseLowMinRto();
const std::string getOrigin() const override;
TSocket(THRIFT_SOCKET socket);
TSocket(THRIFT_SOCKET socket, std::shared_ptr<THRIFT_SOCKET> interruptListener);
void setCachedAddress(const sockaddr* addr, socklen_t len);
protected:
void openConnection(struct addrinfo* res);
std::string host_;
int port_;
std::string path_;
THRIFT_SOCKET socket_;
mutable std::string peerHost_;
mutable std::string peerAddress_;
mutable int peerPort_;
std::shared_ptr<THRIFT_SOCKET> interruptListener_;
int connTimeout_;
int sendTimeout_;
int recvTimeout_;
bool keepAlive_;
bool lingerOn_;
int lingerVal_;
bool noDelay_;
int maxRecvRetries_;
union {
sockaddr_in ipv4;
sockaddr_in6 ipv6;
} cachedPeerAddr_;
static bool useLowMinRto_;

private:
void unix_open();
void local_open();
};

// TBufferBase 是一个抽象类,继承自 TVirtualTransport<TBufferBase>,实现了基于缓冲区的读写操作,并定义了一些纯虚函数供子类实现。
class TBufferBase : public TVirtualTransport<TBufferBase> {
public:
uint32_t read(uint8_t* buf, uint32_t len) { // 实现read接口,TVirtualTransport父类会调用
uint8_t* new_rBase = rBase_ + len;
if (TDB_LIKELY(new_rBase <= rBound_)) {
std::memcpy(buf, rBase_, len);
rBase_ = new_rBase;
return len;
}
return readSlow(buf, len); // 框架已经固定,子类只要实现readSlow就行了
}

uint32_t readAll(uint8_t* buf, uint32_t len) { // 实现readAll接口,TVirtualTransport父类会调用
uint8_t* new_rBase = rBase_ + len;
if (TDB_LIKELY(new_rBase <= rBound_)) {
std::memcpy(buf, rBase_, len);
rBase_ = new_rBase;
return len;
}
return apache::thrift::transport::readAll(*this, buf, len);
}

void write(const uint8_t* buf, uint32_t len) { // 实现write接口,TVirtualTransport父类会调用
uint8_t* new_wBase = wBase_ + len;
if (TDB_LIKELY(new_wBase <= wBound_)) {
std::memcpy(wBase_, buf, len);
wBase_ = new_wBase;
return;
}
writeSlow(buf, len); // 框架已经固定,子类只要实现writeSlow就行了
}

const uint8_t* borrow(uint8_t* buf, uint32_t* len) { // 实现borrow接口,TVirtualTransport父类会调用
if (TDB_LIKELY(static_cast<ptrdiff_t>(*len) <= rBound_ - rBase_)) {
*len = static_cast<uint32_t>(rBound_ - rBase_);
return rBase_;
}
return borrowSlow(buf, len); // 框架已经固定,子类只要实现borrowSlow就行了
}

void consume(uint32_t len) {
if (TDB_LIKELY(static_cast<ptrdiff_t>(len) <= rBound_ - rBase_)) {
rBase_ += len;
} else {
throw TTransportException(TTransportException::BAD_ARGS, "consume did not follow a borrow.");
}
}

protected:
virtual uint32_t readSlow(uint8_t* buf, uint32_t len) = 0; // 子类必须实现

virtual void writeSlow(const uint8_t* buf, uint32_t len) = 0;

virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) = 0;

TBufferBase() : rBase_(nullptr), rBound_(nullptr), wBase_(nullptr), wBound_(nullptr) {}

void setReadBuffer(uint8_t* buf, uint32_t len) {
rBase_ = buf;
rBound_ = buf + len;
}

void setWriteBuffer(uint8_t* buf, uint32_t len) {
wBase_ = buf;
wBound_ = buf + len;
}

~TBufferBase() override = default;

uint8_t* rBase_;
uint8_t* rBound_;

uint8_t* wBase_;
uint8_t* wBound_;
};

// TFramedTransport类型
// TFramedTransport 是一种特殊的 Transport,继承自 TVirtualTransport<TFramedTransport, TBufferBase>,在读写时添加帧的概念。
class TFramedTransport : public TVirtualTransport<TFramedTransport, TBufferBase> { // 从TBufferBase继承而来
public:
static const int DEFAULT_BUFFER_SIZE = 512;
static const int DEFAULT_MAX_FRAME_SIZE = 256 * 1024 * 1024;

TFramedTransport()
: transport_(),
rBufSize_(0),
wBufSize_(DEFAULT_BUFFER_SIZE),
rBuf_(),
wBuf_(new uint8_t[wBufSize_]),
bufReclaimThresh_((std::numeric_limits<uint32_t>::max)()) {
initPointers();
}

TFramedTransport(std::shared_ptr<TTransport> transport) // 装饰者模式,既有继承又有组合,可以装饰TSocket
: transport_(transport),
rBufSize_(0),
wBufSize_(DEFAULT_BUFFER_SIZE),
rBuf_(),
wBuf_(new uint8_t[wBufSize_]),
bufReclaimThresh_((std::numeric_limits<uint32_t>::max)()),
maxFrameSize_(DEFAULT_MAX_FRAME_SIZE) {
initPointers();
}

TFramedTransport(std::shared_ptr<TTransport> transport,
uint32_t sz,
uint32_t bufReclaimThresh = (std::numeric_limits<uint32_t>::max)())
: transport_(transport),
rBufSize_(0),
wBufSize_(sz),
rBuf_(),
wBuf_(new uint8_t[wBufSize_]),
bufReclaimThresh_(bufReclaimThresh),
maxFrameSize_(DEFAULT_MAX_FRAME_SIZE) {
initPointers();
}

void open() override { transport_->open(); } // open是一样的接口,没有装饰

bool isOpen() const override { return transport_->isOpen(); }

bool peek() override { return (rBase_ < rBound_) || transport_->peek(); }

void close() override {
flush();
transport_->close();
}

uint32_t readSlow(uint8_t* buf, uint32_t len) override;

void writeSlow(const uint8_t* buf, uint32_t len) override;

void flush() override;

uint32_t readEnd() override;

uint32_t writeEnd() override;

const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) override;

std::shared_ptr<TTransport> getUnderlyingTransport() { return transport_; }

using TBufferBase::readAll;

const std::string getOrigin() const override { return transport_->getOrigin(); }

void setMaxFrameSize(uint32_t maxFrameSize) { maxFrameSize_ = maxFrameSize; }

uint32_t getMaxFrameSize() { return maxFrameSize_; }

protected:
virtual bool readFrame();

void initPointers() {
setReadBuffer(nullptr, 0);
setWriteBuffer(wBuf_.get(), wBufSize_);

int32_t pad = 0;
this->write((uint8_t*)&pad, sizeof(pad));
}

std::shared_ptr<TTransport> transport_;

uint32_t rBufSize_;
uint32_t wBufSize_;
boost::scoped_array<uint8_t> rBuf_;
boost::scoped_array<uint8_t> wBuf_;
uint32_t bufReclaimThresh_;
uint32_t maxFrameSize_;
};

// 工厂类实现
// TTransportFactory 是 Transport 的工厂类,提供创建 Transport 实例的方法。
class TTransportFactory {
public:
TTransportFactory() = default;

virtual ~TTransportFactory() = default;

virtual std::shared_ptr<TTransport> getTransport(std::shared_ptr<TTransport> trans) {
return trans;
}
};

class TFramedTransportFactory : public TTransportFactory {
public:
TFramedTransportFactory() = default;

~TFramedTransportFactory() override = default;

std::shared_ptr<TTransport> getTransport(std::shared_ptr<TTransport> trans) override {
return std::shared_ptr<TTransport>(new TFramedTransport(trans)); // 装饰一个新的类
}
};

Thrift 的 Transport 层通过基类 TTransport 定义了基本接口和默认行为。具体的 Transport 子类如 TSocket 和 TFramedTransport 通过继承和组合的方式,实现了不同的传输机制。TVirtualTransport 模板类简化了子类的实现,使得子类只需实现关键的读写操作。Transport 工厂类则提供了创建 Transport 实例的灵活机制。

Protocol

Protocol协议抽象定义一种将内存中的数据结构映射到线路格式的机制。换句话说,协议指定了数据类型如何使用底层传输来编码/解码自身。因此,协议实现管理编码方案并负责(反)序列化过程。一些示例协议包括JSON、XML、纯文本、compact binary等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
writeMessageBegin(name, type, seq)
writeMessageEnd()
writeStructBegin(name)
writeStructEnd()
writeFieldBegin(name, type, id)
writeFieldEnd()
writeFieldStop()
writeMapBegin(ktype, vtype, size)
writeMapEnd()
writeListBegin(etype, size)
writeListEnd()
writeSetBegin(etype, size)
writeSetEnd()
writeBool(bool)
writeByte(byte)
writeI16(i16)
writeI32(i32)
writeI64(i64)
writeDouble(double)
writeString(string)

name, type, seq = readMessageBegin()
readMessageEnd()
name = readStructBegin()
readStructEnd()
name, type, id = readFieldBegin()
readFieldEnd()
k, v, size = readMapBegin()
readMapEnd()
etype, size = readListBegin()
readListEnd()
etype, size = readSetBegin()
readSetEnd()
bool = readBool()
byte = readByte()
i16 = readI16()
i32 = readI32()
i64 = readI64()
double = readDouble()
string = readString()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
// 协议,对传输内容的封装
class TProtocol {
public:
virtual ~TProtocol();

virtual uint32_t writeMessageBegin_virt(const std::string& name,
const TMessageType messageType,
const int32_t seqid) = 0; // 纯虚函数,子类必须实现

virtual uint32_t writeMessageEnd_virt() = 0;

virtual uint32_t writeStructBegin_virt(const char* name) = 0;

virtual uint32_t writeStructEnd_virt() = 0;

virtual uint32_t writeFieldBegin_virt(const char* name,
const TType fieldType,
const int16_t fieldId) = 0;

virtual uint32_t writeFieldEnd_virt() = 0;

virtual uint32_t writeFieldStop_virt() = 0;

virtual uint32_t writeMapBegin_virt(const TType keyType, const TType valType, const uint32_t size)
= 0;

virtual uint32_t writeMapEnd_virt() = 0;

virtual uint32_t writeListBegin_virt(const TType elemType, const uint32_t size) = 0;

virtual uint32_t writeListEnd_virt() = 0;

virtual uint32_t writeSetBegin_virt(const TType elemType, const uint32_t size) = 0;

virtual uint32_t writeSetEnd_virt() = 0;

virtual uint32_t writeBool_virt(const bool value) = 0;

virtual uint32_t writeByte_virt(const int8_t byte) = 0;

virtual uint32_t writeI16_virt(const int16_t i16) = 0;

virtual uint32_t writeI32_virt(const int32_t i32) = 0;

virtual uint32_t writeI64_virt(const int64_t i64) = 0;

virtual uint32_t writeDouble_virt(const double dub) = 0;

virtual uint32_t writeString_virt(const std::string& str) = 0;

virtual uint32_t writeBinary_virt(const std::string& str) = 0;

uint32_t writeMessageBegin(const std::string& name,
const TMessageType messageType,
const int32_t seqid) {
T_VIRTUAL_CALL();
return writeMessageBegin_virt(name, messageType, seqid);
}

uint32_t writeMessageEnd() {
T_VIRTUAL_CALL();
return writeMessageEnd_virt();
}

uint32_t writeStructBegin(const char* name) {
T_VIRTUAL_CALL();
return writeStructBegin_virt(name);
}

uint32_t writeStructEnd() {
T_VIRTUAL_CALL();
return writeStructEnd_virt();
}

uint32_t writeFieldBegin(const char* name, const TType fieldType, const int16_t fieldId) {
T_VIRTUAL_CALL();
return writeFieldBegin_virt(name, fieldType, fieldId);
}

uint32_t writeFieldEnd() {
T_VIRTUAL_CALL();
return writeFieldEnd_virt();
}

uint32_t writeFieldStop() {
T_VIRTUAL_CALL();
return writeFieldStop_virt();
}

uint32_t writeMapBegin(const TType keyType, const TType valType, const uint32_t size) {
T_VIRTUAL_CALL();
return writeMapBegin_virt(keyType, valType, size);
}

uint32_t writeMapEnd() {
T_VIRTUAL_CALL();
return writeMapEnd_virt();
}

uint32_t writeListBegin(const TType elemType, const uint32_t size) {
T_VIRTUAL_CALL();
return writeListBegin_virt(elemType, size);
}

uint32_t writeListEnd() {
T_VIRTUAL_CALL();
return writeListEnd_virt();
}

uint32_t writeSetBegin(const TType elemType, const uint32_t size) {
T_VIRTUAL_CALL();
return writeSetBegin_virt(elemType, size);
}

uint32_t writeSetEnd() {
T_VIRTUAL_CALL();
return writeSetEnd_virt();
}

uint32_t writeBool(const bool value) {
T_VIRTUAL_CALL();
return writeBool_virt(value);
}

uint32_t writeByte(const int8_t byte) {
T_VIRTUAL_CALL();
return writeByte_virt(byte);
}

uint32_t writeI16(const int16_t i16) {
T_VIRTUAL_CALL();
return writeI16_virt(i16);
}

uint32_t writeI32(const int32_t i32) {
T_VIRTUAL_CALL();
return writeI32_virt(i32);
}

uint32_t writeI64(const int64_t i64) {
T_VIRTUAL_CALL();
return writeI64_virt(i64);
}

uint32_t writeDouble(const double dub) {
T_VIRTUAL_CALL();
return writeDouble_virt(dub);
}

uint32_t writeString(const std::string& str) {
T_VIRTUAL_CALL();
return writeString_virt(str);
}

uint32_t writeBinary(const std::string& str) {
T_VIRTUAL_CALL();
return writeBinary_virt(str);
}

virtual uint32_t readMessageBegin_virt(std::string& name,
TMessageType& messageType,
int32_t& seqid) = 0;

virtual uint32_t readMessageEnd_virt() = 0;

virtual uint32_t readStructBegin_virt(std::string& name) = 0;

virtual uint32_t readStructEnd_virt() = 0;

virtual uint32_t readFieldBegin_virt(std::string& name, TType& fieldType, int16_t& fieldId) = 0;

virtual uint32_t readFieldEnd_virt() = 0;

virtual uint32_t readMapBegin_virt(TType& keyType, TType& valType, uint32_t& size) = 0;

virtual uint32_t readMapEnd_virt() = 0;

virtual uint32_t readListBegin_virt(TType& elemType, uint32_t& size) = 0;

virtual uint32_t readListEnd_virt() = 0;

virtual uint32_t readSetBegin_virt(TType& elemType, uint32_t& size) = 0;

virtual uint32_t readSetEnd_virt() = 0;

virtual uint32_t readBool_virt(bool& value) = 0;

virtual uint32_t readBool_virt(std::vector<bool>::reference value) = 0;

virtual uint32_t readByte_virt(int8_t& byte) = 0;

virtual uint32_t readI16_virt(int16_t& i16) = 0;

virtual uint32_t readI32_virt(int32_t& i32) = 0;

virtual uint32_t readI64_virt(int64_t& i64) = 0;

virtual uint32_t readDouble_virt(double& dub) = 0;

virtual uint32_t readString_virt(std::string& str) = 0;

virtual uint32_t readBinary_virt(std::string& str) = 0;

uint32_t readMessageBegin(std::string& name, TMessageType& messageType, int32_t& seqid) {
T_VIRTUAL_CALL();
return readMessageBegin_virt(name, messageType, seqid);
}

uint32_t readMessageEnd() {
T_VIRTUAL_CALL();
return readMessageEnd_virt();
}

uint32_t readStructBegin(std::string& name) {
T_VIRTUAL_CALL();
return readStructBegin_virt(name);
}

uint32_t readStructEnd() {
T_VIRTUAL_CALL();
return readStructEnd_virt();
}

uint32_t readFieldBegin(std::string& name, TType& fieldType, int16_t& fieldId) {
T_VIRTUAL_CALL();
return readFieldBegin_virt(name, fieldType, fieldId);
}

uint32_t readFieldEnd() {
T_VIRTUAL_CALL();
return readFieldEnd_virt();
}

uint32_t readMapBegin(TType& keyType, TType& valType, uint32_t& size) {
T_VIRTUAL_CALL();
return readMapBegin_virt(keyType, valType, size);
}

uint32_t readMapEnd() {
T_VIRTUAL_CALL();
return readMapEnd_virt();
}

uint32_t readListBegin(TType& elemType, uint32_t& size) {
T_VIRTUAL_CALL();
return readListBegin_virt(elemType, size);
}

uint32_t readListEnd() {
T_VIRTUAL_CALL();
return readListEnd_virt();
}

uint32_t readSetBegin(TType& elemType, uint32_t& size) {
T_VIRTUAL_CALL();
return readSetBegin_virt(elemType, size);
}

uint32_t readSetEnd() {
T_VIRTUAL_CALL();
return readSetEnd_virt();
}

uint32_t readBool(bool& value) {
T_VIRTUAL_CALL();
return readBool_virt(value);
}

uint32_t readByte(int8_t& byte) {
T_VIRTUAL_CALL();
return readByte_virt(byte);
}

uint32_t readI16(int16_t& i16) {
T_VIRTUAL_CALL();
return readI16_virt(i16);
}

uint32_t readI32(int32_t& i32) {
T_VIRTUAL_CALL();
return readI32_virt(i32);
}

uint32_t readI64(int64_t& i64) {
T_VIRTUAL_CALL();
return readI64_virt(i64);
}

uint32_t readDouble(double& dub) {
T_VIRTUAL_CALL();
return readDouble_virt(dub);
}

uint32_t readString(std::string& str) {
T_VIRTUAL_CALL();
return readString_virt(str);
}

uint32_t readBinary(std::string& str) {
T_VIRTUAL_CALL();
return readBinary_virt(str);
}

uint32_t readBool(std::vector<bool>::reference value) {
T_VIRTUAL_CALL();
return readBool_virt(value);
}

uint32_t skip(TType type) {
T_VIRTUAL_CALL();
return skip_virt(type);
}
virtual uint32_t skip_virt(TType type);

inline std::shared_ptr<TTransport> getTransport() { return ptrans_; }

inline std::shared_ptr<TTransport> getInputTransport() { return ptrans_; }
inline std::shared_ptr<TTransport> getOutputTransport() { return ptrans_; }

oid incrementInputRecursionDepth() {
if (recursion_limit_ < ++input_recursion_depth_) {
throw TProtocolException(TProtocolException::DEPTH_LIMIT);
}
}
void decrementInputRecursionDepth() { --input_recursion_depth_; }

void incrementOutputRecursionDepth() {
if (recursion_limit_ < ++output_recursion_depth_) {
throw TProtocolException(TProtocolException::DEPTH_LIMIT);
}
}
void decrementOutputRecursionDepth() { --output_recursion_depth_; }

uint32_t getRecursionLimit() const {return recursion_limit_;}
void setRecurisionLimit(uint32_t depth) {recursion_limit_ = depth;}

protected:
TProtocol(std::shared_ptr<TTransport> ptrans)
: ptrans_(ptrans), input_recursion_depth_(0), output_recursion_depth_(0), recursion_limit_(DEFAULT_RECURSION_LIMIT)
{}

std::shared_ptr<TTransport> ptrans_;

private:
TProtocol() = default;
uint32_t input_recursion_depth_;
uint32_t output_recursion_depth_;
uint32_t recursion_limit_;
};

// 默认协议
class TProtocolDefaults : public TProtocol {
public:
uint32_t readMessageBegin(std::string& name, TMessageType& messageType, int32_t& seqid) {
(void)name;
(void)messageType;
(void)seqid;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}

uint32_t readMessageEnd() {
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}

uint32_t readStructBegin(std::string& name) {
(void)name;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}

uint32_t readStructEnd() {
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}

uint32_t readFieldBegin(std::string& name, TType& fieldType, int16_t& fieldId) {
(void)name;
(void)fieldType;
(void)fieldId;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}

uint32_t readFieldEnd() {
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}

uint32_t readMapBegin(TType& keyType, TType& valType, uint32_t& size) {
(void)keyType;
(void)valType;
(void)size;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}

uint32_t readMapEnd() {
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}

uint32_t readListBegin(TType& elemType, uint32_t& size) {
(void)elemType;
(void)size;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}

uint32_t readListEnd() {
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}

uint32_t readSetBegin(TType& elemType, uint32_t& size) {
(void)elemType;
(void)size;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}

uint32_t readSetEnd() {
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}

uint32_t readBool(bool& value) {
(void)value;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}

uint32_t readBool(std::vector<bool>::reference value) {
(void)value;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}

uint32_t readByte(int8_t& byte) {
(void)byte;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}

uint32_t readI16(int16_t& i16) {
(void)i16;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}

uint32_t readI32(int32_t& i32) {
(void)i32;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}

uint32_t readI64(int64_t& i64) {
(void)i64;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}

uint32_t readDouble(double& dub) {
(void)dub;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}

uint32_t readString(std::string& str) {
(void)str;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}

uint32_t readBinary(std::string& str) {
(void)str;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support reading (yet).");
}

uint32_t writeMessageBegin(const std::string& name,
const TMessageType messageType,
const int32_t seqid) {
(void)name;
(void)messageType;
(void)seqid;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}

uint32_t writeMessageEnd() {
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}

uint32_t writeStructBegin(const char* name) {
(void)name;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}

uint32_t writeStructEnd() {
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}

uint32_t writeFieldBegin(const char* name, const TType fieldType, const int16_t fieldId) {
(void)name;
(void)fieldType;
(void)fieldId;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}

uint32_t writeFieldEnd() {
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}

uint32_t writeFieldStop() {
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}

uint32_t writeMapBegin(const TType keyType, const TType valType, const uint32_t size) {
(void)keyType;
(void)valType;
(void)size;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}

uint32_t writeMapEnd() {
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}

uint32_t writeListBegin(const TType elemType, const uint32_t size) {
(void)elemType;
(void)size;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}

uint32_t writeListEnd() {
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}

uint32_t writeSetBegin(const TType elemType, const uint32_t size) {
(void)elemType;
(void)size;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}

uint32_t writeSetEnd() {
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}

uint32_t writeBool(const bool value) {
(void)value;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}

uint32_t writeByte(const int8_t byte) {
(void)byte;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}

uint32_t writeI16(const int16_t i16) {
(void)i16;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}

uint32_t writeI32(const int32_t i32) {
(void)i32;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}

uint32_t writeI64(const int64_t i64) {
(void)i64;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}

uint32_t writeDouble(const double dub) {
(void)dub;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}

uint32_t writeString(const std::string& str) {
(void)str;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}

uint32_t writeBinary(const std::string& str) {
(void)str;
throw TProtocolException(TProtocolException::NOT_IMPLEMENTED,
"this protocol does not support writing (yet).");
}

uint32_t skip(TType type) { return ::apache::thrift::protocol::skip(*this, type); }

protected:
TProtocolDefaults(std::shared_ptr<TTransport> ptrans) : TProtocol(ptrans) {}
};

// TVirtualProtocol类型的实现,模板类型,可以传入子类
template <class Protocol_, class Super_ = TProtocolDefaults>
class TVirtualProtocol : public Super_ {
public:
uint32_t writeMessageBegin_virt(const std::string& name,
const TMessageType messageType,
const int32_t seqid) override {
return static_cast<Protocol_*>(this)->writeMessageBegin(name, messageType, seqid);
}

uint32_t writeMessageEnd_virt() override {
return static_cast<Protocol_*>(this)->writeMessageEnd();
}

uint32_t writeStructBegin_virt(const char* name) override {
return static_cast<Protocol_*>(this)->writeStructBegin(name);
}

uint32_t writeStructEnd_virt() override { return static_cast<Protocol_*>(this)->writeStructEnd(); }

uint32_t writeFieldBegin_virt(const char* name,
const TType fieldType,
const int16_t fieldId) override {
return static_cast<Protocol_*>(this)->writeFieldBegin(name, fieldType, fieldId);
}

uint32_t writeFieldEnd_virt() override { return static_cast<Protocol_*>(this)->writeFieldEnd(); }

uint32_t writeFieldStop_virt() override { return static_cast<Protocol_*>(this)->writeFieldStop(); }

uint32_t writeMapBegin_virt(const TType keyType,
const TType valType,
const uint32_t size) override {
return static_cast<Protocol_*>(this)->writeMapBegin(keyType, valType, size);
}

uint32_t writeMapEnd_virt() override { return static_cast<Protocol_*>(this)->writeMapEnd(); }

uint32_t writeListBegin_virt(const TType elemType, const uint32_t size) override {
return static_cast<Protocol_*>(this)->writeListBegin(elemType, size);
}

uint32_t writeListEnd_virt() override { return static_cast<Protocol_*>(this)->writeListEnd(); }

uint32_t writeSetBegin_virt(const TType elemType, const uint32_t size) override {
return static_cast<Protocol_*>(this)->writeSetBegin(elemType, size);
}

uint32_t writeSetEnd_virt() override { return static_cast<Protocol_*>(this)->writeSetEnd(); }

uint32_t writeBool_virt(const bool value) override {
return static_cast<Protocol_*>(this)->writeBool(value);
}

uint32_t writeByte_virt(const int8_t byte) override {
return static_cast<Protocol_*>(this)->writeByte(byte);
}

uint32_t writeI16_virt(const int16_t i16) override {
return static_cast<Protocol_*>(this)->writeI16(i16);
}

uint32_t writeI32_virt(const int32_t i32) override {
return static_cast<Protocol_*>(this)->writeI32(i32);
}

uint32_t writeI64_virt(const int64_t i64) override {
return static_cast<Protocol_*>(this)->writeI64(i64);
}

uint32_t writeDouble_virt(const double dub) override {
return static_cast<Protocol_*>(this)->writeDouble(dub);
}

uint32_t writeString_virt(const std::string& str) override {
return static_cast<Protocol_*>(this)->writeString(str);
}

uint32_t writeBinary_virt(const std::string& str) override {
return static_cast<Protocol_*>(this)->writeBinary(str);
}

uint32_t readMessageBegin_virt(std::string& name,
TMessageType& messageType,
int32_t& seqid) override {
return static_cast<Protocol_*>(this)->readMessageBegin(name, messageType, seqid);
}

uint32_t readMessageEnd_virt() override { return static_cast<Protocol_*>(this)->readMessageEnd(); }

uint32_t readStructBegin_virt(std::string& name) override {
return static_cast<Protocol_*>(this)->readStructBegin(name);
}

uint32_t readStructEnd_virt() override { return static_cast<Protocol_*>(this)->readStructEnd(); }

uint32_t readFieldBegin_virt(std::string& name, TType& fieldType, int16_t& fieldId) override {
return static_cast<Protocol_*>(this)->readFieldBegin(name, fieldType, fieldId);
}

uint32_t readFieldEnd_virt() override { return static_cast<Protocol_*>(this)->readFieldEnd(); }

uint32_t readMapBegin_virt(TType& keyType, TType& valType, uint32_t& size) override {
return static_cast<Protocol_*>(this)->readMapBegin(keyType, valType, size);
}

uint32_t readMapEnd_virt() override { return static_cast<Protocol_*>(this)->readMapEnd(); }

uint32_t readListBegin_virt(TType& elemType, uint32_t& size) override {
return static_cast<Protocol_*>(this)->readListBegin(elemType, size);
}

uint32_t readListEnd_virt() override { return static_cast<Protocol_*>(this)->readListEnd(); }

uint32_t readSetBegin_virt(TType& elemType, uint32_t& size) override {
return static_cast<Protocol_*>(this)->readSetBegin(elemType, size);
}

uint32_t readSetEnd_virt() override { return static_cast<Protocol_*>(this)->readSetEnd(); }

uint32_t readBool_virt(bool& value) override {
return static_cast<Protocol_*>(this)->readBool(value);
}

uint32_t readBool_virt(std::vector<bool>::reference value) override {
return static_cast<Protocol_*>(this)->readBool(value);
}

uint32_t readByte_virt(int8_t& byte) override {
return static_cast<Protocol_*>(this)->readByte(byte);
}

uint32_t readI16_virt(int16_t& i16) override {
return static_cast<Protocol_*>(this)->readI16(i16);
}

uint32_t readI32_virt(int32_t& i32) override {
return static_cast<Protocol_*>(this)->readI32(i32);
}

uint32_t readI64_virt(int64_t& i64) override {
return static_cast<Protocol_*>(this)->readI64(i64);
}

uint32_t readDouble_virt(double& dub) override {
return static_cast<Protocol_*>(this)->readDouble(dub);
}

uint32_t readString_virt(std::string& str) override {
return static_cast<Protocol_*>(this)->readString(str);
}

uint32_t readBinary_virt(std::string& str) override {
return static_cast<Protocol_*>(this)->readBinary(str);
}

uint32_t skip_virt(TType type) override { return static_cast<Protocol_*>(this)->skip(type); }

uint32_t skip(TType type) {
auto* const prot = static_cast<Protocol_*>(this);
return ::apache::thrift::protocol::skip(*prot, type);
}

uint32_t readBool(std::vector<bool>::reference value) {
bool b = false;
uint32_t ret = static_cast<Protocol_*>(this)->readBool(b);
value = b;
return ret;
}
using Super_::readBool; // so we don't hide readBool(bool&)

protected:
TVirtualProtocol(std::shared_ptr<TTransport> ptrans) : Super_(ptrans) {}
};

// 常用的BinaryProtocol
template <class Transport_, class ByteOrder_ = TNetworkBigEndian>
class TBinaryProtocolT : public TVirtualProtocol<TBinaryProtocolT<Transport_, ByteOrder_> > {
public:
static const int32_t VERSION_MASK = ((int32_t)0xffff0000);
static const int32_t VERSION_1 = ((int32_t)0x80010000);

TBinaryProtocolT(std::shared_ptr<Transport_> trans)
: TVirtualProtocol<TBinaryProtocolT<Transport_, ByteOrder_> >(trans),
trans_(trans.get()),
string_limit_(0),
container_limit_(0),
strict_read_(false),
strict_write_(true) {}

TBinaryProtocolT(std::shared_ptr<Transport_> trans,
int32_t string_limit,
int32_t container_limit,
bool strict_read,
bool strict_write)
: TVirtualProtocol<TBinaryProtocolT<Transport_, ByteOrder_> >(trans),
trans_(trans.get()),
string_limit_(string_limit),
container_limit_(container_limit),
strict_read_(strict_read),
strict_write_(strict_write) {}

void setStringSizeLimit(int32_t string_limit) { string_limit_ = string_limit; }

void setContainerSizeLimit(int32_t container_limit) { container_limit_ = container_limit; }

void setStrict(bool strict_read, bool strict_write) {
strict_read_ = strict_read;
strict_write_ = strict_write;
}

/*ol*/ uint32_t writeMessageBegin(const std::string& name,
const TMessageType messageType,
const int32_t seqid);

/*ol*/ uint32_t writeMessageEnd();

inline uint32_t writeStructBegin(const char* name);

inline uint32_t writeStructEnd();

inline uint32_t writeFieldBegin(const char* name, const TType fieldType, const int16_t fieldId);

inline uint32_t writeFieldEnd();

inline uint32_t writeFieldStop();

inline uint32_t writeMapBegin(const TType keyType, const TType valType, const uint32_t size);

inline uint32_t writeMapEnd();

inline uint32_t writeListBegin(const TType elemType, const uint32_t size);

inline uint32_t writeListEnd();

inline uint32_t writeSetBegin(const TType elemType, const uint32_t size);

inline uint32_t writeSetEnd();

inline uint32_t writeBool(const bool value);

inline uint32_t writeByte(const int8_t byte);

inline uint32_t writeI16(const int16_t i16);

inline uint32_t writeI32(const int32_t i32);

inline uint32_t writeI64(const int64_t i64);

inline uint32_t writeDouble(const double dub);

template <typename StrType>
inline uint32_t writeString(const StrType& str);

inline uint32_t writeBinary(const std::string& str);

/*ol*/ uint32_t readMessageBegin(std::string& name, TMessageType& messageType, int32_t& seqid);

/*ol*/ uint32_t readMessageEnd();

inline uint32_t readStructBegin(std::string& name);

inline uint32_t readStructEnd();

inline uint32_t readFieldBegin(std::string& name, TType& fieldType, int16_t& fieldId);

inline uint32_t readFieldEnd();

inline uint32_t readMapBegin(TType& keyType, TType& valType, uint32_t& size);

inline uint32_t readMapEnd();

inline uint32_t readListBegin(TType& elemType, uint32_t& size);

inline uint32_t readListEnd();

inline uint32_t readSetBegin(TType& elemType, uint32_t& size);

inline uint32_t readSetEnd();

inline uint32_t readBool(bool& value);
using TVirtualProtocol<TBinaryProtocolT<Transport_, ByteOrder_> >::readBool;

inline uint32_t readByte(int8_t& byte);

inline uint32_t readI16(int16_t& i16);

inline uint32_t readI32(int32_t& i32);

inline uint32_t readI64(int64_t& i64);

inline uint32_t readDouble(double& dub);

template <typename StrType>
inline uint32_t readString(StrType& str);

inline uint32_t readBinary(std::string& str);

protected:
template <typename StrType>
uint32_t readStringBody(StrType& str, int32_t sz);

Transport_* trans_;

int32_t string_limit_;
int32_t container_limit_;

bool strict_read_;
bool strict_write_;
};

// 具体的细节
template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeI64(const int64_t i64) {
auto net = (int64_t)ByteOrder_::toWire64(i64);
this->trans_->write((uint8_t*)&net, 8); // 按照一定的格式调用底层io接口
return 8;
}

template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeMessageBegin(const std::string& name,
const TMessageType messageType,
const int32_t seqid) {
if (this->strict_write_) {
int32_t version = (VERSION_1) | ((int32_t)messageType);
uint32_t wsize = 0;
wsize += writeI32(version);
wsize += writeString(name);
wsize += writeI32(seqid);
return wsize;
} else {
uint32_t wsize = 0;
wsize += writeString(name);
wsize += writeByte((int8_t)messageType);
wsize += writeI32(seqid);
return wsize;
}
}

template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeMessageEnd() {
return 0;
}

template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeStructBegin(const char* name) {
(void)name;
return 0;
}

template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeStructEnd() {
return 0;
}

template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeFieldBegin(const char* name,
const TType fieldType,
const int16_t fieldId) {
(void)name;
uint32_t wsize = 0;
wsize += writeByte((int8_t)fieldType);
wsize += writeI16(fieldId);
return wsize;
}

template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeFieldEnd() {
return 0;
}

template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeFieldStop() {
return writeByte((int8_t)T_STOP); // 这里的T_STOP是0
}

// 读的部分
template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::readI32(int32_t& i32) {
union bytes {
uint8_t b[4];
int32_t all;
} theBytes;
this->trans_->readAll(theBytes.b, 4);
i32 = (int32_t)ByteOrder_::fromWire32(theBytes.all);
return 4;
}

template <class Transport_, class ByteOrder_>
template <typename StrType>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::readStringBody(StrType& str, int32_t size) {
uint32_t result = 0;

// Catch error cases
if (size < 0) {
throw TProtocolException(TProtocolException::NEGATIVE_SIZE);
}
if (this->string_limit_ > 0 && size > this->string_limit_) {
throw TProtocolException(TProtocolException::SIZE_LIMIT);
}

// Catch empty string case
if (size == 0) {
str.clear();
return result;
}

// Try to borrow first
const uint8_t* borrow_buf;
uint32_t got = size;
if ((borrow_buf = this->trans_->borrow(nullptr, &got))) {
str.assign((const char*)borrow_buf, size);
this->trans_->consume(size);
return size;
}

str.resize(size);
this->trans_->readAll(reinterpret_cast<uint8_t*>(&str[0]), size);
return (uint32_t)size;
}

template <class Transport_, class ByteOrder_>
template <typename StrType>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::readString(StrType& str) {
uint32_t result;
int32_t size;
result = readI32(size); // 先读长度
return result + readStringBody(str, size); // 再根据长度读内容,读到str里面
}

template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::readMessageBegin(std::string& name,
TMessageType& messageType,
int32_t& seqid) {
uint32_t result = 0;
int32_t sz;
result += readI32(sz);

if (sz < 0) {
// Check for correct version number
int32_t version = sz & VERSION_MASK;
if (version != VERSION_1) {
throw TProtocolException(TProtocolException::BAD_VERSION, "Bad version identifier");
}
messageType = (TMessageType)(sz & 0x000000ff);
result += readString(name); // 见上面的分析
result += readI32(seqid);
} else {
if (this->strict_read_) {
throw TProtocolException(TProtocolException::BAD_VERSION,
"No version identifier... old protocol client in strict mode?");
} else {
// Handle pre-versioned input
int8_t type;
result += readStringBody(name, sz);
result += readByte(type);
messageType = (TMessageType)type;
result += readI32(seqid);
}
}
return result;
}

template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::readMessageEnd() {
return 0;
}

template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::readStructBegin(std::string& name) {
name = "";
return 0;
}

template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::readStructEnd() {
return 0;
}

template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::readFieldBegin(std::string& name,
TType& fieldType,
int16_t& fieldId) {
(void)name;
uint32_t result = 0;
int8_t type;
result += readByte(type);
fieldType = (TType)type;
if (fieldType == T_STOP) {
fieldId = 0;
return result;
}
result += readI16(fieldId);
return result;
}

template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::readFieldEnd() {
return 0;
}

Processor

Processor封装了从输入流中读取数据并写入输出流的能力。输入和输出流由Protocol对象表示。Processor接口非常简单

1
2
3
interface TProcessor {
bool process(TProtocol in, TProtocol out) throws TException
}

编译器会生成特定于服务的Processor实现。Processor从线路中读取数据(使用输入协议),将处理委托给处理程序(由用户实现),并通过线路向外部写入响应(使用输出协议)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
class TProcessor {
public:
virtual ~TProcessor() = default;

virtual bool process(std::shared_ptr<protocol::TProtocol> in,
std::shared_ptr<protocol::TProtocol> out,
void* connectionContext) = 0; // 子类实现

bool process(std::shared_ptr<apache::thrift::protocol::TProtocol> io, void* connectionContext) { // 协议是由server传的
return process(io, io, connectionContext); // 输入输出是同一个协议
}

std::shared_ptr<TProcessorEventHandler> getEventHandler() const { return eventHandler_; }

void setEventHandler(std::shared_ptr<TProcessorEventHandler> eventHandler) {
eventHandler_ = eventHandler;
}

protected:
TProcessor() = default;

std::shared_ptr<TProcessorEventHandler> eventHandler_;
};

// 真正处理协议的处理器
class TMultiplexedProcessor : public TProcessor {
public:
typedef std::map<std::string, std::shared_ptr<TProcessor> > services_t;

void registerProcessor(const std::string& serviceName, std::shared_ptr<TProcessor> processor) {
services[serviceName] = processor; // 可能有多个处理器
}

void registerDefault(const std::shared_ptr<TProcessor>& processor) {
defaultProcessor = processor;
}

TException protocol_error(std::shared_ptr<protocol::TProtocol> in,
std::shared_ptr<protocol::TProtocol> out,
const std::string& name,
int32_t seqid,
const std::string& msg) const {
in->skip(::apache::thrift::protocol::T_STRUCT);
in->readMessageEnd();
in->getTransport()->readEnd();
::apache::thrift::TApplicationException
x(::apache::thrift::TApplicationException::PROTOCOL_ERROR,
"TMultiplexedProcessor: " + msg);
out->writeMessageBegin(name, ::apache::thrift::protocol::T_EXCEPTION, seqid);
x.write(out.get());
out->writeMessageEnd();
out->getTransport()->writeEnd();
out->getTransport()->flush();
return TException(msg);
}

bool process(std::shared_ptr<protocol::TProtocol> in, // 只处理协议
std::shared_ptr<protocol::TProtocol> out,
void* connectionContext) override {
std::string name;
protocol::TMessageType type;
int32_t seqid;

in->readMessageBegin(name, type, seqid); // 读出数据name, type, seqid

if (type != protocol::T_CALL && type != protocol::T_ONEWAY) {
throw protocol_error(in, out, name, seqid, "Unexpected message type");
}

boost::tokenizer<boost::char_separator<char> > tok(name, boost::char_separator<char>(":"));

std::vector<std::string> tokens;
std::copy(tok.begin(), tok.end(), std::back_inserter(tokens));

if (tokens.size() == 2) {
auto it = services.find(tokens[0]); // 找到对应处理器索引

if (it != services.end()) {
std::shared_ptr<TProcessor> processor = it->second; // 找到了处理器
return processor
->process(std::shared_ptr<protocol::TProtocol>(
new protocol::StoredMessageProtocol(in, tokens[1], type, seqid)),
out,
connectionContext); // 真正的处理过程
} else {
throw protocol_error(in, out, name, seqid,
"Unknown service: " + tokens[0] +
". Did you forget to call registerProcessor()?");
}
} else if (tokens.size() == 1) {
if (defaultProcessor) { // 使用默认的处理器
return defaultProcessor
->process(std::shared_ptr<protocol::TProtocol>(
new protocol::StoredMessageProtocol(in, tokens[0], type, seqid)),
out,
connectionContext);
} else {
throw protocol_error(in, out, name, seqid,
"Non-multiplexed client request dropped. "
"Did you forget to call defaultProcessor()?");
}
} else {
throw protocol_error(in, out, name, seqid,
"Wrong number of tokens.");
}
}

private:
services_t services;

std::shared_ptr<TProcessor> defaultProcessor;
};

// 目前使用的这个
class TDispatchProcessor : public TProcessor {
public:
bool process(std::shared_ptr<protocol::TProtocol> in,
std::shared_ptr<protocol::TProtocol> out,
void* connectionContext) override {
std::string fname;
protocol::TMessageType mtype;
int32_t seqid;
in->readMessageBegin(fname, mtype, seqid); // 读取fname, mtype, seqid

if (mtype != protocol::T_CALL && mtype != protocol::T_ONEWAY) {
GlobalOutput.printf("received invalid message type %d from client", mtype);
return false;
}

return dispatchCall(in.get(), out.get(), fname, seqid, connectionContext);
}

protected:
virtual bool dispatchCall(apache::thrift::protocol::TProtocol* in,
apache::thrift::protocol::TProtocol* out,
const std::string& fname,
int32_t seqid,
void* callContext) = 0; // 还要由具体的服务子类来实现
};

// 这里是由thrift自动生成的,父类是TDispatchProcessor
class SubscriberServiceProcessor : public ::apache::thrift::TDispatchProcessor {
protected:
::std::shared_ptr<SubscriberServiceIf> iface_;
virtual bool dispatchCall(::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, const std::string& fname, int32_t seqid, void* callContext);
private:
typedef void (SubscriberServiceProcessor::*ProcessFunction)(int32_t, ::apache::thrift::protocol::TProtocol*, ::apache::thrift::protocol::TProtocol*, void*);
typedef std::map<std::string, ProcessFunction> ProcessMap;
ProcessMap processMap_; // 存到map里面,查找时间O(1)
// 每个rpc接口对应一个函数
void process_getMaxUpSpeed(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext);
void process_getCurUpSpeed(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext);
public:
SubscriberServiceProcessor(::std::shared_ptr<SubscriberServiceIf> iface) :
iface_(iface) {
processMap_["getMaxUpSpeed"] = &SubscriberServiceProcessor::process_getMaxUpSpeed; // 存到map里面,查找时间O(1)
processMap_["getCurUpSpeed"] = &SubscriberServiceProcessor::process_getCurUpSpeed;
}

virtual ~SubscriberServiceProcessor() {}
};

bool SubscriberServiceProcessor::dispatchCall(::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, const std::string& fname, int32_t seqid, void* callContext) {
ProcessMap::iterator pfn;
pfn = processMap_.find(fname);
if (pfn == processMap_.end()) {
iprot->skip(::apache::thrift::protocol::T_STRUCT);
iprot->readMessageEnd();
iprot->getTransport()->readEnd();
::apache::thrift::TApplicationException x(::apache::thrift::TApplicationException::UNKNOWN_METHOD, "Invalid method name: '"+fname+"'");
oprot->writeMessageBegin(fname, ::apache::thrift::protocol::T_EXCEPTION, seqid);
x.write(oprot);
oprot->writeMessageEnd();
oprot->getTransport()->writeEnd();
oprot->getTransport()->flush();
return true;
}
(this->*(pfn->second))(seqid, iprot, oprot, callContext); // 找到了函数,直接调用对应的接口
return true;
}

// processor也是采用工厂模式生成对象
class TProcessorFactory {
public:
virtual ~TProcessorFactory() = default;

virtual std::shared_ptr<TProcessor> getProcessor(const TConnectionInfo& connInfo) = 0; // 由具体的工厂来实现
};

// 这个具体的工厂也是thrift自动生成的
class SubscriberServiceProcessorFactory : public ::apache::thrift::TProcessorFactory {
public:
SubscriberServiceProcessorFactory(const ::std::shared_ptr< SubscriberServiceIfFactory >& handlerFactory) :
handlerFactory_(handlerFactory) {}

::std::shared_ptr< ::apache::thrift::TProcessor > getProcessor(const ::apache::thrift::TConnectionInfo& connInfo);

protected:
::std::shared_ptr< SubscriberServiceIfFactory > handlerFactory_;
};

::std::shared_ptr< ::apache::thrift::TProcessor > SubscriberServiceProcessorFactory::getProcessor(const ::apache::thrift::TConnectionInfo& connInfo) {
::apache::thrift::ReleaseHandler< SubscriberServiceIfFactory > cleanup(handlerFactory_);
::std::shared_ptr< SubscriberServiceIf > handler(handlerFactory_->getHandler(connInfo), cleanup);
::std::shared_ptr< ::apache::thrift::TProcessor > processor(new SubscriberServiceProcessor(handler)); // 和上面相呼应了
return processor;
}

Server

最上层就是rpc服务器了,目前常用TNonblockingServer。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
class TServer : public concurrency::Runnable {
public:
~TServer() override = default;

virtual void serve() = 0; // 子类需要实现serve

virtual void stop() {}

// Allows running the server as a Runnable thread
void run() override { serve(); } // server其实也是个线程,需要实现run函数

std::shared_ptr<TProcessorFactory> getProcessorFactory() { return processorFactory_; }

std::shared_ptr<TServerTransport> getServerTransport() { return serverTransport_; }

std::shared_ptr<TTransportFactory> getInputTransportFactory() { return inputTransportFactory_; }

std::shared_ptr<TTransportFactory> getOutputTransportFactory() {
return outputTransportFactory_;
}

std::shared_ptr<TProtocolFactory> getInputProtocolFactory() { return inputProtocolFactory_; }

std::shared_ptr<TProtocolFactory> getOutputProtocolFactory() { return outputProtocolFactory_; }

std::shared_ptr<TServerEventHandler> getEventHandler() { return eventHandler_; }

protected:
TServer(const std::shared_ptr<TProcessorFactory>& processorFactory)
: processorFactory_(processorFactory) {
setInputTransportFactory(std::shared_ptr<TTransportFactory>(new TTransportFactory()));
setOutputTransportFactory(std::shared_ptr<TTransportFactory>(new TTransportFactory()));
setInputProtocolFactory(std::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
setOutputProtocolFactory(std::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
}

TServer(const std::shared_ptr<TProcessor>& processor)
: processorFactory_(new TSingletonProcessorFactory(processor)) {
setInputTransportFactory(std::shared_ptr<TTransportFactory>(new TTransportFactory()));
setOutputTransportFactory(std::shared_ptr<TTransportFactory>(new TTransportFactory()));
setInputProtocolFactory(std::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
setOutputProtocolFactory(std::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
}

TServer(const std::shared_ptr<TProcessorFactory>& processorFactory,
const std::shared_ptr<TServerTransport>& serverTransport)
: processorFactory_(processorFactory), serverTransport_(serverTransport) {
setInputTransportFactory(std::shared_ptr<TTransportFactory>(new TTransportFactory()));
setOutputTransportFactory(std::shared_ptr<TTransportFactory>(new TTransportFactory()));
setInputProtocolFactory(std::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
setOutputProtocolFactory(std::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
}

TServer(const std::shared_ptr<TProcessor>& processor,
const std::shared_ptr<TServerTransport>& serverTransport)
: processorFactory_(new TSingletonProcessorFactory(processor)),
serverTransport_(serverTransport) {
setInputTransportFactory(std::shared_ptr<TTransportFactory>(new TTransportFactory()));
setOutputTransportFactory(std::shared_ptr<TTransportFactory>(new TTransportFactory()));
setInputProtocolFactory(std::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
setOutputProtocolFactory(std::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
}

TServer(const std::shared_ptr<TProcessorFactory>& processorFactory,
const std::shared_ptr<TServerTransport>& serverTransport,
const std::shared_ptr<TTransportFactory>& transportFactory,
const std::shared_ptr<TProtocolFactory>& protocolFactory)
: processorFactory_(processorFactory),
serverTransport_(serverTransport),
inputTransportFactory_(transportFactory),
outputTransportFactory_(transportFactory),
inputProtocolFactory_(protocolFactory),
outputProtocolFactory_(protocolFactory) {}

TServer(const std::shared_ptr<TProcessor>& processor,
const std::shared_ptr<TServerTransport>& serverTransport,
const std::shared_ptr<TTransportFactory>& transportFactory,
const std::shared_ptr<TProtocolFactory>& protocolFactory)
: processorFactory_(new TSingletonProcessorFactory(processor)),
serverTransport_(serverTransport),
inputTransportFactory_(transportFactory),
outputTransportFactory_(transportFactory),
inputProtocolFactory_(protocolFactory),
outputProtocolFactory_(protocolFactory) {}

TServer(const std::shared_ptr<TProcessorFactory>& processorFactory,
const std::shared_ptr<TServerTransport>& serverTransport,
const std::shared_ptr<TTransportFactory>& inputTransportFactory,
const std::shared_ptr<TTransportFactory>& outputTransportFactory,
const std::shared_ptr<TProtocolFactory>& inputProtocolFactory,
const std::shared_ptr<TProtocolFactory>& outputProtocolFactory)
: processorFactory_(processorFactory),
serverTransport_(serverTransport),
inputTransportFactory_(inputTransportFactory),
outputTransportFactory_(outputTransportFactory),
inputProtocolFactory_(inputProtocolFactory),
outputProtocolFactory_(outputProtocolFactory) {}

TServer(const std::shared_ptr<TProcessor>& processor,
const std::shared_ptr<TServerTransport>& serverTransport,
const std::shared_ptr<TTransportFactory>& inputTransportFactory,
const std::shared_ptr<TTransportFactory>& outputTransportFactory,
const std::shared_ptr<TProtocolFactory>& inputProtocolFactory,
const std::shared_ptr<TProtocolFactory>& outputProtocolFactory)
: processorFactory_(new TSingletonProcessorFactory(processor)),
serverTransport_(serverTransport),
inputTransportFactory_(inputTransportFactory),
outputTransportFactory_(outputTransportFactory),
inputProtocolFactory_(inputProtocolFactory),
outputProtocolFactory_(outputProtocolFactory) {}

std::shared_ptr<TProcessor> getProcessor(std::shared_ptr<TProtocol> inputProtocol,
std::shared_ptr<TProtocol> outputProtocol,
std::shared_ptr<TTransport> transport) {
TConnectionInfo connInfo;
connInfo.input = inputProtocol;
connInfo.output = outputProtocol;
connInfo.transport = transport;
return processorFactory_->getProcessor(connInfo);
}

// Class variables
std::shared_ptr<TProcessorFactory> processorFactory_;
std::shared_ptr<TServerTransport> serverTransport_;

std::shared_ptr<TTransportFactory> inputTransportFactory_;
std::shared_ptr<TTransportFactory> outputTransportFactory_;

std::shared_ptr<TProtocolFactory> inputProtocolFactory_;
std::shared_ptr<TProtocolFactory> outputProtocolFactory_;

std::shared_ptr<TServerEventHandler> eventHandler_;

public:
void setInputTransportFactory(std::shared_ptr<TTransportFactory> inputTransportFactory) {
inputTransportFactory_ = inputTransportFactory;
}

void setOutputTransportFactory(std::shared_ptr<TTransportFactory> outputTransportFactory) {
outputTransportFactory_ = outputTransportFactory;
}

void setInputProtocolFactory(std::shared_ptr<TProtocolFactory> inputProtocolFactory) {
inputProtocolFactory_ = inputProtocolFactory;
}

void setOutputProtocolFactory(std::shared_ptr<TProtocolFactory> outputProtocolFactory) {
outputProtocolFactory_ = outputProtocolFactory;
}

void setServerEventHandler(std::shared_ptr<TServerEventHandler> eventHandler) {
eventHandler_ = eventHandler;
}
};

// 子类实现了serve
void TNonblockingServer::serve() {

if (ioThreads_.empty())
registerEvents(NULL);

ioThreads_[0]->run();

for (uint32_t i = 0; i < ioThreads_.size(); ++i) { // 多个io线程
ioThreads_[i]->join();
GlobalOutput.printf("TNonblocking: join done for IO thread #%d", i);
}
}

// 服务器初始化过程
void TNonblockingServer::registerEvents(event_base* user_event_base) { // 使用了libevent
userEventBase_ = user_event_base;

// init listen socket
if (serverSocket_ == THRIFT_INVALID_SOCKET)
createAndListenOnSocket();

// set up the IO threads
assert(ioThreads_.empty());
if (!numIOThreads_) {
numIOThreads_ = DEFAULT_IO_THREADS; // static const int DEFAULT_IO_THREADS = 1;
}
// User-provided event-base doesn't works for multi-threaded servers
assert(numIOThreads_ == 1 || !userEventBase_);

for (uint32_t id = 0; id < numIOThreads_; ++id) {
// the first IO thread also does the listening on server socket
THRIFT_SOCKET listenFd = (id == 0 ? serverSocket_ : THRIFT_INVALID_SOCKET);

shared_ptr<TNonblockingIOThread> thread(
new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_)); // 传入了listenFd
ioThreads_.push_back(thread); // 多个io线程
}

// Notify handler of the preServe event
if (eventHandler_) {
eventHandler_->preServe();
}

// Start all of our helper IO threads. Note that the threads run forever,
// only terminating if stop() is called.
assert(ioThreads_.size() == numIOThreads_);
assert(ioThreads_.size() > 0);

GlobalOutput.printf("TNonblockingServer: Serving with %d io threads.",
ioThreads_.size());

// Launch all the secondary IO threads in separate threads
if (ioThreads_.size() > 1) {
ioThreadFactory_.reset(new PlatformThreadFactory(
#if !USE_BOOST_THREAD && !USE_STD_THREAD
PlatformThreadFactory::OTHER, // scheduler
PlatformThreadFactory::NORMAL, // priority
1, // stack size (MB)
#endif
false // detached
));

assert(ioThreadFactory_.get());

// intentionally starting at thread 1, not 0
for (uint32_t i = 1; i < ioThreads_.size(); ++i) {
shared_ptr<Thread> thread = ioThreadFactory_->newThread(ioThreads_[i]);
ioThreads_[i]->setThread(thread);
thread->start(); // 启动线程,会调用TNonblockingIOThread::run()
}
}

// Register the events for the primary (listener) IO thread
ioThreads_[0]->registerEvents(); // 至少有一个监听的io线程
}

void TNonblockingIOThread::run() {
if (eventBase_ == NULL) {
registerEvents(); // 线程运行时会注册一次,每个线程单独有个reactor
}
if (useHighPriority_) {
setCurrentThreadHighPriority(true);
}

if (eventBase_ != NULL)
{
GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...", number_);
// Run libevent engine, never returns, invokes calls to eventHandler
event_base_loop(eventBase_, 0); // 阻塞运行

if (useHighPriority_) {
setCurrentThreadHighPriority(false);
}

// cleans up our registered events
cleanupEvents();
}

GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!", number_);
}

/**
* Register the core libevent events onto the proper base.
*/
void TNonblockingIOThread::registerEvents() { // 线程级别注册
threadId_ = Thread::get_current();

assert(eventBase_ == 0);
eventBase_ = getServer()->getUserEventBase();
if (eventBase_ == NULL) {
eventBase_ = event_base_new();
ownEventBase_ = true;
}

// Print some libevent stats
if (number_ == 0) {
GlobalOutput.printf("TNonblockingServer: using libevent %s method %s",
event_get_version(),
event_base_get_method(eventBase_));
}

if (listenSocket_ != THRIFT_INVALID_SOCKET) {
// Register the server event
event_set(&serverEvent_,
listenSocket_,
EV_READ | EV_PERSIST,
TNonblockingIOThread::listenHandler,
server_);
event_base_set(eventBase_, &serverEvent_); // 注册accept事件

// Add the event and start up the server
if (-1 == event_add(&serverEvent_, 0)) {
throw TException(
"TNonblockingServer::serve(): "
"event_add() failed on server listen event");
}
GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.", number_);
}

createNotificationPipe(); // 创建pipe

// Create an event to be notified when a task finishes
event_set(&notificationEvent_,
getNotificationRecvFD(),
EV_READ | EV_PERSIST,
TNonblockingIOThread::notifyHandler, // 管道收到通知时回调notifyHandler
this);

// Attach to the base
event_base_set(eventBase_, &notificationEvent_);

// Add the event and start up the server
if (-1 == event_add(&notificationEvent_, 0)) {
throw TException(
"TNonblockingServer::serve(): "
"event_add() failed on task-done notification event");
}
GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.", number_);
}

// 服务器监听回调
// static void listenHandler(evutil_socket_t fd, short which, void* v) {
// ((TNonblockingServer*)v)->handleEvent(fd, which); // 监听回调最终函数
// }
void TNonblockingServer::handleEvent(THRIFT_SOCKET fd, short which) { // 监听的io线程处理
(void)which;
// Make sure that libevent didn't mess up the socket handles
assert(fd == serverSocket_);

// Going to accept a new client socket
stdcxx::shared_ptr<TSocket> clientSocket;

clientSocket = serverTransport_->accept();
if (clientSocket) {
// If we're overloaded, take action here
if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
Guard g(connMutex_);
nConnectionsDropped_++;
nTotalConnectionsDropped_++;
if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
clientSocket->close();
return;
} else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
if (!drainPendingTask()) {
// Nothing left to discard, so we drop connection instead.
clientSocket->close();
return;
}
}
}

// Create a new TConnection for this client socket.
TConnection* clientConnection = createConnection(clientSocket); // 选一个线程来管理连接,后续进行状态机处理

// Fail fast if we could not create a TConnection object
if (clientConnection == NULL) {
GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
clientSocket->close();
return;
}

if (clientConnection->getIOThreadNumber() == 0) {
clientConnection->transition(); // 单个io,直接状态处理
} else {
if (!clientConnection->notifyIOThread()) { // 多个io,读写分离,通过notificationPipeFDs_通知,notifyHandler回调
GlobalOutput.perror("[ERROR] notifyIOThread failed on fresh connection, closing", errno);
clientConnection->close();
}
}
}
}

/**
* Creates a new connection either by reusing an object off the stack or
* by allocating a new one entirely
*/
TNonblockingServer::TConnection* TNonblockingServer::createConnection(stdcxx::shared_ptr<TSocket> socket) { // server级别函数
// Check the stack
Guard g(connMutex_);

// pick an IO thread to handle this connection -- currently round robin
assert(nextIOThread_ < ioThreads_.size());
int selectedThreadIdx = nextIOThread_;
nextIOThread_ = static_cast<uint32_t>((nextIOThread_ + 1) % ioThreads_.size()); // 轮询选择io线程

TNonblockingIOThread* ioThread = ioThreads_[selectedThreadIdx].get();

// Check the connection stack to see if we can re-use
TConnection* result = NULL;
if (connectionStack_.empty()) {
result = new TConnection(socket, ioThread);
++numTConnections_;
} else {
result = connectionStack_.top();
connectionStack_.pop();
result->setSocket(socket);
result->init(ioThread);
}
activeConnections_.push_back(result); // 记录活跃的连接
return result;
}

/* static */
void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* v) { // 其它io线程接收connection信息
TNonblockingIOThread* ioThread = (TNonblockingIOThread*)v;
assert(ioThread);
(void)which;

while (true) {
TNonblockingServer::TConnection* connection = 0;
const int kSize = sizeof(connection);
long nBytes = recv(fd, cast_sockopt(&connection), kSize, 0); // 接收connection信息
if (nBytes == kSize) {
if (connection == NULL) {
// this is the command to stop our thread, exit the handler!
ioThread->breakLoop(false);
return;
}
connection->transition(); // 开始状态机处理
} else if (nBytes > 0) {
// throw away these bytes and hope that next time we get a solid read
GlobalOutput.printf("notifyHandler: Bad read of %d bytes, wanted %d", nBytes, kSize);
ioThread->breakLoop(true);
return;
} else if (nBytes == 0) {
GlobalOutput.printf("notifyHandler: Notify socket closed!");
ioThread->breakLoop(false);
// exit the loop
break;
} else { // nBytes < 0
if (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK
&& THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN) {
GlobalOutput.perror("TNonblocking: notifyHandler read() failed: ", THRIFT_GET_SOCKET_ERROR);
ioThread->breakLoop(true);
return;
}
// exit the loop
break;
}
}
}

/**
* Five states for the nonblocking server:
* 1) initialize
* 2) read 4 byte frame size
* 3) read frame of data
* 4) send back data (if any)
* 5) force immediate connection close
*/
enum TAppState {
APP_INIT, // 初始化为appState_ = APP_INIT;
APP_READ_FRAME_SIZE,
APP_READ_REQUEST,
APP_WAIT_TASK,
APP_SEND_RESULT,
APP_CLOSE_CONNECTION
};

static void eventHandler(evutil_socket_t fd, short /* which */, void* v) { // TConnection回调函数
assert(fd == static_cast<evutil_socket_t>(((TConnection*)v)->getTSocket()->getSocketFD()));
((TConnection*)v)->workSocket();
}

// 连接状态机实现
void TNonblockingServer::TConnection::transition() {
// ensure this connection is active right now
assert(ioThread_);
assert(server_);

// Switch upon the state that we are currently in and move to a new state
switch (appState_) {

case APP_READ_REQUEST:
// We are done reading the request, package the read buffer into transport
// and get back some data from the dispatch function
if (server_->getHeaderTransport()) {
inputTransport_->resetBuffer(readBuffer_, readBufferPos_); // stdcxx::shared_ptr<TMemoryBuffer> inputTransport_;
outputTransport_->resetBuffer();
} else {
// We saved room for the framing size in case header transport needed it,
// but just skip it for the non-header case
inputTransport_->resetBuffer(readBuffer_ + 4, readBufferPos_ - 4); // 先发到TMemoryBuffer里,可生成inputProtocol_
outputTransport_->resetBuffer();

// Prepend four bytes of blank space to the buffer so we can
// write the frame size there later.
outputTransport_->getWritePtr(4);
outputTransport_->wroteBytes(4);
}

server_->incrementActiveProcessors(); // server级别

if (server_->isThreadPoolProcessing()) {
// We are setting up a Task to do this work and we will wait on it

// Create task and dispatch to the thread manager
stdcxx::shared_ptr<Runnable> task = stdcxx::shared_ptr<Runnable>( // 封装成task
new Task(processor_, inputProtocol_, outputProtocol_, this)); // inputTransport_生成的inputProtocol_
// get input/transports
// factoryInputTransport_ = server_->getInputTransportFactory()->getTransport(inputTransport_);
// factoryOutputTransport_ = server_->getOutputTransportFactory()->getTransport(outputTransport_);

// if (server_->getHeaderTransport()) {
// inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_,
// factoryOutputTransport_);
// outputProtocol_ = inputProtocol_;
// } else {
// inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
// outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
// }
// The application is now waiting on the task to finish
appState_ = APP_WAIT_TASK;

// Set this connection idle so that libevent doesn't process more
// data on it while we're still waiting for the threadmanager to
// finish this task
setIdle();

try {
server_->addTask(task); // threadManager_->add(task, 0LL, taskExpireTime_); 交给线程管理器,tasks_.push_back(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value, expiration))); 提交到多线程的队列里
} catch (IllegalStateException& ise) {
// The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
server_->decrementActiveProcessors();
close();
} catch (TimedOutException& to) {
GlobalOutput.printf("[ERROR] TimedOutException: Server::process() %s", to.what());
server_->decrementActiveProcessors();
close();
}

return;
} else {
try {
if (serverEventHandler_) {
serverEventHandler_->processContext(connectionContext_, getTSocket());
}
// Invoke the processor
processor_->process(inputProtocol_, outputProtocol_, connectionContext_); // 如果没有多线程,直接处理器处理
} catch (const TTransportException& ttx) {
GlobalOutput.printf(
"TNonblockingServer transport error in "
"process(): %s",
ttx.what());
server_->decrementActiveProcessors();
close();
return;
} catch (const std::exception& x) {
GlobalOutput.printf("Server::process() uncaught exception: %s: %s",
typeid(x).name(),
x.what());
server_->decrementActiveProcessors();
close();
return;
} catch (...) {
GlobalOutput.printf("Server::process() unknown exception");
server_->decrementActiveProcessors();
close();
return;
}
}

// Intentionally fall through here, the call to process has written into
// the writeBuffer_

case APP_WAIT_TASK: // 线程处理完了,notifyIOThread跑到这里
// We have now finished processing a task and the result has been written
// into the outputTransport_, so we grab its contents and place them into
// the writeBuffer_ for actual writing by the libevent thread

server_->decrementActiveProcessors();
// Get the result of the operation
outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_); // 需要发多少数据

// If the function call generated return data, then move into the send
// state and get going
// 4 bytes were reserved for frame size
if (writeBufferSize_ > 4) {

// Move into write state
writeBufferPos_ = 0;
socketState_ = SOCKET_SEND; // 开始发送

// Put the frame size into the write buffer
int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
memcpy(writeBuffer_, &frameSize, 4); // 大小写到头部

// Socket into write mode
appState_ = APP_SEND_RESULT; // 发送结果
setWrite(); // 触发写回调

// Try to work the socket immediately
// workSocket();

return;
}

// In this case, the request was oneway and we should fall through
// right back into the read frame header state
goto LABEL_APP_INIT;

case APP_SEND_RESULT:
// it's now safe to perform buffer size housekeeping.
if (writeBufferSize_ > largestWriteBufferSize_) {
largestWriteBufferSize_ = writeBufferSize_;
}
if (server_->getResizeBufferEveryN() > 0
&& ++callsForResize_ >= server_->getResizeBufferEveryN()) {
checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
server_->getIdleWriteBufferLimit());
callsForResize_ = 0;
}

// N.B.: We also intentionally fall through here into the INIT state!

LABEL_APP_INIT:
case APP_INIT:

// Clear write buffer variables
writeBuffer_ = NULL;
writeBufferPos_ = 0;
writeBufferSize_ = 0;

// Into read4 state we go
socketState_ = SOCKET_RECV_FRAMING; // 先接收
appState_ = APP_READ_FRAME_SIZE; // 换到下一个状态

readBufferPos_ = 0;

// Register read event
setRead(); // 读回调

// Try to work the socket right away
// workSocket();

return;

case APP_READ_FRAME_SIZE:
readWant_ += 4;

// We just read the request length
// Double the buffer size until it is big enough
if (readWant_ > readBufferSize_) { // 分配读取缓冲区的空间
if (readBufferSize_ == 0) {
readBufferSize_ = 1;
}
uint32_t newSize = readBufferSize_;
while (readWant_ > newSize) {
newSize *= 2; // 分配的空间为2倍大小
}

uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
if (newBuffer == NULL) {
// nothing else to be done...
throw std::bad_alloc();
}
readBuffer_ = newBuffer;
readBufferSize_ = newSize;
}

readBufferPos_ = 4;
*((uint32_t*)readBuffer_) = htonl(readWant_ - 4); // 第一个位置保存大小

// Move into read request state
socketState_ = SOCKET_RECV; // 开始接收数据
appState_ = APP_READ_REQUEST; // 开始读请求

// Work the socket right away
workSocket();

return;

case APP_CLOSE_CONNECTION:
server_->decrementActiveProcessors();
close();
return;

default:
GlobalOutput.printf("Unexpected Application State %d", appState_);
assert(0);
}
}

enum TSocketState { SOCKET_RECV_FRAMING, SOCKET_RECV, SOCKET_SEND }; // 初始化 socketState_ = SOCKET_RECV_FRAMING;

void TNonblockingServer::TConnection::workSocket() {
int got = 0, left = 0, sent = 0;
uint32_t fetch = 0;

switch (socketState_) {
case SOCKET_RECV_FRAMING: // 初始化状态
union {
uint8_t buf[sizeof(uint32_t)];
uint32_t size;
} framing; // FramedTransport协议

// if we've already received some bytes we kept them here
framing.size = readWant_;
// determine size of this frame
try {
// Read from the socket
fetch = tSocket_->read(&framing.buf[readBufferPos_], // stdcxx::shared_ptr<TSocket> tSocket_;
uint32_t(sizeof(framing.size) - readBufferPos_));
if (fetch == 0) {
// Whenever we get here it means a remote disconnect
close();
return;
}
readBufferPos_ += fetch;
} catch (TTransportException& te) {
//In Nonblocking SSLSocket some operations need to be retried again.
//Current approach is parsing exception message, but a better solution needs to be investigated.
if(!strstr(te.what(), "retry")) {
GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
close();

return;
}
}

if (readBufferPos_ < sizeof(framing.size)) { // 至少要读framing字节
// more needed before frame size is known -- save what we have so far
readWant_ = framing.size;
return;
}

readWant_ = ntohl(framing.size); // 先得知道要读多少数据
if (readWant_ > server_->getMaxFrameSize()) {
// Don't allow giant frame sizes. This prevents bad clients from
// causing us to try and allocate a giant buffer.
GlobalOutput.printf(
"TNonblockingServer: frame size too large "
"(%" PRIu32 " > %" PRIu64
") from client %s. "
"Remote side not using TFramedTransport?",
readWant_,
(uint64_t)server_->getMaxFrameSize(),
tSocket_->getSocketInfo().c_str());
close();
return;
}
// size known; now get the rest of the frame
transition();
return;

case SOCKET_RECV:
// It is an error to be in this state if we already have all the data
assert(readBufferPos_ < readWant_);

try {
// Read from the socket
fetch = readWant_ - readBufferPos_;
got = tSocket_->read(readBuffer_ + readBufferPos_, fetch); // 读数据
} catch (TTransportException& te) {
//In Nonblocking SSLSocket some operations need to be retried again.
//Current approach is parsing exception message, but a better solution needs to be investigated.
if(!strstr(te.what(), "retry")) {
GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
close();
}

return;
}

if (got > 0) {
// Move along in the buffer
readBufferPos_ += got; // 应用层缓冲区读了多少数据

// Check that we did not overdo it
assert(readBufferPos_ <= readWant_);

// We are done reading, move onto the next state
if (readBufferPos_ == readWant_) {
transition(); // 读到了完整的请求
}
return;
}

// Whenever we get down here it means a remote disconnect
close();

return;

case SOCKET_SEND:
// Should never have position past size
assert(writeBufferPos_ <= writeBufferSize_);

// If there is no data to send, then let us move on
if (writeBufferPos_ == writeBufferSize_) {
GlobalOutput("WARNING: Send state with no data to send");
transition();
return;
}

try {
left = writeBufferSize_ - writeBufferPos_;
sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
} catch (TTransportException& te) {
GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
close();
return;
}

writeBufferPos_ += sent;

// Did we overdo it?
assert(writeBufferPos_ <= writeBufferSize_);

// We are done!
if (writeBufferPos_ == writeBufferSize_) {
transition();
}

return;

default:
GlobalOutput.printf("Unexpected Socket State %d", socketState_);
assert(0);
}
}

// 由连接就封装成任务
class TNonblockingServer::TConnection::Task : public Runnable {
public:
Task(stdcxx::shared_ptr<TProcessor> processor,
stdcxx::shared_ptr<TProtocol> input,
stdcxx::shared_ptr<TProtocol> output,
TConnection* connection)
: processor_(processor),
input_(input),
output_(output),
connection_(connection),
serverEventHandler_(connection_->getServerEventHandler()),
connectionContext_(connection_->getConnectionContext()) {}

void run() {
try {
for (;;) {
if (serverEventHandler_) {
serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
}
if (!processor_->process(input_, output_, connectionContext_) // 具体的任务调用process执行
|| !input_->getTransport()->peek()) {
break;
}
}
} catch (const TTransportException& ttx) {
GlobalOutput.printf("TNonblockingServer: client died: %s", ttx.what());
} catch (const std::bad_alloc&) {
GlobalOutput("TNonblockingServer: caught bad_alloc exception.");
exit(1);
} catch (const std::exception& x) {
GlobalOutput.printf("TNonblockingServer: process() exception: %s: %s",
typeid(x).name(),
x.what());
} catch (...) {
GlobalOutput.printf("TNonblockingServer: unknown exception while processing.");
}

// Signal completion back to the libevent thread via a pipe
if (!connection_->notifyIOThread()) { // 任务处理完了,通过pipe异步触发跑transition()
GlobalOutput.printf("TNonblockingServer: failed to notifyIOThread, closing.");
connection_->server_->decrementActiveProcessors();
connection_->close();
throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
}
}

TConnection* getTConnection() { return connection_; }

private:
stdcxx::shared_ptr<TProcessor> processor_;
stdcxx::shared_ptr<TProtocol> input_;
stdcxx::shared_ptr<TProtocol> output_;
TConnection* connection_;
stdcxx::shared_ptr<TServerEventHandler> serverEventHandler_;
void* connectionContext_;
};

// 感悟:可通过pipe在监听线程和其它io线程间传递新连接,触发连接的状态处理,连接读取到请求数据后,丢到线程池处理,线程处理完成后,通过pipe异步触发状态机发送数据,所以pipe可以在多线程间通信,也可以触发异步流程,另外可以将fd的io收发绑定到指定的线程

// 主服务器:使用过程
std::shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount); // 线程池
std::shared_ptr<TNonblockingServerSocket> socket = std::make_shared<TNonblockingServerSocket>(path); // unix domain socket,transport
std::shared_ptr<TNonblockingServer> server = std::make_shared<TNonblockingServer>(processorFactory, // processor
std::make_shared<TBinaryProtocolFactory>(), // 协议
socket,
threadManager);
server->serve();

// 客户端监控线程:单独一个线程启动,因为会阻塞
shared_ptr<Runner> runner(new Runner(rss, processorFactory, name, port, workerCnt));
shared_ptr<ThreadFactory> threadFactory(
new ThreadFactory(true));
shared_ptr<apache::thrift::concurrency::Thread> thread = threadFactory->newThread(runner);
thread->start();
runner->readyBarrier();

nephen wechat
欢迎您扫一扫上面的微信公众号,订阅我的博客!
坚持原创技术分享,您的支持将鼓励我继续创作!