对于一个项目,我开始了解gRPC的工作原理。为此,我实现了以下设置:
使用同步API的C ++服务器提供两种服务RegisterCommand
(流式传输)和NewCommandMsg
(阻塞式)。
这是.proto定义:
service Command {
rpc RegisterCloud (CommandRequest) returns (stream CommandMessage) {}
rpc NewCommandMsg (CommandMessage) returns (google.protobuf.Empty) {}
}
我要达到什么目的?
多个客户端应调用RegisterCommand
,服务器应阻塞该过程,直到发生对NewCommandMsg
的调用为止(我保证一次只发生一个调用)。如果调用NewCommandMsg
,则应将参数CommandMessage
传送到RegisterCommand
的每个线程(我知道每个调用都在一个线程中处理),该线程应被解除阻塞,并且{{1 }}应写入流中。之后,CommandMessage
的线程将再次被阻塞,并等待下一次对RegisterCommand
的调用。
稍后,NewCommandMsg
将被一个非grpc线程替换。
我已经做了什么
我了解了很多有关C ++中(共享)的期货,promise,互斥量和条件变量的信息,并实现了以下代码。
NewCommandMsg
该代码会发生什么
在对class CommandServiceImpl final : public Command::Service {
//To my understanding these are common for all threads
std::promise<CommandMessage> newCommandPromise;
std::shared_future<CommandMessage> newCommandFuture = this->newCommandPromise.get_future();
//To my understanding this is executed in an own thread
Status RegisterCommand(ServerContext* context,const CommandRequest* request,ServerWriter<CommandMessage>* writer) override {
//Each thread gets its own copy of the shared future
std::shared_future<CommandMessage> future = this->newCommandFuture;
while(!context->IsCancelled()){
future.wait();
(void)future.get();
std::cout << "distributing command" << std::endl;
//actual writing would happen here
}
return Status::CANCELLED;
}
//To my understanding this is executed in an own thread
Status NewCommandMsg(ServerContext* context,const CommandMessage* request,google::protobuf::Empty* response) override {
std::promise<CommandMessage> promise = move(this->newCommandPromise);
std::cout << "new command received" << std::endl;
promise.set_value(*request);
//Provide new promise,for next call
//In my evaluation phase,I guarantee,that only one client at a time will call NewCommandMsg
std::promise<CommandMessage> cleanPromise;
this->newCommandPromise = move(cleanPromise);
return Status::OK;
}
};
进行一次或多次并发调用之后,服务器将阻塞,而在对RegisterCommand
进行调用之后,NewCommandMessage
将会解除阻塞。之后,future.wait()
当然总是非阻塞的,因此线程以无限循环运行。但是它可能只运行一次,然后等待新数据可用。
似乎不可能“重用”现有的未来。 关于如何实现我的目标的任何想法?