[Core] Add Logic to Emit Task Events to Event Aggregator #53402
edoakes merged 29 commits into ray-project:master
Conversation
Signed-off-by: Mengjin Yan <mengjinyan3@gmail.com>
    if (send_ray_events_to_aggregator_enabled_) {
      RAY_CHECK(data->ray_event_data);
      SendRayEventsToAggregator(std::move(*data->ray_event_data));
    }

should it be possible to send to gcs and aggregator?

The plan is to move the event -> GCS path to the event aggregator. So here we just need to support event sending to the event aggregator, and later we will need to add the logic for the aggregator to send to GCS.

we could segfault if both are enabled because both move the same thing

I think one moves the task_event_data and the other moves the ray_event_data, and they are created separately. So I think we can have them enabled at the same time without them interfering with each other. But let me know if I missed anything.
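To make this exchange concrete, here is a minimal standalone sketch of the point being made. The types and function names below are hypothetical stand-ins for the real Ray classes, not the actual implementation: because the two flag-guarded paths move *different* members of the buffered data, enabling both at once does not double-move anything.

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <utility>

// Hypothetical stand-ins for the real protobuf payload types.
struct TaskEventData { std::string payload; };
struct RayEventData { std::string payload; };

// The buffer holds the two payloads as independent members.
struct BufferedData {
  std::optional<TaskEventData> task_event_data;  // GCS path
  std::optional<RayEventData> ray_event_data;    // aggregator path
};

std::string sent_to_gcs;
std::string sent_to_aggregator;

void SendTaskEventsToGcs(TaskEventData d) { sent_to_gcs = std::move(d.payload); }
void SendRayEventsToAggregator(RayEventData d) { sent_to_aggregator = std::move(d.payload); }

// Each branch moves only its own member, so both flags can be on together.
void Flush(BufferedData data, bool gcs_enabled, bool aggregator_enabled) {
  if (gcs_enabled) {
    assert(data.task_event_data.has_value());
    SendTaskEventsToGcs(std::move(*data.task_event_data));
  }
  if (aggregator_enabled) {
    assert(data.ray_event_data.has_value());
    SendRayEventsToAggregator(std::move(*data.ray_event_data));
  }
}
```

A double-move would only occur if both branches consumed the same member, which is the hazard the reviewer was checking for.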
israbbani left a comment:
I did a first pass. I'll do another pass tomorrow morning!
src/ray/core_worker/core_worker.cc (outdated)

-          std::make_shared<gcs::GcsClient>(options_.gcs_options));
+          std::make_shared<gcs::GcsClient>(options_.gcs_options),
+          std::make_unique<rpc::EventAggregatorClientImpl>(
+              "127.0.0.1", options_.metrics_agent_port, *client_call_manager_));

Is there ever a remote EventAggregatorClient? If not, we don't need the address as a parameter and can add it later once it's needed.
    // Whether the task events from the core worker are sent to GCS directly.
    RAY_CONFIG(bool, enable_core_worker_task_event_to_gcs, true)

    // Whether to enable sending ray events to the event aggregator.
    // Currently, only task events are supported.
    RAY_CONFIG(bool, enable_core_worker_ray_event_to_aggregator, false)

Will these be removed after the migration is done? Can we add an issue/todo to make sure these are cleaned up afterwards?
We should have a separate follow-up (team-wide) to move feature flags to a separate config file so it's obvious to users that they should not rely on them.
src/ray/core_worker/core_worker.cc (outdated)

        /* attempt_number */ 0,
        rpc::TaskStatus::FINISHED,
-       /* timestamp */ absl::GetCurrentTimeNanos());
+       /* timestamp */ absl::GetCurrentTimeNanos(),

Suggested change:
-       /* timestamp */ absl::GetCurrentTimeNanos(),
+       /*timestamp=*/ absl::GetCurrentTimeNanos(),

I don't like this convention, but we use it elsewhere so this is just to make it consistent.
    if (is_actor_task_event_) {
      if (is_definition_event) {
        ray_event.set_message("Actor task definition event");

What's the point of having the message when there's an enum?

This is a general field mainly to hold high-level string information about the event. It might not be applicable for task events. I'll just not set it to avoid sending unnecessary information.
@israbbani I should have fixed all the comments that you had, and it should be ready for another round of review.
    if (state_update_->node_id_.has_value()) {
      RAY_CHECK(task_status_ == rpc::TaskStatus::SUBMITTED_TO_WORKER)
              .WithField("TaskStatus", task_status_)
          << "Node ID should be included when task status changes to "

the check is the inverse of this comment though: "Task status must be SUBMITTED_TO_WORKER if node ID is included"
    }

    if (state_update_->worker_id_.has_value()) {
      RAY_CHECK(task_status_ == rpc::TaskStatus::SUBMITTED_TO_WORKER)
    }

    void TaskStatusEvent::ToRpcRayEvents(
        std::pair<std::optional<rpc::events::RayEvent>, std::optional<rpc::events::RayEvent>>

nit: maybe define a type for this, something like
using TaskDefinitionAndExecutionEvents = std::pair<std::optional<rpc::events::RayEvent>, std::optional<rpc::events::RayEvent>>;
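As a small illustration of the alias being suggested, here is a self-contained sketch using a hypothetical stand-in for rpc::events::RayEvent. The alias names the convention that the first element is the definition event and the second is the execution event, which the raw pair type does not convey:

```cpp
#include <optional>
#include <string>
#include <utility>

// Hypothetical stand-in for the generated rpc::events::RayEvent proto class.
struct RayEvent { std::string message; };

// Proposed alias: first = task definition event, second = task execution event.
using TaskDefinitionAndExecutionEvents =
    std::pair<std::optional<RayEvent>, std::optional<RayEvent>>;

// A producer can now declare its output type readably.
TaskDefinitionAndExecutionEvents MakeEvents() {
  TaskDefinitionAndExecutionEvents events;
  events.first = RayEvent{"definition"};  // only the definition event is set here
  return events;
}
```

Call sites that previously spelled out the nested pair type become one identifier, and the optional members still express "this half may be absent".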
        (std::unique_ptr<rpc::events::RayEventsData> data_ptr,
         std::function<void(Status status)> callback),
        (override));
    };

I think we want to move away from using the shared mock folder where everything inside it is mocked. I would want to just enclose this inside task_event_buffer_test so other users are encouraged to use fake implementations instead of a mock.
src/ray/core_worker/core_worker.cc (outdated)

        /*attempt_number=*/0,
        rpc::TaskStatus::RUNNING,
        /*timestamp=*/absl::GetCurrentTimeNanos(),
        is_actor_task_event,

Suggested change:
-       is_actor_task_event,
+       /*is_actor_task_event=*/spec.IsActorTask(),

personal nit, but inline it if it's just used in one spot
    auto task_state = execution_event_data.mutable_task_state();
    if (task_status_ != rpc::TaskStatus::NIL) {
      (*task_state)[task_status_] = timestamp;

Suggested change:
-   auto task_state = execution_event_data.mutable_task_state();
-   if (task_status_ != rpc::TaskStatus::NIL) {
-     (*task_state)[task_status_] = timestamp;
+   auto &task_state = *execution_event_data.mutable_task_state();
+   if (task_status_ != rpc::TaskStatus::NIL) {
+     task_state[task_status_] = timestamp;

nit, a little cleaner
    auto [itr_ray_events, _] = agg_ray_events.try_emplace(
        event->GetTaskAttempt(),
        std::pair<std::optional<rpc::events::RayEvent>,
                  std::optional<rpc::events::RayEvent>>());

Suggested change:
-   auto [itr_ray_events, _] = agg_ray_events.try_emplace(
-       event->GetTaskAttempt(),
-       std::pair<std::optional<rpc::events::RayEvent>,
-                 std::optional<rpc::events::RayEvent>>());
+   auto [itr_ray_events, _] = agg_ray_events.try_emplace(
+       event->GetTaskAttempt());
    if (send_task_events_to_gcs_enabled_) {
      auto [itr_task_events, _] =
          agg_task_events.try_emplace(event->GetTaskAttempt(), rpc::TaskEvents());
      event->ToRpcTaskEvents(&(itr_task_events->second));

Suggested change:
-         agg_task_events.try_emplace(event->GetTaskAttempt(), rpc::TaskEvents());
-     event->ToRpcTaskEvents(&(itr_task_events->second));
+         agg_task_events.try_emplace(event->GetTaskAttempt());
+     event->ToRpcTaskEvents(&itr_task_events->second);

Also, ToRpcRayEvents takes by ref while this one takes by ptr; any reason for the inconsistency?

No particular reason that I'm aware of. This is legacy code, and we will remove the code path after the task events are fully migrated to the event aggregator.
    });

    return Status::OK();
  }

This function doesn't really seem to be adding much on top of AddEvents, and is kind of a downgrade from the standard implementation for these 3 reasons:
- The debug is hidden pretty deep within the stack.
- It always returns Status::OK, so returning a status doesn't help.
- Forcing the caller to pass in a unique_ptr kills the optimization opportunity that could be had by moving RayEventsData in.
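The third bullet can be illustrated with a short sketch. The types below are hypothetical stand-ins, not the real Ray signatures: accepting the events data by value lets the caller move its buffer in directly, avoiding both the unique_ptr heap allocation and a deep copy of the message.

```cpp
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for rpc::events::RayEventsData.
struct RayEventsData {
  std::vector<std::string> events;
};

std::size_t g_events_received = 0;

// By-value parameter: callers write SendRayEventsToAggregator(std::move(data)),
// which moves the vector's storage rather than allocating a unique_ptr wrapper
// or copying the payload.
void SendRayEventsToAggregator(RayEventsData data) {
  g_events_received += data.events.size();
}
```

With a unique_ptr parameter the caller must heap-allocate even when it already owns the data on the stack; by-value plus std::move keeps the same ownership transfer without that allocation.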
    }

    private:
     void AddEvents(const rpc::events::AddEventRequest &request,

Why not use the macro we use in the rest of the codebase to stay consistent?
        request,
        callback,
        "EventAggregatorService.grpc_client.AddEvents",
        // TODO(myan): Add timeout and retry logic.

This should be in the proto file, or should be handled in this PR. Based on the above, I think it doesn't matter that much if it fails. We should just state that in the proto file the way it is in core_worker.proto and node_manager.proto.
        std::pair<std::optional<rpc::events::RayEvent>, std::optional<rpc::events::RayEvent>>
            &ray_events) {
      google::protobuf::Timestamp timestamp = AbslTimeNanosToProtoTimestamp(timestamp_);

Can you add an auto &[name1, name2] = ray_events; up here? It's more readable than using .first/.second throughout.
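A quick sketch of the structured-bindings pattern being suggested, with a hypothetical stand-in for rpc::events::RayEvent. The bindings give the pair's halves meaningful names for the rest of the function body:

```cpp
#include <optional>
#include <string>
#include <utility>

// Hypothetical stand-in for the generated rpc::events::RayEvent proto class.
struct RayEvent { std::string message; };

void FillRayEvents(
    std::pair<std::optional<RayEvent>, std::optional<RayEvent>> &ray_events) {
  // Bind the pair's members to descriptive names once, up front,
  // instead of writing ray_events.first / ray_events.second everywhere.
  auto &[definition_event, execution_event] = ray_events;
  definition_event = RayEvent{"definition"};
  execution_event = RayEvent{"execution"};
}
```

Because the bindings are references, assigning through them mutates the caller's pair exactly as .first/.second would.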
@can-anyscale I tested manually for now to avoid making the PR even larger, and I'm planning to add the integration tests in a follow-up PR. The manual test result is as follows:
The events I got from a local http echo server: https://gist.github.com/MengjinYan/b8bc27086bad6ba220d452fb221d7111
Addressed @can-anyscale's and @dayshah's comments.
israbbani left a comment:
Reviewed everything except for the task_event_buffer_test. Looks good. Left some nits and a few clarification questions.
BUILD.bazel (outdated)

    ],
    )

    # Event Aggregator gRPC lib.

I don't think the comment adds anything.
        /*attempt_number=*/0,
        rpc::TaskStatus::FINISHED,
-       /* timestamp */ absl::GetCurrentTimeNanos());
+       /*timestamp=*/absl::GetCurrentTimeNanos(),

Are these automatically generated by a linter?

No, I just added them.
    definition_event_data.mutable_ref_ids()->insert(labels.begin(), labels.end());

    // Specific fields
    if constexpr (std::is_same_v<T, rpc::events::ActorTaskDefinitionEvent>) {

Sounds good. Just to check my understanding: once the migration is complete, this function will no longer need a template type?
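For readers following the thread, the pattern under discussion can be sketched as below. The two structs are hypothetical stand-ins for the generated proto classes: one templated function fills the fields common to both definition-event types, and if constexpr compiles in the actor-specific fields only for the actor type. If the migration eventually leaves a single event type, the template and the branch could collapse into a plain function.

```cpp
#include <string>
#include <type_traits>

// Hypothetical stand-ins for the two generated definition-event protos.
struct ActorTaskDefinitionEvent {
  std::string task_id;
  std::string actor_id;  // actor-only field
};
struct TaskDefinitionEvent {
  std::string task_id;
};

template <typename T>
void FillDefinitionEvent(T &event) {
  // Common fields are filled for every definition-event type...
  event.task_id = "task-1";
  // ...and the actor-only branch is discarded at compile time for plain tasks,
  // so TaskDefinitionEvent never needs an actor_id member.
  if constexpr (std::is_same_v<T, ActorTaskDefinitionEvent>) {
    event.actor_id = "actor-1";
  }
}
```

Because the false branch of if constexpr is never instantiated, the code compiles even though TaskDefinitionEvent has no actor_id field.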
    /// \param[out] ray_events The pair of TaskDefinitionEvent and TaskExecutionEvent to be
    /// filled.

It might be helpful for the reader if you clarified that TaskDefinitionEvent and TaskExecutionEvent are both proto messages.
    void ToRpcTaskExportEvents(
        std::shared_ptr<rpc::ExportTaskEventData> rpc_task_export_event_data) override;

    void ToRpcRayEvents(RayEventsPair &ray_events) override;

Need to add documentation for the public API.
    }

-   message AddEventRequest {
+   message AddEventsRequest {

Why did everything get renamed to Events instead of Event?

There were some naming inconsistencies with the previous implementation of the event aggregator, so I made everything consistent with the same naming convention.
    rpc AddEvents(AddEventRequest) returns (AddEventReply);
    // Add events to the local event aggregator.
    // Failure:
    // Infinite timeout because the communcaiton to the event aggregator will

Suggested change:
-   // Infinite timeout because the communcaiton to the event aggregator will
+   // Infinite timeout because the communication to the event aggregator will
        const rpc::ClientCallback<rpc::events::AddEventsReply> &>
            &args) override {
      auto &request = std::get<0>(args);
      RAY_LOG(INFO) << "[myan] request.events_data().events_size()="

Probably left in from debugging.
        const rpc::events::RayEventsData &expect_data) {
      RAY_LOG(INFO) << "[myan] actual_data.events_size()=" << actual_data.events_size();

    // Sore and compare

Suggested change:
-   // Sore and compare
+   // Store and compare
    static void CompareRayEventsData(const rpc::events::RayEventsData &actual_data,
                                     const rpc::events::RayEventsData &expect_data) {
      RAY_LOG(INFO) << "[myan] actual_data.events_size()=" << actual_data.events_size();
israbbani left a comment:
Tests look okay to me. Leaving feedback for future work for when we refactor this:
- We should refactor out the event pushing logic from the TaskEventBuffer. The public API will become pretty simple (AddEvent, GetEvents) and that will make writing tests against the public API better.
- For the tests themselves, I use the convention UnitOfWorkConditionUnderTestExpectResult for naming. GTest doesn't let you use underscores, but we can standardize on a new delimiter, e.g. AddEventDropsEventsWhenBufferIsFull or something like that.
Overall, looks good and ready to merge once nits have been addressed.
        f", and {len(error_messages) - truncate_num} more events dropped"
    )
-   status = events_event_aggregator_service_pb2.AddEventStatus(
+   status = events_event_aggregator_service_pb2.AddEventsStatus(

Why do we need a custom AddEventsStatus -- are gRPC status codes & messages insufficient?

The status in the gRPC response mainly indicates whether all events in the batch were successfully added to the event aggregator, while I think the gRPC status mainly indicates whether the response was successfully received.
I'm actually planning a follow-up PR to improve the response to return the number of events that failed to be added to the event aggregator due to unknown errors, and also to track the number of events dropped due to a full event aggregator on the aggregator side, and probably remove the status & error message.

What action will the client take based on the fine-grained status information, such as the count of events that failed to be added?

The count is mainly for observability purposes, so that we can have metrics to understand how many events are missing, at which place they are dropped, and why.

This seems like premature optimization/over-engineering to me. Couldn't the server itself export the metrics about the number of dropped events? And why would we expect that partial drops will be a common occurrence (vs. all events being recorded or all being dropped)?
…#53402) This PR adds the logic to emit task events to the event aggregator, mainly:
- Update the task event buffer logic to convert and send the ray event to the event aggregator at the same time when the task events are converted and sent to GCS
- Added 2 configs to control turning on/off the path to send to GCS and the path to send to the event aggregator
- Added task event buffer tests for the ray event to aggregator path
Signed-off-by: Mengjin Yan <mengjinyan3@gmail.com>
Signed-off-by: myan <myan@anyscale.com>
Signed-off-by: avigyabb <avigyabb@stanford.edu>


Why are these changes needed?
This PR adds the logic to emit task events to the event aggregator, mainly:
- Update the task event buffer logic to convert and send the ray event to the event aggregator at the same time when the task events are converted and sent to GCS
- Added 2 configs to control turning on/off the path to send to GCS and the path to send to the event aggregator
- Added task event buffer tests for the ray event to aggregator path
Related issue number
N/A
Checks
- I've signed off every commit (git commit -s) in this PR.
- I've run scripts/format.sh to lint the changes in this PR.