
[Core] Add Logic to Emit Task Events to Event Aggregator#53402

Merged
edoakes merged 29 commits intoray-project:masterfrom
MengjinYan:core-703-3
Jul 31, 2025

Conversation

@MengjinYan
Contributor

@MengjinYan MengjinYan commented May 29, 2025

Why are these changes needed?

This PR adds the logic to emit task events to event aggregator, mainly:

  • Updates the task event buffer logic to convert task events to ray events and send them to the event aggregator at the same time that they are converted and sent to GCS
  • Adds two configs to independently toggle the send-to-GCS path and the send-to-event-aggregator path
  • Adds task event buffer tests for the ray-event-to-aggregator path

Related issue number

N/A

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: Mengjin Yan <mengjinyan3@gmail.com>
if (send_ray_events_to_aggregator_enabled_) {
RAY_CHECK(data->ray_event_data);
SendRayEventsToAggregator(std::move(*data->ray_event_data));
}
Contributor

should it be possible to send to gcs and aggregator?

Contributor Author

The plan is to move the event -> GCS path into the event aggregator. So here we just need to support sending events to the event aggregator; later we will add the logic for the aggregator to send to GCS.

Contributor

we could segfault if both are enabled because both move the same thing

Contributor Author

I think one moves the task_event_data and the other moves the ray_event_data, and they are created separately. So I think we can have them both enabled at the same time without them interfering with each other. But let me know if I missed anything.

MengjinYan and others added 2 commits June 5, 2025 00:27
Signed-off-by: myan <myan@anyscale.com>
@MengjinYan MengjinYan added the go add ONLY when ready to merge, run all tests label Jun 5, 2025
@MengjinYan MengjinYan marked this pull request as ready for review June 9, 2025 18:49
Signed-off-by: Mengjin Yan <mengjinyan3@gmail.com>
Signed-off-by: Mengjin Yan <mengjinyan3@gmail.com>
Contributor

@israbbani israbbani left a comment

I did a first pass. I'll do another pass tomorrow morning!

std::make_shared<gcs::GcsClient>(options_.gcs_options));
std::make_shared<gcs::GcsClient>(options_.gcs_options),
std::make_unique<rpc::EventAggregatorClientImpl>(
"127.0.0.1", options_.metrics_agent_port, *client_call_manager_));
Contributor

Is there ever a remote EventAggregatorClient? If not, we don't need the address as a parameter and can add it later once it's needed.

Contributor Author

Fixed in df01b29

Comment on lines +957 to +962
// Whether the task events from the core worker are sent to GCS directly.
RAY_CONFIG(bool, enable_core_worker_task_event_to_gcs, true)

// Whether to enable the ray event to send to the event aggregator.
// Currently, only task events are supported.
RAY_CONFIG(bool, enable_core_worker_ray_event_to_aggregator, false)
Contributor

Will these be removed after the migration is done? Can we add an issue/todo to make sure these are cleaned up afterwards?

We should have a separate follow up (team-wide) to move feature flags to a separate config file so it's obvious to users that they should not rely on them.

Contributor Author

Fixed in df01b29 and e769c90

/* attempt_number */ 0,
rpc::TaskStatus::FINISHED,
/* timestamp */ absl::GetCurrentTimeNanos());
/* timestamp */ absl::GetCurrentTimeNanos(),
Contributor

Suggested change
/* timestamp */ absl::GetCurrentTimeNanos(),
/*timestamp=*/ absl::GetCurrentTimeNanos(),

I don't like this convention, but we use it elsewhere so this is just to make it consistent.

Contributor Author

Updated in df01b29


if (is_actor_task_event_) {
if (is_definition_event) {
ray_event.set_message("Actor task definition event");
Contributor

What's the point of having the message when there's an enum?

Contributor Author

This is a general field, mainly for high-level string information about the event. It might not be applicable for task events, so I'll just not set it to avoid sending unnecessary information.

Contributor Author

Updated in df01b29

@MengjinYan MengjinYan requested a review from can-anyscale July 2, 2025 21:48
Signed-off-by: Mengjin Yan <mengjinyan3@gmail.com>
Signed-off-by: Mengjin Yan <mengjinyan3@gmail.com>
@MengjinYan
Contributor Author

@israbbani I've addressed all of your comments and it should be ready for another round of review.

Signed-off-by: Mengjin Yan <mengjinyan3@gmail.com>
@MengjinYan MengjinYan requested a review from a team as a code owner July 17, 2025 06:16
Signed-off-by: Mengjin Yan <mengjinyan3@gmail.com>
Signed-off-by: Mengjin Yan <mengjinyan3@gmail.com>
@jjyao jjyao requested review from dayshah and israbbani July 18, 2025 17:56
Signed-off-by: Mengjin Yan <mengjinyan3@gmail.com>
Contributor

@can-anyscale can-anyscale left a comment

qq, do we have an e2e python test that checks that the aggregator has the events we expect, etc.?

if (state_update_->node_id_.has_value()) {
RAY_CHECK(task_status_ == rpc::TaskStatus::SUBMITTED_TO_WORKER)
.WithField("TaskStatus", task_status_)
<< "Node ID should be included when task status changes to "
Contributor

the check is the inverse of this comment though, "Task status must be SUBMITTED_TO_WORKER if node ID is included"

}

if (state_update_->worker_id_.has_value()) {
RAY_CHECK(task_status_ == rpc::TaskStatus::SUBMITTED_TO_WORKER)
Contributor

same here

}

void TaskStatusEvent::ToRpcRayEvents(
std::pair<std::optional<rpc::events::RayEvent>, std::optional<rpc::events::RayEvent>>
Contributor

nit: maybe define a type for this, something like
using TaskDefinitionAndExecutionEvents = std::pair<std::optional<rpc::events::RayEvent>, std::optional<rpc::events::RayEvent>>

(std::unique_ptr<rpc::events::RayEventsData> data_ptr,
std::function<void(Status status)> callback),
(override));
};
Contributor

i think we want to move away from using the shared mock folder where everything inside it is mocked; would want to just enclose this inside task_event_buffer_test so other users are encouraged to use fake implementations instead of mocks

/*attempt_number=*/0,
rpc::TaskStatus::RUNNING,
/*timestamp=*/absl::GetCurrentTimeNanos(),
is_actor_task_event,
Contributor

Suggested change
is_actor_task_event,
/*is_actor_task_event=*/spec.IsActorTask(),

personal nit but inline if just used in one spot

Comment on lines +216 to +218
auto task_state = execution_event_data.mutable_task_state();
if (task_status_ != rpc::TaskStatus::NIL) {
(*task_state)[task_status_] = timestamp;
Contributor

Suggested change
auto task_state = execution_event_data.mutable_task_state();
if (task_status_ != rpc::TaskStatus::NIL) {
(*task_state)[task_status_] = timestamp;
auto &task_state = *execution_event_data.mutable_task_state();
if (task_status_ != rpc::TaskStatus::NIL) {
task_state[task_status_] = timestamp;

nit, a little cleaner

Comment on lines +641 to +644
auto [itr_ray_events, _] = agg_ray_events.try_emplace(
event->GetTaskAttempt(),
std::pair<std::optional<rpc::events::RayEvent>,
std::optional<rpc::events::RayEvent>>());
Contributor

Suggested change
auto [itr_ray_events, _] = agg_ray_events.try_emplace(
event->GetTaskAttempt(),
std::pair<std::optional<rpc::events::RayEvent>,
std::optional<rpc::events::RayEvent>>());
auto [itr_ray_events, _] = agg_ray_events.try_emplace(
event->GetTaskAttempt());

if (send_task_events_to_gcs_enabled_) {
auto [itr_task_events, _] =
agg_task_events.try_emplace(event->GetTaskAttempt(), rpc::TaskEvents());
event->ToRpcTaskEvents(&(itr_task_events->second));
Contributor

Suggested change
event->ToRpcTaskEvents(&(itr_task_events->second));
agg_task_events.try_emplace(event->GetTaskAttempt());
event->ToRpcTaskEvents(&itr_task_events->second);

also ToRpcRayEvents takes by ref, this one takes by ptr, any reason for inconsistency?

Contributor Author

No particular reason that I'm aware of. This is legacy code and we will remove the code path after the task events are fully migrated to the event aggregator.

});

return Status::OK();
}
Contributor

this function doesn't really seem to be adding much on top of AddEvents imo and is kind of a downgrade from the standard implementation for these 3 reasons.

  1. The debug is hidden pretty deep within the stack
  2. Always returning Status::OK so returning status doesn't help
  3. Forcing the caller to pass in a unique ptr kills the optimization opportunity that could be had by moving RayEventsData in.

}

private:
void AddEvents(const rpc::events::AddEventRequest &request,
Contributor

why not use the macro we use in the rest of the codebase to stay consistent

request,
callback,
"EventAggregatorService.grpc_client.AddEvents",
// TODO(myan): Add timeout and retry logic.
Contributor

this should be in the proto file, or should be handled in this PR. Based on above, I think it doesn't matter that much if it fails. Should just state that in the proto file the way it is in core_worker.proto and node_manager.proto

std::pair<std::optional<rpc::events::RayEvent>, std::optional<rpc::events::RayEvent>>
&ray_events) {
google::protobuf::Timestamp timestamp = AbslTimeNanosToProtoTimestamp(timestamp_);

Contributor

can you add an auto &[name1, name2] = ray_events; up here, more readable than using .first, .second throughout

@MengjinYan
Contributor Author

qq, do we have an e2e python test that checks that the aggregator has the events we expect, etc.?

@can-anyscale I tested manually for now to not make the PR even larger. And I'm planning to add the integration tests in a followup PR.

The manual test result is as follows:

# start ray
RAY_enable_core_worker_ray_event_to_aggregator=1 ray start --head
# run the script 
python test-tasks.py

The events I got from a local http echo server: https://gist.github.com/MengjinYan/b8bc27086bad6ba220d452fb221d7111

Contributor

@can-anyscale can-anyscale left a comment

looks good on my end for the event exporting logic, defer to others for other parts, thanks

Signed-off-by: Mengjin Yan <mengjinyan3@gmail.com>
@MengjinYan
Contributor Author

Addressed @can-anyscale's and @dayshah's comments.

Signed-off-by: Mengjin Yan <mengjinyan3@gmail.com>
Signed-off-by: Mengjin Yan <mengjinyan3@gmail.com>
Contributor

@israbbani israbbani left a comment

Reviewed everything except for the task_event_buffer_test. Looks good. Left some nits and a few clarifying questions.

BUILD.bazel Outdated
],
)

# Event Aggregator gRPC lib.
Contributor

I don't think the comment adds anything.

/*attempt_number=*/0,
rpc::TaskStatus::FINISHED,
/* timestamp */ absl::GetCurrentTimeNanos());
/*timestamp=*/absl::GetCurrentTimeNanos(),
Contributor

Are these automatically generated by a linter?

Contributor Author

No, I just added them.

definition_event_data.mutable_ref_ids()->insert(labels.begin(), labels.end());

// Specific fields
if constexpr (std::is_same_v<T, rpc::events::ActorTaskDefinitionEvent>) {
Contributor

Sounds good. Just to check my understanding, once the migration is complete, this function will no longer need a template type?

Comment on lines +85 to +86
/// \param[out] ray_events The pair of TaskDefinitionEvent and TaskExecutionEvent to be
/// filled.
Contributor

Might be helpful for the reader if you clarified that TaskDefinitionEvent and TaskExecutionEvent are both proto messages.

void ToRpcTaskExportEvents(
std::shared_ptr<rpc::ExportTaskEventData> rpc_task_export_event_data) override;

void ToRpcRayEvents(RayEventsPair &ray_events) override;
Contributor

Need to add documentation for the public API.

}

message AddEventRequest {
message AddEventsRequest {
Contributor

Why did everything get renamed to Events instead of Event?

Contributor Author

There were some naming inconsistencies with the previous implementation of the event aggregator, so I made everything consistent with the same naming convention.

rpc AddEvents(AddEventRequest) returns (AddEventReply);
// Add events to the local event aggregator.
// Failure:
// Infinite timeout because the communcaiton to the event aggregator will
Contributor

Suggested change
// Infinite timeout because the communcaiton to the event aggregator will
// Infinite timeout because the communication to the event aggregator will

const rpc::ClientCallback<rpc::events::AddEventsReply> &>
&args) override {
auto &request = std::get<0>(args);
RAY_LOG(INFO) << "[myan] request.events_data().events_size()="
Contributor

Probably left in from debugging.

const rpc::events::RayEventsData &expect_data) {
RAY_LOG(INFO) << "[myan] actual_data.events_size()=" << actual_data.events_size();

// Sore and compare
Contributor

Suggested change
// Sore and compare
// Store and compare


static void CompareRayEventsData(const rpc::events::RayEventsData &actual_data,
const rpc::events::RayEventsData &expect_data) {
RAY_LOG(INFO) << "[myan] actual_data.events_size()=" << actual_data.events_size();
Contributor

Same as above

Contributor

@israbbani israbbani left a comment

Tests look okay to me. Leaving feedback for future work for when we refactor this

  1. We should refactor out the event pushing logic from the TaskEventBuffer. The public API will become pretty simple (AddEvent, GetEvents) and that will make writing tests against the public API better.
  2. For the tests themselves, I use the convention UnitOfWorkConditionUnderTestExpectResult for naming. GTest doesn't let you use underscores, but we can standardize on a new delimiter. E.g. AddEventDropsEventsWhenBufferIsFull or something like that.

Overall, looks good and ready to merge once nits have been addressed.

f", and {len(error_messages) - truncate_num} more events dropped"
)
status = events_event_aggregator_service_pb2.AddEventStatus(
status = events_event_aggregator_service_pb2.AddEventsStatus(
Collaborator

why do we need a custom AddEventsStatus -- are gRPC status codes & messages insufficient?

Contributor Author

The status in the gRPC response is mainly to indicate whether all events in the batch were successfully added to the event aggregator, while I think the gRPC status mainly indicates whether the response was successfully received.

I'm actually planning a followup PR to improve the response to return the number of events that failed to be added to the event aggregator due to unknown errors, and also to track the number of events dropped due to a full event aggregator on the aggregator side. And probably remove the status & error message.

Collaborator

What action will the client take based on fine-grained status information such as the count of events that failed to be added?

Contributor Author

The count is mainly for observability purposes, so that we can have metrics to understand how many events are missing, where they are dropped, and why.

Collaborator

this seems like premature optimization/over-engineering to me. couldn't the server itself export metrics about the number of dropped events? and why would we expect partial drops to be a common occurrence (vs. all events being recorded or all being dropped)?

MengjinYan and others added 5 commits July 29, 2025 22:45
Signed-off-by: Mengjin Yan <mengjinyan3@gmail.com>
Signed-off-by: myan <myan@anyscale.com>
Signed-off-by: myan <myan@anyscale.com>
Signed-off-by: Mengjin Yan <mengjinyan3@gmail.com>
Signed-off-by: Mengjin Yan <mengjinyan3@gmail.com>
@edoakes edoakes enabled auto-merge (squash) July 31, 2025 21:56
@edoakes edoakes merged commit bbbe9ef into ray-project:master Jul 31, 2025
6 checks passed
avibasnet31 pushed a commit to avibasnet31/ray that referenced this pull request Aug 2, 2025
…#53402)

This PR adds the logic to emit task events to event aggregator, mainly:
* Update the task event buffer logic to convert and send the ray event
to the event aggregator at the same time when the task events are
converted and sent to GCS
* Added 2 configs to control turning on/off the path to send to GCS and
the path to send to event aggregator
* Added test task event buffer tests for the ray event to aggregator
path

---------

Signed-off-by: Mengjin Yan <mengjinyan3@gmail.com>
Signed-off-by: myan <myan@anyscale.com>
Signed-off-by: avigyabb <avigyabb@stanford.edu>
avibasnet31 pushed a commit to avibasnet31/ray that referenced this pull request Aug 2, 2025
elliot-barn pushed a commit that referenced this pull request Aug 4, 2025
kamil-kaczmarek pushed a commit that referenced this pull request Aug 4, 2025
mjacar pushed a commit to mjacar/ray that referenced this pull request Aug 5, 2025
jugalshah291 pushed a commit to jugalshah291/ray_fork that referenced this pull request Sep 11, 2025
dstrodtman pushed a commit to dstrodtman/ray that referenced this pull request Oct 6, 2025