使用shared_futures同步gRPC线程/调用,并在C ++中的线程之间共享数据

对于一个项目,我开始了解gRPC的工作原理。为此,我实现了以下设置:

使用同步API的C ++服务器提供两种服务RegisterCommand(流式传输)和NewCommandMsg(阻塞式)。 这是.proto定义:

service Command {
    rpc RegisterCloud (CommandRequest) returns (stream CommandMessage) {}
    rpc NewCommandMsg (CommandMessage) returns (google.protobuf.Empty) {}
}

我要达到什么目的?

多个客户端应调用RegisterCommand,服务器应阻塞该过程,直到发生对NewCommandMsg的调用为止(我保证一次只发生一个调用)。如果调用NewCommandMsg,则应将参数CommandMessage传送到RegisterCommand的每个线程(我知道每个调用都在一个线程中处理),该线程应被解除阻塞,并且{{1 }}应写入流中。之后,CommandMessage的线程将再次被阻塞,并等待下一次对RegisterCommand的调用。 稍后,NewCommandMsg将被一个非grpc线程替换。

我已经做了什么

我了解了很多有关C ++中(共享)的期货,promise,互斥量和条件变量的信息,并实现了以下代码。

NewCommandMsg

该代码会发生什么

在对class CommandServiceImpl final : public Command::Service { //To my understanding these are common for all threads std::promise<CommandMessage> newCommandPromise; std::shared_future<CommandMessage> newCommandFuture = this->newCommandPromise.get_future(); //To my understanding this is executed in an own thread Status RegisterCommand(ServerContext* context,const CommandRequest* request,ServerWriter<CommandMessage>* writer) override { //Each thread gets its own copy of the shared future std::shared_future<CommandMessage> future = this->newCommandFuture; while(!context->IsCancelled()){ future.wait(); (void)future.get(); std::cout << "distributing command" << std::endl; //actual writing would happen here } return Status::CANCELLED; } //To my understanding this is executed in an own thread Status NewCommandMsg(ServerContext* context,const CommandMessage* request,google::protobuf::Empty* response) override { std::promise<CommandMessage> promise = move(this->newCommandPromise); std::cout << "new command received" << std::endl; promise.set_value(*request); //Provide new promise,for next call //In my evaluation phase,I guarantee,that only one client at a time will call NewCommandMsg std::promise<CommandMessage> cleanPromise; this->newCommandPromise = move(cleanPromise); return Status::OK; } }; 进行一次或多次并发调用之后,服务器将阻塞,而在对RegisterCommand进行调用之后,NewCommandMessage将会解除阻塞。之后,future.wait()当然总是非阻塞的,因此线程以无限循环运行。但是它可能只运行一次,然后等待新数据可用。

似乎不可能“重用”现有的未来。 关于如何实现我的目标的任何想法?

mercysin 回答:使用shared_futures同步gRPC线程/调用,并在C ++中的线程之间共享数据

暂时没有好的解决方案,如果你有好的解决方案,请发邮件至:iooj@foxmail.com
本文链接:https://www.f2er.com/2364592.html

大家都在问