This repository was archived by the owner on Jan 9, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 14
This repository was archived by the owner on Jan 9, 2024. It is now read-only.
Consider updating asio-grpc #439
Copy link
Copy link
Open
Description
I generally try my best to regularly update asio-grpc upstream: cpp-pm/hunter#686
With the latest changes in version 2.6.0, we could greatly simplify some implementation. For example ServerStreamingRpc:
- No more dangerous thread_local error_code handling
- No more Dispatcher needed
- No more read_failed member needed, decision is now made at compile time
namespace detail
{
class GrpcStatusException final : public std::exception
{
public:
// Consider including status.error_code in the error message
explicit GrpcStatusException(const grpc::Status& status) : error_message(status.error_message()) {}
const char* what() const final { return error_message.c_str(); }
private:
std::string error_message;
};
std::exception_ptr to_exception_ptr(const grpc::Status& status)
{
return std::make_exception_ptr(GrpcStatusException{status});
}
std::exception_ptr to_exception_ptr_if_not_ok(const grpc::Status& status)
{
if (status.ok())
{
return {};
}
return detail::to_exception_ptr(status);
}
} // namespace detail
template <auto PrepareAsync>
class ServerStreamingRpc : public agrpc::ClientRPC<PrepareAsync>
{
private:
using Base = agrpc::ClientRPC<PrepareAsync>;
using Stub = typename Base::Stub;
using Request = typename Base::Request;
using Reply = typename Base::Response;
struct Start
{
ServerStreamingRpc& self_;
Stub& stub_;
const Request& request_;
template <typename Op>
void operator()(Op& op)
{
self_.Base::start(stub_, request_, std::move(op));
}
template <typename Op>
void operator()(Op& op, bool ok)
{
if (ok)
{
op.complete({});
}
else
{
self_.Base::finish(std::move(op));
}
}
template <typename Op>
void operator()(Op& op, grpc::Status status)
{
op.complete(detail::to_exception_ptr_if_not_ok(status));
}
};
struct Read
{
ServerStreamingRpc& self_;
template <typename Op>
void operator()(Op& op)
{
// This technically leads to use-after-move but protobuf message types are in a well-defined state after
// move (as if default constructed).
self_.Base::read(self_.reply_, std::move(op));
}
template <typename Op>
void operator()(Op& op, bool ok)
{
if (ok)
{
op.complete({}, std::move(self_.reply_));
}
else
{
self_.Base::finish(std::move(op));
}
}
template <typename Op>
void operator()(Op& op, grpc::Status status)
{
// We should consider returning the reply by optional reference to improve memory reuse and make it easier
// for the caller to handle OK termination of the stream by the server.
if (status.ok())
{
status = grpc::Status{grpc::StatusCode::ABORTED, "operation closed by server"};
}
op.complete(detail::to_exception_ptr(status), {});
}
};
public:
explicit ServerStreamingRpc(agrpc::GrpcContext& grpc_context) : Base(grpc_context) {}
template <typename CompletionToken = asio::use_awaitable_t<>>
auto start(Stub& stub, const Request& request, CompletionToken&& token = {})
{
return boost::asio::async_compose<CompletionToken, void(std::exception_ptr)>(Start{*this, stub, request},
token);
}
template <typename CompletionToken = asio::use_awaitable_t<>>
auto read(CompletionToken&& token = {})
{
return boost::asio::async_compose<CompletionToken, void(std::exception_ptr, Reply)>(Read{*this}, token);
}
// request_on and read_on are no longer needed, because agrpc::ClientRPC behaves more like a typical asio IoObject, which
// means it will post to the completion handler's executor before completion. In the case of asio::awaitable the would be the
// one passed to co_spawn. But the old behavior can be obtained by passing
// `asio::bind_executor(grpc_context, asio::use_awaitable)
// or even `asio::bind_executor(asio::system_executor{}, asio::use_awaitable)
// as completion token
private:
using Base::finish;
using Base::read_initial_metadata;
Reply reply_;
};Metadata
Metadata
Assignees
Labels
No labels