
连接池使用
需要先构造连接池对象,并完成初始化,连接池可以连接多个rpc服务器,从连接池取连接时会随机取不同服务器的连接,达到负载均衡的效果。
下图中为连接池里面使用的装饰者模式uml类图,从连接池中拿出的ITransportWrapper连接被PoolingTransport装饰后,close功能是自动放回连接池中,而不是真正的close掉连接,同样的,FakeTransport装饰后,IsOpen总是返回true,因为它是一个假冒的连接

连接池连接状态变更图如下,首先打开一个连接,打开成功进入BUSY状态,失败则进入DEAD状态,如果BUSY状态的连接使用过程中出现异常,也会进入DEAD状态。
处于BUSY状态的连接正常运行完成后会被重新放回连接池中,状态变更为IDLE状态,后续可以从连接池中获取IDLE状态的连接来处理新的事务。
对于DEAD状态的连接,需要判断当前活跃连接个数,如果活跃连接已经大于minSize了,直接销毁该连接,变更为CLOSED状态。否则重新打开该连接,让其变更为IDLE状态以备使用,当然如果打开失败,该连接还是处于DEAD状态。
对于IDLE状态的连接,如果长时间没有数据发送了,即idleTime > maxIdleSecond,如果活跃连接已经大于minSize了,则将该连接关闭,转为CLOSED状态,减少资源的占用,如果活跃连接小于等于minSize,则考虑将该连接推至栈顶,下次就会调用该连接发送数据了,预防长时间没有数据传输导致连接异常的情况。

1.初始化
如果有多个rpc服务器,请使用std::vector,放入多个服务器的信息。
1 2 3
| ServerInfo serverInfo("127.0.0.1", 9090); std::shared_ptr<ThriftTransportPool> transportPool = make_shared<ThriftTransportPool>(5, serverInfo); transportPool->init();
|
2.客户端建立
需要将从连接池中获取的连接放入thriftClientProxy中进行代理,后续使用getClient方法进行操作即可。
1 2
| thriftClientProxy<RpcServiceClient> clientProxy(transportPool->get()); clientProxy.getClient()->send_createPreloadTask(param);
|
上面的方法每次都需要在不同的地方实例化对象,如下,改为工厂模式统一实例,方便管理。另外,直接调用getClient()操作可能会有异常抛出,故对clientProxy里面的方法进行包装,异常内部消化掉了。
1 2 3
| auto clientProxy = ClientProxyFactory<RpcServiceClient>::instance().createThriftClientProxy(); clientProxy->wrapCallForVoid(&RpcServiceClient::createPreloadTask, ack, param);
|
相关的uml类图如下:

3.重试机制
对于从连接池取出来的连接进行数据通信时,如果出现异常(异常连接会变更为DEAD状态),可以进行重试,即重新从连接池取另外的正常连接再次进行rpc调用,重试的次数可设置。

非连接池客户端
对于单次的rpc请求,无需建立连接池,直接使用ThriftClientProxy,用完自动释放连接。
1 2 3 4 5 6
| TransportWrapper::Option opt; opt.host = UidToIpPort::instance().getIp(m_Para.uid); opt.port = UidToIpPort::instance().getPort(m_Para.uid); opt.recvTimeOut = 10000; auto clientProxy = ClientProxyFactory<SubscriberServiceClient>::instance().createThriftClientProxy(opt); clientProxy->wrapCallForVoid(&SubscriberServiceClient::sendPreloadStatus, param);
|
rpc广播
如果需要将消息发送给多个节点,需要使用订阅者发布者模式,该模式的uml类图如下,具体PublisherSubject、ClientObserver均依赖于抽象。

1 2 3 4 5 6 7 8 9 10 11 12
| auto clientProxy = ClientProxyFactory<RpcServiceClient>::instance().createThriftClientProxy(); clientProxy->wrapCallForVoid(&RpcServiceClient::subscribeShareInfo, ack, param); void RpcServiceHandler::subscribeShareInfo(SubscribeShareInfoAck &_return, const SubscribeShareInfoParam ¶m) { getConnectToSubscriber<>(__FUNCTION__, _return, param, [&]() { std::shared_ptr<IObserver<NotityShareTaskParam>> clientObserver = std::make_shared<ClientObserver<NotityShareTaskParam>>(g_shareInfoSubject, param.uid); g_shareInfoSubject->registerObserver(clientObserver); }); } g_shareInfoSubject->setData(param);
|
rpc异步回调
前端发送异步请求,请求中携带前端中回调服务器的uid,ip和port,后端处理该请求需要很长时间,比如预下载请求,此时前端不会阻塞等待后端处理完该请求,前端可以去处理别的事情。 当后端处理完该请求后,会发起另外一次rpc请求到客户端的回调服务器,然后由前端的回调服务器处理该次异步请求的结果,此时的前端充当callback服务器的角色。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| auto clientProxy = ClientProxyFactory<SecondServiceClient>::instance().createSecondThriftClientProxy(); clientProxy->wrapCallForVoid(&SecondServiceClient::createPreloadTask, ack, param); void SecondServiceHandler::createPreloadTask(CreatePreloadTaskAck& _return, const CreatePreloadTaskParam& param) { getConnectToSubscriber<>(__FUNCTION__, _return, param, [&]() { std::cout << "get preload task, uid: " << param.uid << " url: " << param.url << std::endl; HASH hash(param.hash); SPreloadTaskPara para; std::string url = param.url; std::string uid = param.uid; para.version = param.para.version; para.iCacheTime = param.para.iCacheTime; para.iDownloadTime = param.para.iDownloadTime; for (auto p : param.para.periodVec) { para.periodVec.emplace_back(SPeriod(p.begin_, p.end_)); } para.scdnTaskType = param.para.scdnTaskType; para.strKeyhash = param.strKeyhash; para.strOriginDomain = param.strOriginDomain; SCreatePreloadTaskOutResult preloadResult; _return.error_code = CServerProduct::CreatePreloadTask(hash, url, para, preloadResult, uid); _return.__set_preloadResultForStatis(preloadResult.preloadResultForStatis); _return.__set_cdnStatusCode(preloadResult.cdnStatusCode); _return.__set_fileSize(preloadResult.fileSize); std::cout << "preload completed..." << std::endl; }); } TransportWrapper::Option opt; opt.host = UidToIpPort::instance().getIp(m_Para.uid); opt.port = UidToIpPort::instance().getPort(m_Para.uid); opt.recvTimeOut = 10000; auto clientProxy = ClientProxyFactory<SubscriberServiceClient>::instance().createThriftClientProxy(opt); clientProxy->wrapCallForVoid(&SubscriberServiceClient::sendPreloadStatus, param); void SubscriberServiceHandler::sendPreloadStatus(const SendPreloadStatusParam& param) { std::cout << "SubscriberServiceHandler::sendPreloadStatus: " << param.url << std::endl; SJsonMinerPreloadStatus* ps = new SJsonMinerPreloadStatus(); ps->ts = GetUnixTime(); ps->domain = param.domain; ps->url = param.url; ps->hash = param.hash; ps->downBytes = param.downBytes; ps->downBytesFromP2p = param.downBytesFromP2p; ps->downTime = param.downTime; ps->downSpeed = param.downSpeed; ps->status = param.status; ps->result = param.result; ps->isP2p = param.isP2p; ps->cacheSize = param.cacheSize; ps->mediaDuration = param.mediaDuration; ps->mediaRate = param.mediaRate; ps->fileSize = param.fileSize; if (!CStatisV2::Instance()->AddJsonLog(ps)) { ps->release(); } }
|