连接池使用
需要先构造连接池对象,并完成初始化,连接池可以连接多个rpc服务器,从连接池取连接时会随机取不同服务器的连接,达到负载均衡的效果。
下图中为连接池里面使用的装饰者模式uml类图,从连接池中拿出的ITransportWrapper连接被PoolingTransport装饰后,close功能是自动放回连接池中,而不是真正的close掉连接,同样的,FakeTransport装饰后,IsOpen总是返回true,因为它是一个假冒的连接
连接池连接状态变更图如下,首先打开一个连接,打开成功进入BUSY状态,失败则进入DEAD状态,如果BUSY状态的连接使用过程中出现异常,也会进入DEAD状态。
处于BUSY状态的连接正常运行完成后会被重新放回连接池中,状态变更为IDLE状态,后续可以从连接池中获取IDLE状态的连接来处理新的事务。
对于DEAD状态的连接,需要判断当前活跃连接个数,如果活跃连接已经大于minSize了,直接销毁该连接,变更为CLOSED状态。否则重新打开该连接,让其变更为IDLE状态以备使用,当然如果打开失败,该连接还是处于DEAD状态。
对于IDLE状态的连接,如果长时间没有数据发送了,即idleTime > maxIdleSecond,如果活跃连接已经大于minSize了,则将该连接关闭,转为CLOSED状态,减少资源的占用,如果活跃连接小于等于minSize,则考虑将该连接推至栈顶,下次就会调用该连接发送数据了,预防长时间没有数据传输导致连接异常的情况。
1.初始化
如果有多个rpc服务器,请使用std::vector1
2
3ServerInfo serverInfo("127.0.0.1", 9090); // rpc服务器地址和端口
std::shared_ptr<ThriftTransportPool> transportPool = make_shared<ThriftTransportPool>(5, serverInfo); // 5为连接池大小
transportPool->init();
2.客户端建立
需要将从连接池中获取的连接放入thriftClientProxy中进行代理,后续使用getClient方法进行操作即可。1
2thriftClientProxy<RpcServiceClient> clientProxy(transportPool->get()); // 生命周期外自动释放连接到连接池中
clientProxy.getClient()->send_createPreloadTask(param);
上面的方法每次都需要在不同的地方实例化对象,如下,改为工厂模式统一实例,方便管理。另外,直接调用getClient()操作可能会有异常抛出,故对clientProxy里面的方法进行包装,异常内部消化掉了。
1 | // 更加方便的方法 |
相关的uml类图如下:
3.重试机制
对于从连接池取出来的连接进行数据通信时,如果出现异常(异常连接会变更为DEAD状态),可以进行重试,即重新从连接池取另外的正常连接再次进行rpc调用,重试的次数可设置。
非连接池客户端
对于单次的rpc请求,无需建立连接池,直接使用ThriftClientProxy,用完自动释放连接。
1 | TransportWrapper::Option opt; |
rpc广播
如果需要将消息发送给多个节点,需要使用订阅者发布者模式,该模式的uml类图如下,具体PublisherSubject、ClientObserver均依赖于抽象。
1 | auto clientProxy = ClientProxyFactory<RpcServiceClient>::instance().createThriftClientProxy(); |
rpc异步回调
前端发送异步请求,请求中携带前端中回调服务器的uid,ip和port,后端处理该请求需要很长时间,比如预下载请求,此时前端不会阻塞等待后端处理完该请求,前端可以去处理别的事情。 当后端处理完该请求后,会发起另外一次rpc请求到客户端的回调服务器,然后由前端的回调服务器处理该次异步请求的结果,此时的前端充当callback服务器的角色。
1 | auto clientProxy = ClientProxyFactory<SecondServiceClient>::instance().createSecondThriftClientProxy(); |