就一个连接池而已,为什么要采用装饰者模式,听着咋感觉那么高深?
所谓连接池,也就是连接先不close,放入池中,等待下次需要用的时候,直接从池中取出即可用,省去了tcp握手的时间,所以这个可以看作是一个长连接,知道连接池清除空闲连接把多余的连接清除掉才会释放。
而普通的连接,用完后如果生命周期内不再使用了,就会销毁掉。
所以基于普通的连接,要定义出线程池的那种用完放回线程池的连接,就需要把close的方法进行重写,所以可以采用继承的方法实现,相当于产生了两个子类,一个是普通连接类,一个是线程池化的连接类。
如图,如果我还想有个连接,它是优化了传输性能处理的,岂不是还要实现一个具体类?那如果我需要一个线程池化以及优化性能的连接呢?再生成一个类吗?显然不太可取了,类型会变得非常多。
这个时候可以使用装饰者模式来实现。
1 | ThriftTransportPool::init() // 简化实现 |
如果想要加入性能优化功能,再使用OptiTransport装饰一下就行,所以后期扩展比较方便,不会产生类爆炸的问题,新加特性增强,只需要无限套娃即可。
源码实现
包装下服务器相关信息。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46// ServerInfo.hpp
using namespace std;
class ServerInfo
{
public:
ServerInfo(string host, int port)
{
this->host = host;
this->port = port;
}
string getHost()
{
return host;
}
void setHost(string host)
{
this->host = host;
}
int getPort()
{
return port;
}
void setPort(int port)
{
this->port = port;
}
string tostring()
{
return "host:" + host + ",port:" + to_string(port);
}
private:
string host;
int port;
};
包装下thrift中的transport,类似于http中的socket。该包装类TransportWrapper可以查看和控制transport的一些生存状态。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230// ITransportWrapper.h
using namespace std;
using namespace apache::thrift::transport;
class ITransportWrapper
{
public:
typedef std::shared_ptr<TTransport> TTransportPtr;
ITransportWrapper() = default;
virtual ~ITransportWrapper()
{}
virtual bool isBusy() = 0;
virtual void setIsBusy(bool isBusy) = 0;
virtual bool isDead() = 0;
virtual bool isOpen() = 0;
virtual void setIsDead(bool isDead) = 0;
virtual TTransportPtr getTransport() = 0;
virtual void setTransport(TTransportPtr transport) = 0;
/**
* 当前transport是否可用
*
* @return
*/
virtual bool isAvailable() = 0;
virtual time_t getLastUseTime() = 0;
virtual void setLastUseTime(time_t lastUseTime) = 0;
virtual string getHost() = 0;
virtual int getPort() = 0;
virtual string tostring() = 0;
virtual void open() = 0;
virtual void destroy() = 0;
virtual void close() = 0;
};
// TransportWrapper.hpp
using namespace std;
using namespace apache::thrift;
using namespace apache::thrift::concurrency;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
class TransportWrapper : public ITransportWrapper
{
public:
typedef std::shared_ptr<TTransport> TTransportPtr;
typedef std::shared_ptr<TSocket> TSocketPtr;
TransportWrapper(string host, int port, bool isOpen = false)
{
TSocketPtr socket(new TSocket(host, port));
TTransportPtr transport(new TFramedTransport (socket));
this->lastUseTime = time(0);
this->transport = transport;
this->host = host;
this->port = port;
if (isOpen)
{
try
{
transport->open();
}
catch (TException &e)
{
std::cout << host << ":" << port << " " << e.what() << std::endl;
isDead_ = true;
}
}
}
TransportWrapper() = default;
~TransportWrapper() = default;
bool isBusy() override
{
return isBusy_;
}
void setIsBusy(bool isBusy_) override
{
this->isBusy_ = isBusy_;
}
bool isDead() override
{
return isDead_;
}
bool isOpen() override
{
return transport->isOpen();
}
void setIsDead(bool isDead_) override
{
this->isDead_ = isDead_;
}
TTransportPtr getTransport() override
{
return transport;
}
void setTransport(TTransportPtr transport) override
{
this->transport = transport;
}
/**
* 当前transport是否可用
*
* @return
*/
bool isAvailable() override
{
return !isBusy_ && !isDead_ && transport->isOpen();
}
time_t getLastUseTime() override
{
return lastUseTime;
}
void setLastUseTime(time_t lastUseTime) override
{
this->lastUseTime = lastUseTime;
}
string getHost() override
{
return host;
}
int getPort() override
{
return port;
}
string tostring() override
{
return host + ":" + to_string(port) + ",isBusy:" + to_string(isBusy_) + ",isDead:" + to_string(isDead_) + ",isOpen:" +
to_string(transport->isOpen()) + ",isAvailable:" + to_string(isAvailable()) + ",lastUseTime:" + ctime(&lastUseTime);
}
void open() override
{
if (!transport->isOpen())
{
try
{
transport->open();
}
catch (TException &e)
{
std::cout << host << ":" << port << " " << e.what() << std::endl;
isDead_ = true;
}
}
}
/**
* 默认关闭连接
*
* @return
*/
void destroy() override
{
this->transport->close();
}
/**
* 默认关闭连接
*
* @return
*/
void close() override
{
this->transport->close();
}
private:
TTransportPtr transport;
/**
* 是否正忙
*/
bool isBusy_ = false;
/**
* 是否已经挂
*/
bool isDead_ = false;
/**
* 最后使用时间
*/
time_t lastUseTime;
/**
* 服务端Server主机名或IP
*/
string host;
/**
* 服务端Port
*/
int port;
};
上面的TransportWrapper类为ITransportWrapper类的普通实现,transport用完后就会释放连接,而对于连接池用到的transport,要求用完后需要放回到连接池中,而不是释放连接,否则tcp就要不停的握手了。所以下面用装饰模式来实现能自动放回连接池的transport包装类,即PoolingTransport。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170// ITransportDecorator.h
using namespace std;
using namespace apache::thrift;
using namespace apache::thrift::concurrency;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace apache::thrift::server;
class ITransportDecorator : public ITransportWrapper
{
public:
typedef std::shared_ptr<TTransport> TTransportPtr;
ITransportDecorator(std::shared_ptr<ITransportWrapper> _transportWrapper) : transportWrapper(_transportWrapper)
{
}
ITransportDecorator() = default;
~ITransportDecorator()
{}
virtual bool isBusy() override
{
return transportWrapper->isBusy();
}
virtual void setIsBusy(bool isBusy) override
{
transportWrapper->setIsBusy(isBusy);
}
virtual bool isDead() override
{
return transportWrapper->isDead();
}
virtual bool isOpen() override
{
return transportWrapper->isOpen();
}
virtual void setIsDead(bool isDead) override
{
transportWrapper->setIsDead(isDead);
}
virtual TTransportPtr getTransport() override
{
return transportWrapper->getTransport();
}
virtual void setTransport(TTransportPtr transport) override
{
transportWrapper->setTransport(transport);
}
/**
* 当前transport是否可用
*
* @return
*/
virtual bool isAvailable() override
{
return transportWrapper->isAvailable();
}
virtual time_t getLastUseTime() override
{
return transportWrapper->getLastUseTime();
}
virtual void setLastUseTime(time_t lastUseTime) override
{
transportWrapper->setLastUseTime(lastUseTime);
}
virtual string getHost() override
{
return transportWrapper->getHost();
}
virtual int getPort() override
{
return transportWrapper->getPort();
}
virtual string tostring() override
{
return transportWrapper->tostring();
}
virtual void open() override
{
transportWrapper->open();
}
/**
* 默认关闭连接
*
* @return
*/
virtual void destroy() override
{
transportWrapper->destroy();
}
/**
* 默认关闭连接,可被装饰成别的功能,比如放回连接池中
*
* @return
*/
virtual void close() override
{
transportWrapper->close();
}
protected:
std::shared_ptr<ITransportWrapper> transportWrapper;
};
// PoolingTransport.hpp
class PoolingTransport : public ITransportDecorator, public std::enable_shared_from_this<PoolingTransport>
{
public:
PoolingTransport(std::shared_ptr<ITransportWrapper> _transportWrapper, std::shared_ptr<ITransportPool> _transportPool)
: ITransportDecorator(_transportWrapper), transportPool(_transportPool)
{
}
PoolingTransport() = default;
~PoolingTransport() = default;
/**
* 默认关闭连接,装饰成放回连接池中
*
* @return
*/
virtual void close() override
{
std::shared_ptr<ITransportPool> pool = transportPool.lock();
if (pool)
{
transportPool->release(this->shared_from_this());
}
}
private:
std::weak_ptr<ITransportPool> transportPool;
};
至于这个装饰类实现后怎么用,然后是重头戏,连接池类,可以从该连接池中取得transport的包装类,用于通信,用完后自动释放到连接池中,这里就要对普通的类再调用装饰类进行装饰,初始化连接池连接时,就是包装好的能自动放进连接池的transportWrapper。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421// ITransportPool.h
using namespace std;
using namespace apache::thrift::transport;
class ITransportPool
{
public:
typedef std::shared_ptr<ITransportWrapper> ITransportWrapperPtr;
ITransportPool()
{}
virtual ~ITransportPool()
{}
virtual int getCheckInvervalSecond() = 0;
virtual void setCheckInvervalSecond(int checkInvervalSecond) = 0;
/**
* 从池中取一个可用连接
* @return
*/
virtual ITransportWrapperPtr get() = 0;
/**
* 客户端调用完成后,必须手动调用此方法,将TTransport恢复为可用状态
*
* @param client
*/
virtual void release(ITransportWrapperPtr client) = 0;
virtual void destory() = 0;
/**
* 获取当前已经激活的连接数
*
* @return
*/
virtual int getActiveCount() = 0;
/**
* 获取当前繁忙状态的连接数
*
* @return
*/
virtual int getBusyCount() = 0;
/**
* 获取当前已"挂"掉连接数
*
* @return
*/
virtual int getDeadCount() = 0;
virtual string tostring() = 0;
virtual string getWrapperInfo(ITransportWrapperPtr client) = 0;
virtual void init() = 0;
};
// ThriftTransportPool.hpp
using namespace std;
using namespace apache::thrift;
using namespace apache::thrift::concurrency;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
class ThriftTransportPool : public ITransportPool, public std::enable_shared_from_this<ThriftTransportPool>
{
public:
typedef std::shared_ptr<ITransportWrapper> ITransportWrapperPtr;
sem_t access;
std::mutex mt;
std::vector<ITransportWrapperPtr> pool;
int poolSize = 1; //连接池大小
int minSize = 1; //池中保持激活状态的最少连接个数
int maxIdleSecond = 300; //最大空闲时间(秒),超过该时间的空闲时间的连接将被关闭
int checkInvervalSecond = 60; //每隔多少秒,检测一次空闲连接(默认60秒)
std::vector<ServerInfo> serverInfos;
bool allowCheck = true;
std::thread checkThread;
/**
* 连接池构造函数,多台服务器
*
* @param poolSize 连接池大小
* @param minSize 池中保持激活的最少连接数
* @param maxIdleSecond 单个连接最大空闲时间,超过此值的连接将被断开
* @param checkInvervalSecond 每隔多少秒检查一次空闲连接
* @param serverList 服务器列表
*/
ThriftTransportPool(int poolSize, int minSize, int maxIdleSecond, int checkInvervalSecond, vector<ServerInfo> &serverList)
{
construct(poolSize, minSize, maxIdleSecond, checkInvervalSecond, serverList);
}
/**
* 连接池构造函数,单台服务器
*
* @param poolSize 连接池大小
* @param minSize 池中保持激活的最少连接数
* @param maxIdleSecond 单个连接最大空闲时间,超过此值的连接将被断开
* @param checkInvervalSecond 每隔多少秒检查一次空闲连接
* @param serverList 服务器列表
*/
ThriftTransportPool(int poolSize, int minSize, int maxIdleSecond, int checkInvervalSecond, ServerInfo &server)
{
std::vector<ServerInfo> serverList;
serverList.push_back(server);
construct(poolSize, minSize, maxIdleSecond, checkInvervalSecond, serverList);
}
/**
* 连接池构造函数(默认最大空闲时间300秒),多台服务器
*
* @param poolSize 连接池大小
* @param minSize 池中保持激活的最少连接数
* @param serverList 服务器列表
*/
ThriftTransportPool(int poolSize, int minSize, vector<ServerInfo> &serverList) : ThriftTransportPool(poolSize, minSize, 300, 60, serverList)
{
}
ThriftTransportPool(int poolSize, vector<ServerInfo> &serverList) : ThriftTransportPool(poolSize, 1, 300, 60, serverList)
{
}
ThriftTransportPool(vector<ServerInfo> &serverList) : ThriftTransportPool(serverList.size(), 1, 300, 60, serverList)
{
}
/**
* 连接池构造函数(默认最大空闲时间300秒),单台服务器
*
* @param poolSize 连接池大小
* @param minSize 池中保持激活的最少连接数
* @param serverList 服务器列表
*/
ThriftTransportPool(int poolSize, int minSize, ServerInfo &server) : ThriftTransportPool(poolSize, minSize, 300, 60, server)
{
}
ThriftTransportPool(int poolSize, ServerInfo &server) : ThriftTransportPool(poolSize, 1, 300, 60, server)
{
}
ThriftTransportPool(ServerInfo &server) : ThriftTransportPool(1, 1, 300, 60, server)
{
}
int getCheckInvervalSecond() override
{
return checkInvervalSecond;
}
void setCheckInvervalSecond(int checkInvervalSecond) override
{
this->checkInvervalSecond = checkInvervalSecond;
}
/**
* 从池中取一个可用连接
* @return
*/
std::shared_ptr<ITransportWrapper> get() override
{
if (sem_trywait(&access) == 0)
{
std::lock_guard<std::mutex> lock(mt);
for (int i = 0; i < pool.size(); i++)
{
if (pool[i]->isAvailable())
{
pool[i]->setIsBusy(true);
pool[i]->setLastUseTime(time(0));
return pool[i];
}
}
//尝试激活更多连接
for (int i = 0; i < pool.size(); i++)
{
if (!pool[i]->isBusy() && !pool[i]->isDead() && !pool[i]->isOpen())
{
pool[i]->open();
pool[i]->setIsBusy(true);
pool[i]->setLastUseTime(time(0));
return pool[i];
}
}
}
throw std::runtime_error("can not get available client");
}
/**
* 客户端调用完成后,必须手动调用此方法,将TTransport恢复为可用状态
*
* @param client
*/
void release(ITransportWrapperPtr client) override
{
bool released = false;
{
std::lock_guard<std::mutex> lock(mt);
for (int i = 0; i < pool.size(); i++)
{
if (client == pool[i] && pool[i]->isBusy())
{
pool[i]->setIsBusy(false);
released = true;
break;
}
}
}
if (released)
{
sem_post(&access);
}
}
void destory() override
{
for (int i = 0; i < pool.size(); i++)
{
pool[i]->destroy();
}
allowCheck = false;
checkThread.detach();
std::cout << "连接池被销毁!" << std::endl;
}
/**
* 获取当前已经激活的连接数
*
* @return
*/
int getActiveCount() override
{
int result = 0;
for (int i = 0; i < pool.size(); i++)
{
if (!pool[i]->isDead() && pool[i]->isOpen())
{
result += 1;
}
}
return result;
}
/**
* 获取当前繁忙状态的连接数
*
* @return
*/
int getBusyCount() override
{
int result = 0;
for (int i = 0; i < pool.size(); i++)
{
if (!pool[i]->isDead() && pool[i]->isBusy())
{
result += 1;
}
}
return result;
}
/**
* 获取当前已"挂"掉连接数
*
* @return
*/
int getDeadCount() override
{
int result = 0;
for (int i = 0; i < pool.size(); i++)
{
if (pool[i]->isDead())
{
result += 1;
}
}
return result;
}
string tostring() override
{
return "poolsize:" + to_string(pool.size()) +
",minSize:" + to_string(minSize) +
",maxIdleSecond:" + to_string(maxIdleSecond) +
",checkInvervalSecond:" + to_string(checkInvervalSecond) +
",active:" + to_string(getActiveCount()) +
",busy:" + to_string(getBusyCount()) +
",dead:" + to_string(getDeadCount());
}
string getWrapperInfo(ITransportWrapperPtr client) override
{
for (int i = 0; i < pool.size(); i++)
{
if (pool[i] == client)
{
return pool[i]->tostring();
}
}
return "";
}
/**
* 连接池初始化
*/
void init()
{
if (sem_init(&access, 0, poolSize))
{
printf("Semaphore initialization failed!!\n");
exit(-1);
}
pool.resize(poolSize);
for (int i = 0; i < pool.size(); i++)
{
int j = i % serverInfos.size();
std::shared_ptr<ITransportPool> transportPool = this->shared_from_this();
if (i < minSize)
{
// 使用可自动回收到连接池的transport
std::shared_ptr<ITransportWrapper> transportWrapper = std::make_shared<TransportWrapper>(serverInfos[j].getHost(), serverInfos[j].getPort(), true);
pool[i] = std::make_shared<PoolingTransport>(transportWrapper, transportPool);
}
else
{
// 使用可自动回收到连接池的transport
std::shared_ptr<ITransportWrapper> transportWrapper = std::make_shared<TransportWrapper>(serverInfos[j].getHost(), serverInfos[j].getPort());
pool[i] = std::make_shared<PoolingTransport>(transportWrapper, transportPool);
}
}
check();
}
private:
void construct(int poolSize, int minSize, int maxIdleSecond, int checkInvervalSecond, vector<ServerInfo> &serverList)
{
if (poolSize <= 0)
{
poolSize = 1;
}
if (minSize > poolSize)
{
minSize = poolSize;
}
if (minSize <= 0)
{
minSize = 0;
}
this->maxIdleSecond = maxIdleSecond;
this->minSize = minSize;
this->poolSize = poolSize;
this->serverInfos = serverList;
this->allowCheck = true;
this->checkInvervalSecond = checkInvervalSecond;
}
/**
* 检查空闲连接
*/
void check()
{
checkThread = std::thread([&]()
{
while (allowCheck)
{
std::cout << "--------------" << std::endl;
std::cout << "开始检测空闲连接..." << std::endl;
for (int i = 0; i < pool.size(); i++)
{
if (pool[i] == nullptr)
{
std::cout << "pool[" << i << "]为null" << std::endl;
}
// std::cout << pool[i]->tostring() << std::endl;
if (pool[i]->isAvailable())
{
long idleTime = time(0) - pool[i]->getLastUseTime();
std::cout << "available pool[" << i << "] idleTime: " << idleTime << std::endl;
//超过空闲阀值的连接,主动断开,以减少资源消耗
if (idleTime > maxIdleSecond)
{
if (getActiveCount() > minSize)
{
pool[i]->destroy();
pool[i]->setIsBusy(false);
std::cout << pool[i]->getHost() << ":" << to_string(pool[i]->getPort()) << " 超过空闲时间阀值被断开!" << std::endl;
}
}
}
}
std::cout << "当前活动连接数:" << getActiveCount() << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(checkInvervalSecond * 1000));
}
});
}
};
简直太神奇了,以后再也不用担心连接爆了,而且对于不怎么活跃的连接,经过一段时间后,会自动释放掉,连接池也不会经常保持那么多连接,毕竟不用也占有资源。这个框架弄好以后,用起来就非常方便了,这就是一个打地基的过程,后面建起来高楼大厦就更稳更快了,感觉还是要懂设计模式,才能写出大工程的代码,否则新需求来了,维护起来非常痛苦。