Skip to content
This repository was archived by the owner on Jan 9, 2024. It is now read-only.
This repository was archived by the owner on Jan 9, 2024. It is now read-only.

Consider updating asio-grpc #439

@Tradias

Description

@Tradias

I generally try my best to regularly update asio-grpc upstream: cpp-pm/hunter#686

With the latest changes in version 2.6.0, we could greatly simplify some implementation. For example ServerStreamingRpc:

  • No more dangerous thread_local error_code handling
  • No more Dispatcher needed
  • No more read_failed member needed, decision is now made at compile time
namespace detail
{
class GrpcStatusException final : public std::exception
{
  public:
    // Consider including status.error_code in the error message
    explicit GrpcStatusException(const grpc::Status& status) : error_message(status.error_message()) {}

    const char* what() const final { return error_message.c_str(); }

  private:
    std::string error_message;
};

std::exception_ptr to_exception_ptr(const grpc::Status& status)
{
    return std::make_exception_ptr(GrpcStatusException{status});
}

std::exception_ptr to_exception_ptr_if_not_ok(const grpc::Status& status)
{
    if (status.ok())
    {
        return {};
    }
    return detail::to_exception_ptr(status);
}
}  // namespace detail


template <auto PrepareAsync>
class ServerStreamingRpc : public agrpc::ClientRPC<PrepareAsync>
{
  private:
    using Base = agrpc::ClientRPC<PrepareAsync>;
    using Stub = typename Base::Stub;
    using Request = typename Base::Request;
    using Reply = typename Base::Response;

    struct Start
    {
        ServerStreamingRpc& self_;
        Stub& stub_;
        const Request& request_;

        template <typename Op>
        void operator()(Op& op)
        {
            self_.Base::start(stub_, request_, std::move(op));
        }

        template <typename Op>
        void operator()(Op& op, bool ok)
        {
            if (ok)
            {
                op.complete({});
            }
            else
            {
                self_.Base::finish(std::move(op));
            }
        }

        template <typename Op>
        void operator()(Op& op, grpc::Status status)
        {
            op.complete(detail::to_exception_ptr_if_not_ok(status));
        }
    };

    struct Read
    {
        ServerStreamingRpc& self_;

        template <typename Op>
        void operator()(Op& op)
        {
            // This technically leads to use-after-move but protobuf message types are in a well-defined state after
            // move (as if default constructed).
            self_.Base::read(self_.reply_, std::move(op));
        }

        template <typename Op>
        void operator()(Op& op, bool ok)
        {
            if (ok)
            {
                op.complete({}, std::move(self_.reply_));
            }
            else
            {
                self_.Base::finish(std::move(op));
            }
        }

        template <typename Op>
        void operator()(Op& op, grpc::Status status)
        {
            // We should consider returning the reply by optional reference to improve memory reuse and make it easier
            // for the caller to handle OK termination of the stream by the server.
            if (status.ok())
            {
                status = grpc::Status{grpc::StatusCode::ABORTED, "operation closed by server"};
            }
            op.complete(detail::to_exception_ptr(status), {});
        }
    };

  public:
    explicit ServerStreamingRpc(agrpc::GrpcContext& grpc_context) : Base(grpc_context) {}

    template <typename CompletionToken = asio::use_awaitable_t<>>
    auto start(Stub& stub, const Request& request, CompletionToken&& token = {})
    {
        return boost::asio::async_compose<CompletionToken, void(std::exception_ptr)>(Start{*this, stub, request},
                                                                                     token);
    }

    template <typename CompletionToken = asio::use_awaitable_t<>>
    auto read(CompletionToken&& token = {})
    {
        return boost::asio::async_compose<CompletionToken, void(std::exception_ptr, Reply)>(Read{*this}, token);
    }

   // request_on and read_on are no longer needed, because agrpc::ClientRPC behaves more like a typical asio IoObject, which
   // means it will post to the completion handler's executor before completion. In the case of asio::awaitable the would be the 
   // one passed to co_spawn. But the old behavior can be obtained by passing
   // `asio::bind_executor(grpc_context, asio::use_awaitable)
   // or even `asio::bind_executor(asio::system_executor{}, asio::use_awaitable)
   // as completion token

  private:
    using Base::finish;
    using Base::read_initial_metadata;

    Reply reply_;
};

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions