Skip to content

[Core] Use TaskAttempt as the unique id for inflight actor task#52812

Merged
jjyao merged 16 commits intoray-project:masterfrom
jjyao:jjyao/dcancel
May 12, 2025
Merged

[Core] Use TaskAttempt as the unique id for inflight actor task#52812
jjyao merged 16 commits intoray-project:masterfrom
jjyao:jjyao/dcancel

Conversation

@jjyao
Copy link
Copy Markdown
Contributor

@jjyao jjyao commented May 6, 2025

Why are these changes needed?

Currently ActorTaskSubmitter has inflight_task_callbacks map whose key is TaskID to track all inflight actor tasks. It can cause the following issue:

  1. Task_1_Attempt_0 is submitted and added to inflight_task_callbacks.
  2. GCS tells the caller that the actor is dead before Task_1_Attemp_0 PushTask rpc callback is called.
  3. ActorTaskSubmitter failed all inflight tasks and cleared inflight_task_callbacks
  4. Task_1_Attempt_1 is submitted (actor task retry) and added to inflight_task_callbacks. The key is Task_1.
  5. Task_1_Attempt_0 PushTask rpc callback is finally called (network error status) and we use Task_1 as the key to see if it's in the inflight_task_callbacks and we can find it and the callback is called. The problem is that this callback is for Task_1_Attempt_1 not Task_1_Attempt_0 so we end up failing the wrong task attempt.

Solution: use TaskAttempt to track each inflight task which is unique.

TODO: Investigate whether we can remove inflight_task_callbacks (#19354) all together after #51904 and purely rely on GRPC to call the callback when the actor is dead or restarted.

Related issue number

Checks

  • I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

jjyao added 6 commits May 5, 2025 21:51
Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
@jjyao jjyao added the go add ONLY when ready to merge, run all tests label May 7, 2025
jjyao added 4 commits May 7, 2025 14:40
Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
@jjyao jjyao changed the title Debug data task cancellation [Core] Use TaskAttempt has the unique id for inflight actor task May 8, 2025
@jjyao jjyao changed the title [Core] Use TaskAttempt has the unique id for inflight actor task [Core] Use TaskAttempt as the unique id for inflight actor task May 8, 2025
@jjyao jjyao marked this pull request as ready for review May 8, 2025 17:59
ray_cc_test(
name = "direct_actor_transport_test",
srcs = ["direct_actor_transport_test.cc"],
name = "task_receiver_test",
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Split direct_actor_transport_test into task_receiver_test and actor_task_submitter_test.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice!

@@ -618,29 +596,30 @@ TEST_P(ActorTaskSubmitterTest, TestActorRestartOutOfOrderGcs) {
}

TEST_P(ActorTaskSubmitterTest, TestActorRestartFailInflightTasks) {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the only test that's changed.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's best practice to only test against the interaction/public API as much as possible. The biggest exception is when dealing with code that was already written and is hard to test.

Here are a few arguments I've seen in favour of this:

  1. The tests tell you what the contract for the API is so they only break when the contract is broken
  2. You can refactor the implementation and if the tests pass you don't have to worry about bugs
  3. The tests are not brittle i.e. they don't break due to implementation details

In this case, the API invariants we are being tested are:

  1. Actor tasks without inflight retries succeed without a problem
  2. Actor task retries succeed correctly while retries that have been discarded do not succeed

It's fair game to add a fake object like worker_client_ and check it's state to see if the correct side-effects happen (e.g. callback is added correctly), but these tests shouldn't inspect the private state of ActorTaskSubmitter since that is the API under test.

TL/DR: we should keep all the assertions against fake objects (such as worker_client_) that are testing side-effects of ActorTaskSubmitter but we should remove assertions like submitter_.NumInflightTasks(actor_id) which look at the implementation of ActionTaskSubmitter and not it's public API.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the very long response. My two cents on testing.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Completely agree with this ^ same motivation for my comment below about NumInflightTasks()

@@ -0,0 +1,286 @@
// Copyright 2017 The Ray Authors.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nothing changed, pure refactoring

RAY_CHECK(it != client_queues_.end());
auto &queue = it->second;
auto callback_it = queue.inflight_task_callbacks.find(task_id);
auto callback_it = queue.inflight_task_callbacks.find(task_attempt);
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

key fix

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to cancel an inflight GRPC request in addition to this? I don't have a lot of GRPC experience, but if we're going to discard the response, might as well cancel the request so this is never called.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also not familiar with this part. Need to do some investigation.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it might be a useful follow up, but looking at our GRPC implementation, it doesn't look straightforward to implement.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gRPC has a notion of cancellation but our c++ code doesn't handle it so it would only be useful if it happens before the RPC handler begins on the server side

@jjyao jjyao requested review from edoakes and kevin85421 May 8, 2025 18:02
Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
Copy link
Copy Markdown
Collaborator

@edoakes edoakes left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fix LGTM, stylistic comments to improve the tests

Have you audited other maps keyed on TaskID to check if there are other similar issues?

TaskID caller_id = TaskID::Nil()) {
TaskSpecification task;
task.GetMutableMessage().set_task_id(TaskID::FromRandom(actor_id.JobId()).Binary());
task.GetMutableMessage().set_attempt_number(0);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this isn't necessary; protobuf has well-defined zero values. but fine to do if you think it improves readability

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, I write it out for readability to emphasize it's attempt 0 of the task since attempt number is a key in this test.

Comment on lines +252 to +253
/// Return the number of inflight actor tasks for the given actor id.
size_t NumInflightTasks(const ActorID &actor_id) const;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What exactly are "inflight" tasks from the perspective of the caller of this interface? Is "inflight" a specific state or a set of states?

We also have the notion of "pending" tasks in the API. Are these the same thing?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inflight are those actor tasks whose PushTask RPC is sent and inflight (response not received yet). Pending are tasks that are queued (no PushTask RPC yet).

Added comment to explain what inflight means.

Comment on lines +619 to +624
// Submit a task.
ASSERT_TRUE(CheckSubmitTask(task1));
EXPECT_CALL(*task_finisher_, CompletePendingTask(task1.TaskId(), _, _, _)).Times(1);
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK()));
ASSERT_EQ(worker_client_->callbacks.size(), 0);
ASSERT_EQ(submitter_.NumInflightTasks(actor_id), 0);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the point of this codeblock in the test? it doesn't seem relevant

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sure we have a clean state for the rest of the test: i.e. Task 1 should be completely finished.

Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
@jjyao jjyao requested a review from edoakes May 9, 2025 21:51
jjyao added 2 commits May 9, 2025 16:45
Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
@jjyao
Copy link
Copy Markdown
Contributor Author

jjyao commented May 12, 2025

Have you audited other maps keyed on TaskID to check if there are other similar issues?

I checked and some are suspicious. Created #52940 as the follow-up.

Copy link
Copy Markdown
Contributor

@israbbani israbbani left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fix LGTM. I left a pretty detailed comment about writing tests against public APIs vs implementation details. I think it'll help improve our tests and our code if we try to follow that as a general guideline. 🚢

Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
@jjyao jjyao merged commit 078e6e6 into ray-project:master May 12, 2025
4 of 5 checks passed
@jjyao jjyao deleted the jjyao/dcancel branch May 12, 2025 23:28
ran1995data pushed a commit to ran1995data/ray that referenced this pull request May 13, 2025
…project#52812)

Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
Signed-off-by: weiran11 <weiran11@baidu.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-backlog go add ONLY when ready to merge, run all tests

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants