rpc框架使用

连接池使用

需要先构造连接池对象,并完成初始化,连接池可以连接多个rpc服务器,从连接池取连接时会随机取不同服务器的连接,达到负载均衡的效果。

下图中为连接池里面使用的装饰者模式uml类图,从连接池中拿出的ITransportWrapper连接被PoolingTransport装饰后,close功能是自动放回连接池中,而不是真正的close掉连接,同样的,FakeTransport装饰后,IsOpen总是返回true,因为它是一个假冒的连接

连接池连接状态变更图如下,首先打开一个连接,打开成功进入BUSY状态,失败则进入DEAD状态,如果BUSY状态的连接使用过程中出现异常,也会进入DEAD状态。

处于BUSY状态的连接正常运行完成后会被重新放回连接池中,状态变更为IDLE状态,后续可以从连接池中获取IDLE状态的连接来处理新的事务。

对于DEAD状态的连接,需要判断当前活跃连接个数,如果活跃连接已经大于minSize了,直接销毁该连接,变更为CLOSED状态。否则重新打开该连接,让其变更为IDLE状态以备使用,当然如果打开失败,该连接还是处于DEAD状态。

对于IDLE状态的连接,如果长时间没有数据发送了,即idleTime > maxIdleSecond,如果活跃连接已经大于minSize了,则将该连接关闭,转为CLOSED状态,减少资源的占用,如果活跃连接小于等于minSize,则考虑将该连接推至栈顶,下次就会调用该连接发送数据了,预防长时间没有数据传输导致连接异常的情况。

1.初始化

如果有多个rpc服务器,请使用std::vector,放入多个服务器的信息。

1
2
3
ServerInfo serverInfo("127.0.0.1", 9090); // rpc服务器地址和端口
std::shared_ptr<ThriftTransportPool> transportPool = make_shared<ThriftTransportPool>(5, serverInfo); // 5为连接池大小
transportPool->init();

2.客户端建立

需要将从连接池中获取的连接放入thriftClientProxy中进行代理,后续使用getClient方法进行操作即可。

1
2
thriftClientProxy<RpcServiceClient> clientProxy(transportPool->get()); // 生命周期外自动释放连接到连接池中
clientProxy.getClient()->send_createPreloadTask(param);

上面的方法每次都需要在不同的地方实例化对象,如下,改为工厂模式统一实例,方便管理。另外,直接调用getClient()操作可能会有异常抛出,故对clientProxy里面的方法进行包装,异常内部消化掉了。

1
2
3
// 更加方便的方法
auto clientProxy = ClientProxyFactory<RpcServiceClient>::instance().createThriftClientProxy();
clientProxy->wrapCallForVoid(&RpcServiceClient::createPreloadTask, ack, param);

相关的uml类图如下:

3.重试机制

对于从连接池取出来的连接进行数据通信时,如果出现异常(异常连接会变更为DEAD状态),可以进行重试,即重新从连接池取另外的正常连接再次进行rpc调用,重试的次数可设置。

非连接池客户端

对于单次的rpc请求,无需建立连接池,直接使用ThriftClientProxy,用完自动释放连接。

1
2
3
4
5
6
TransportWrapper::Option opt;
opt.host = UidToIpPort::instance().getIp(m_Para.uid); // 需要指定ip
opt.port = UidToIpPort::instance().getPort(m_Para.uid);
opt.recvTimeOut = 10000;
auto clientProxy = ClientProxyFactory<SubscriberServiceClient>::instance().createThriftClientProxy(opt);
clientProxy->wrapCallForVoid(&SubscriberServiceClient::sendPreloadStatus, param);

rpc广播

如果需要将消息发送给多个节点,需要使用订阅者发布者模式,该模式的uml类图如下,具体PublisherSubject、ClientObserver均依赖于抽象。

1
2
3
4
5
6
7
8
9
10
11
12
auto clientProxy = ClientProxyFactory<RpcServiceClient>::instance().createThriftClientProxy();
clientProxy->wrapCallForVoid(&RpcServiceClient::subscribeShareInfo, ack, param); // 订阅者首先需要发起订阅需求

void RpcServiceHandler::subscribeShareInfo(SubscribeShareInfoAck &_return, const SubscribeShareInfoParam &param)
{
getConnectToSubscriber<>(__FUNCTION__, _return, param, [&]() {
std::shared_ptr<IObserver<NotityShareTaskParam>> clientObserver = std::make_shared<ClientObserver<NotityShareTaskParam>>(g_shareInfoSubject, param.uid);
g_shareInfoSubject->registerObserver(clientObserver); // rpc服务接收到订阅请求后,将其注册到相对应的主题下
});
}

g_shareInfoSubject->setData(param); // 数据有更新,直接更新主题即可,它会自动推送给所有的订阅者

rpc异步回调

前端发送异步请求,请求中携带前端中回调服务器的uid,ip和port,后端处理该请求需要很长时间,比如预下载请求,此时前端不会阻塞等待后端处理完该请求,前端可以去处理别的事情。 当后端处理完该请求后,会发起另外一次rpc请求到客户端的回调服务器,然后由前端的回调服务器处理该次异步请求的结果,此时的前端充当callback服务器的角色。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
auto clientProxy = ClientProxyFactory<SecondServiceClient>::instance().createSecondThriftClientProxy();
clientProxy->wrapCallForVoid(&SecondServiceClient::createPreloadTask, ack, param); // 客户端发送异步请求,无需阻塞等待

void SecondServiceHandler::createPreloadTask(CreatePreloadTaskAck& _return, const CreatePreloadTaskParam& param)
{
getConnectToSubscriber<>(__FUNCTION__, _return, param, [&]() { // 解析uid、ip和port信息,存入UidToIpPort::instance()中
std::cout << "get preload task, uid: " << param.uid << " url: " << param.url << std::endl;
HASH hash(param.hash);
SPreloadTaskPara para;
std::string url = param.url;
std::string uid = param.uid;
para.version = param.para.version;
para.iCacheTime = param.para.iCacheTime;
para.iDownloadTime = param.para.iDownloadTime;
for (auto p : param.para.periodVec)
{
para.periodVec.emplace_back(SPeriod(p.begin_, p.end_));
}
para.scdnTaskType = param.para.scdnTaskType;
para.strKeyhash = param.strKeyhash;
para.strOriginDomain = param.strOriginDomain;
SCreatePreloadTaskOutResult preloadResult;
// 创建预下载任务
_return.error_code = CServerProduct::CreatePreloadTask(hash, url, para, preloadResult, uid);
_return.__set_preloadResultForStatis(preloadResult.preloadResultForStatis);
_return.__set_cdnStatusCode(preloadResult.cdnStatusCode);
_return.__set_fileSize(preloadResult.fileSize);
std::cout << "preload completed..." << std::endl;
});
}

TransportWrapper::Option opt;
opt.host = UidToIpPort::instance().getIp(m_Para.uid); // 从UidToIpPort::instance()中取ip、port信息
opt.port = UidToIpPort::instance().getPort(m_Para.uid);
opt.recvTimeOut = 10000;
auto clientProxy = ClientProxyFactory<SubscriberServiceClient>::instance().createThriftClientProxy(opt);
clientProxy->wrapCallForVoid(&SubscriberServiceClient::sendPreloadStatus, param); // 处理完成后,发起一次rpc请求到客户端的回调服务器

void SubscriberServiceHandler::sendPreloadStatus(const SendPreloadStatusParam& param) // 回调服务器处理该次异步请求的结果
{
std::cout << "SubscriberServiceHandler::sendPreloadStatus: " << param.url << std::endl;
SJsonMinerPreloadStatus* ps = new SJsonMinerPreloadStatus();
ps->ts = GetUnixTime();
ps->domain = param.domain;
ps->url = param.url;
ps->hash = param.hash;
ps->downBytes = param.downBytes;
ps->downBytesFromP2p = param.downBytesFromP2p;
ps->downTime = param.downTime;
ps->downSpeed = param.downSpeed;
ps->status = param.status;
ps->result = param.result;
ps->isP2p = param.isP2p;
ps->cacheSize = param.cacheSize;
ps->mediaDuration = param.mediaDuration;
ps->mediaRate = param.mediaRate;
ps->fileSize = param.fileSize;
if (!CStatisV2::Instance()->AddJsonLog(ps))
{
ps->release();
}
}
nephen wechat
欢迎您扫一扫上面的微信公众号,订阅我的博客!
坚持原创技术分享,您的支持将鼓励我继续创作!