Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 39 additions & 36 deletions src/ray/raylet/local_task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,11 @@ void LocalTaskManager::QueueAndScheduleTask(std::shared_ptr<internal::Work> work
ScheduleAndDispatchTasks();
}

bool LocalTaskManager::WaitForTaskArgsRequests(std::shared_ptr<internal::Work> work) {
void LocalTaskManager::WaitForTaskArgsRequests(std::shared_ptr<internal::Work> work) {
const auto &task = work->task;
const auto &task_id = task.GetTaskSpecification().TaskId();
const auto &scheduling_key = task.GetTaskSpecification().GetSchedulingClass();
auto object_ids = task.GetTaskSpecification().GetDependencies();
bool can_dispatch = true;
if (!object_ids.empty()) {
bool args_ready = task_dependency_manager_.RequestTaskDependencies(
task_id,
Expand All @@ -97,7 +96,6 @@ bool LocalTaskManager::WaitForTaskArgsRequests(std::shared_ptr<internal::Work> w
} else {
RAY_LOG(DEBUG) << "Waiting for args for task: "
<< task.GetTaskSpecification().TaskId();
can_dispatch = false;
auto it = waiting_task_queue_.insert(waiting_task_queue_.end(), std::move(work));
RAY_CHECK(waiting_tasks_index_.emplace(task_id, it).second);
}
Expand All @@ -106,7 +104,6 @@ bool LocalTaskManager::WaitForTaskArgsRequests(std::shared_ptr<internal::Work> w
<< task.GetTaskSpecification().TaskId();
tasks_to_dispatch_[scheduling_key].emplace_back(std::move(work));
}
return can_dispatch;
}

void LocalTaskManager::ScheduleAndDispatchTasks() {
Expand Down Expand Up @@ -403,8 +400,18 @@ void LocalTaskManager::DispatchScheduledTasksToWorkers() {
info_by_sched_cls_.erase(scheduling_class);
}
if (is_infeasible) {
// TODO(scv119): fail the request.
// Call CancelTask
const auto &front_task = dispatch_queue.front()->task.GetTaskSpecification();
RAY_LOG(ERROR) << "A task got scheduled to a node even though it was infeasible. "
"Please report an issue on GitHub.\nTask: "
<< front_task.DebugString();
auto dispatch_queue_iter = dispatch_queue.begin();
while (dispatch_queue_iter != dispatch_queue.end()) {
CancelTaskToDispatch(
*dispatch_queue_iter,
rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_UNSCHEDULABLE,
"Scheduling failed due to the task becoming infeasible.");
dispatch_queue_iter = dispatch_queue.erase(dispatch_queue_iter);
}
tasks_to_dispatch_.erase(shapes_it++);
} else if (dispatch_queue.empty()) {
tasks_to_dispatch_.erase(shapes_it++);
Expand Down Expand Up @@ -604,8 +611,10 @@ bool LocalTaskManager::PoppedWorkerHandler(
// directly and raise a `RuntimeEnvSetupError` exception to user
// eventually. The task will be removed from dispatch queue in
// `CancelTask`.
CancelTask(
task_id,
CancelTasks(
[task_id](const auto &work) {
return task_id == work->task.GetTaskSpecification().TaskId();
},
rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_RUNTIME_ENV_SETUP_FAILED,
/*scheduling_failure_message*/ runtime_env_setup_error_message);
} else if (status == PopWorkerStatus::JobFinished) {
Expand Down Expand Up @@ -846,28 +855,12 @@ bool LocalTaskManager::CancelTasks(

ray::erase_if<SchedulingClass, std::shared_ptr<internal::Work>>(
tasks_to_dispatch_, [&](const std::shared_ptr<internal::Work> &work) {
if (predicate(work)) {
const TaskID task_id = work->task.GetTaskSpecification().TaskId();
RAY_LOG(DEBUG) << "Canceling task " << task_id << " from dispatch queue.";
ReplyCancelled(work, failure_type, scheduling_failure_message);
if (work->GetState() == internal::WorkStatus::WAITING_FOR_WORKER) {
// We've already acquired resources so we need to release them.
cluster_resource_scheduler_.GetLocalResourceManager().ReleaseWorkerResources(
work->allocated_instances);
// Release pinned task args.
ReleaseTaskArgs(task_id);
}
if (!work->task.GetTaskSpecification().GetDependencies().empty()) {
task_dependency_manager_.RemoveTaskDependencies(
work->task.GetTaskSpecification().TaskId());
}
RemoveFromRunningTasksIfExists(work->task);
work->SetStateCancelled();
tasks_cancelled = true;
return true;
} else {
if (!predicate(work)) {
return false;
}
CancelTaskToDispatch(work, failure_type, scheduling_failure_message);
tasks_cancelled = true;
return true;
});

ray::erase_if<std::shared_ptr<internal::Work>>(
Expand All @@ -889,16 +882,26 @@ bool LocalTaskManager::CancelTasks(
return tasks_cancelled;
}

bool LocalTaskManager::CancelTask(
const TaskID &task_id,
void LocalTaskManager::CancelTaskToDispatch(
const std::shared_ptr<internal::Work> &work,
rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
const std::string &scheduling_failure_message) {
return CancelTasks(
[task_id](const std::shared_ptr<internal::Work> &work) {
return work->task.GetTaskSpecification().TaskId() == task_id;
},
failure_type,
scheduling_failure_message);
const TaskID task_id = work->task.GetTaskSpecification().TaskId();
RAY_LOG(DEBUG) << "Canceling task " << task_id << " from dispatch queue.";
ReplyCancelled(work, failure_type, scheduling_failure_message);
if (work->GetState() == internal::WorkStatus::WAITING_FOR_WORKER) {
// We've already acquired resources so we need to release them.
cluster_resource_scheduler_.GetLocalResourceManager().ReleaseWorkerResources(
work->allocated_instances);
// Release pinned task args.
ReleaseTaskArgs(task_id);
}
if (!work->task.GetTaskSpecification().GetDependencies().empty()) {
task_dependency_manager_.RemoveTaskDependencies(
work->task.GetTaskSpecification().TaskId());
}
RemoveFromRunningTasksIfExists(work->task);
work->SetStateCancelled();
}

const RayTask *LocalTaskManager::AnyPendingTasksForResourceAcquisition(
Expand Down
33 changes: 12 additions & 21 deletions src/ray/raylet/local_task_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,17 +202,12 @@ class LocalTaskManager : public ILocalTaskManager {
const rpc::Address &owner_address,
const std::string &runtime_env_setup_error_message);

/// Attempt to cancel an already queued task.
///
/// \param task_id: The id of the task to remove.
/// \param failure_type: The failure type.
///
/// \return True if task was successfully removed. This function will return
/// false if the task is already running.
bool CancelTask(const TaskID &task_id,
rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type =
rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_INTENDED,
const std::string &scheduling_failure_message = "");
/// Cancels a task in tasks_to_dispatch_. Does not remove it from tasks_to_dispatch_.
void CancelTaskToDispatch(
const std::shared_ptr<internal::Work> &work,
rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type =
rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_INTENDED,
const std::string &scheduling_failure_message = "");

/// Attempts to dispatch all tasks which are ready to run. A task
/// will be dispatched if it is on `tasks_to_dispatch_` and there are still
Expand Down Expand Up @@ -249,12 +244,6 @@ class LocalTaskManager : public ILocalTaskManager {
/// data structure.
void RecomputeDebugStats() const;

/// Determine whether a task should be immediately dispatched,
/// or placed on a wait queue.
///
/// \return True if the work can be immediately dispatched.
bool WaitForTaskArgsRequests(std::shared_ptr<internal::Work> work);

void Dispatch(
std::shared_ptr<WorkerInterface> worker,
absl::flat_hash_map<WorkerID, std::shared_ptr<WorkerInterface>> &leased_workers_,
Expand All @@ -277,6 +266,10 @@ class LocalTaskManager : public ILocalTaskManager {
void ReleaseTaskArgs(const TaskID &task_id);

private:
/// Determine whether a task should be immediately dispatched,
/// or placed on a wait queue.
void WaitForTaskArgsRequests(std::shared_ptr<internal::Work> work);

const NodeID &self_node_id_;
const scheduling::NodeID self_scheduling_node_id_;
/// Responsible for resource tracking/view of the cluster.
Expand All @@ -293,15 +286,13 @@ class LocalTaskManager : public ILocalTaskManager {
/// running tasks per scheduling class.
struct SchedulingClassInfo {
explicit SchedulingClassInfo(int64_t cap)
: running_tasks(),
capacity(cap),
next_update_time(std::numeric_limits<int64_t>::max()) {}
: capacity(cap), next_update_time(std::numeric_limits<int64_t>::max()) {}
/// Track the running task ids in this scheduling class.
///
/// TODO(hjiang): Store cgroup manager along with task id as the value for map.
absl::flat_hash_set<TaskID> running_tasks;
/// The total number of tasks that can run from this scheduling class.
const uint64_t capacity;
uint64_t capacity;
/// The next time that a new task of this scheduling class may be dispatched.
int64_t next_update_time;
};
Expand Down
111 changes: 66 additions & 45 deletions src/ray/raylet/local_task_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ class MockWorkerPool : public WorkerPoolInterface {
int num_pops;
};

namespace {

std::shared_ptr<ClusterResourceScheduler> CreateSingleNodeScheduler(
const std::string &id, double num_cpus, gcs::GcsClient &gcs_client) {
absl::flat_hash_map<std::string, double> local_node_resources;
Expand All @@ -151,7 +153,8 @@ std::shared_ptr<ClusterResourceScheduler> CreateSingleNodeScheduler(
}

RayTask CreateTask(const std::unordered_map<std::string, double> &required_resources,
const std::string &task_name = "default") {
const std::string &task_name = "default",
const std::vector<std::unique_ptr<TaskArg>> &args = {}) {
TaskSpecBuilder spec_builder;
TaskID id = RandomTaskId();
JobID job_id = RandomJobId();
Expand Down Expand Up @@ -181,9 +184,15 @@ RayTask CreateTask(const std::unordered_map<std::string, double> &required_resou

spec_builder.SetNormalTaskSpec(0, false, "", rpc::SchedulingStrategy(), ActorID::Nil());

for (const auto &arg : args) {
spec_builder.AddArg(*arg);
}

return RayTask(std::move(spec_builder).ConsumeAndBuild());
}

} // namespace

class LocalTaskManagerTest : public ::testing::Test {
public:
explicit LocalTaskManagerTest(double num_cpus = 3.0)
Expand Down Expand Up @@ -253,8 +262,6 @@ class LocalTaskManagerTest : public ::testing::Test {
};

TEST_F(LocalTaskManagerTest, TestTaskDispatchingOrder) {
RAY_LOG(INFO) << "Starting TestTaskDispatchingOrder";

// Initial setup: 3 CPUs available.
std::shared_ptr<MockWorker> worker1 =
std::make_shared<MockWorker>(WorkerID::FromRandom(), 0);
Expand All @@ -270,28 +277,12 @@ TEST_F(LocalTaskManagerTest, TestTaskDispatchingOrder) {
auto task_f1 = CreateTask({{ray::kCPU_ResourceLabel, 1}}, "f");
auto task_f2 = CreateTask({{ray::kCPU_ResourceLabel, 1}}, "f");
rpc::RequestWorkerLeaseReply reply;
bool callback_occurred = false;
bool *callback_occurred_ptr = &callback_occurred;
auto callback = [callback_occurred_ptr](
Status, std::function<void()>, std::function<void()>) {
*callback_occurred_ptr = true;
};
local_task_manager_->WaitForTaskArgsRequests(std::make_shared<internal::Work>(
task_f1,
false,
false,
&reply,
[callback] { callback(Status::OK(), nullptr, nullptr); },
internal::WorkStatus::WAITING));
task_f1, false, false, &reply, [] {}, internal::WorkStatus::WAITING));
local_task_manager_->ScheduleAndDispatchTasks();
pool_.TriggerCallbacks();
local_task_manager_->WaitForTaskArgsRequests(std::make_shared<internal::Work>(
task_f2,
false,
false,
&reply,
[callback] { callback(Status::OK(), nullptr, nullptr); },
internal::WorkStatus::WAITING));
task_f2, false, false, &reply, [] {}, internal::WorkStatus::WAITING));
local_task_manager_->ScheduleAndDispatchTasks();
pool_.TriggerCallbacks();

Expand All @@ -301,40 +292,70 @@ TEST_F(LocalTaskManagerTest, TestTaskDispatchingOrder) {
auto task_f5 = CreateTask({{ray::kCPU_ResourceLabel, 1}}, "f");
auto task_g1 = CreateTask({{ray::kCPU_ResourceLabel, 1}}, "g");
local_task_manager_->WaitForTaskArgsRequests(std::make_shared<internal::Work>(
task_f3,
false,
false,
&reply,
[callback] { callback(Status::OK(), nullptr, nullptr); },
internal::WorkStatus::WAITING));
task_f3, false, false, &reply, [] {}, internal::WorkStatus::WAITING));
local_task_manager_->WaitForTaskArgsRequests(std::make_shared<internal::Work>(
task_f4,
false,
false,
&reply,
[callback] { callback(Status::OK(), nullptr, nullptr); },
internal::WorkStatus::WAITING));
task_f4, false, false, &reply, [] {}, internal::WorkStatus::WAITING));
local_task_manager_->WaitForTaskArgsRequests(std::make_shared<internal::Work>(
task_f5,
false,
false,
&reply,
[callback] { callback(Status::OK(), nullptr, nullptr); },
internal::WorkStatus::WAITING));
task_f5, false, false, &reply, [] {}, internal::WorkStatus::WAITING));
local_task_manager_->WaitForTaskArgsRequests(std::make_shared<internal::Work>(
task_g1,
false,
false,
&reply,
[callback] { callback(Status::OK(), nullptr, nullptr); },
internal::WorkStatus::WAITING));
task_g1, false, false, &reply, [] {}, internal::WorkStatus::WAITING));
local_task_manager_->ScheduleAndDispatchTasks();
pool_.TriggerCallbacks();
auto tasks_to_dispatch_ = local_task_manager_->GetTaskToDispatch();
// Only task f in queue now as g is dispatched.
ASSERT_EQ(tasks_to_dispatch_.size(), 1);
}

TEST_F(LocalTaskManagerTest, TestNoLeakOnImpossibleInfeasibleTask) {
  // Regression test: two tasks whose dependencies are still pending become
  // infeasible (the node's CPU resource is deleted) before they can be
  // dispatched. Both must be cancelled with
  // SCHEDULING_CANCELLED_UNSCHEDULABLE and removed from the dispatch queue
  // rather than leaked.
  //
  // Note that ideally it shouldn't be possible for an infeasible task to
  // be in the local task manager when ScheduleAndDispatchTasks happens.
  // See https://github.com/ray-project/ray/pull/52295 for why this was added.

  std::shared_ptr<MockWorker> worker1 =
      std::make_shared<MockWorker>(WorkerID::FromRandom(), 0);
  // NOTE(review): worker2 is created but never pushed to the pool below —
  // it appears unused; confirm whether it can be removed.
  std::shared_ptr<MockWorker> worker2 =
      std::make_shared<MockWorker>(WorkerID::FromRandom(), 0);
  pool_.PushWorker(std::static_pointer_cast<WorkerInterface>(worker1));

  // Create 2 tasks that require 3 CPUs each and are waiting on an arg.
  auto arg_id = ObjectID::FromRandom();
  std::vector<std::unique_ptr<TaskArg>> args;
  args.push_back(
      std::make_unique<TaskArgByReference>(arg_id, rpc::Address{}, "call_site"));
  auto task1 = CreateTask({{kCPU_ResourceLabel, 3}}, "f", args);
  auto task2 = CreateTask({{kCPU_ResourceLabel, 3}}, "f2", args);

  // Each queued task triggers one dependency pull request; return distinct
  // pull-request ids for the two tasks.
  EXPECT_CALL(object_manager_, Pull(_, _, _))
      .WillOnce(::testing::Return(1))
      .WillOnce(::testing::Return(2));

  // Submit the tasks to the local task manager.
  int num_callbacks_called = 0;
  auto callback = [&num_callbacks_called]() { ++num_callbacks_called; };
  rpc::RequestWorkerLeaseReply reply1;
  local_task_manager_->QueueAndScheduleTask(std::make_shared<internal::Work>(
      task1, false, false, &reply1, callback, internal::WorkStatus::WAITING));
  rpc::RequestWorkerLeaseReply reply2;
  local_task_manager_->QueueAndScheduleTask(std::make_shared<internal::Work>(
      task2, false, false, &reply2, callback, internal::WorkStatus::WAITING));

  // Node no longer has cpu.
  scheduler_->GetLocalResourceManager().DeleteLocalResource(
      scheduling::ResourceID::CPU());

  // Simulate arg becoming local. This moves both tasks from the waiting
  // queue to the dispatch queue, where they are now infeasible.
  local_task_manager_->TasksUnblocked(
      {task1.GetTaskSpecification().TaskId(), task2.GetTaskSpecification().TaskId()});

  // Assert that the correct rpc replies were sent back and the dispatch map is empty.
  ASSERT_EQ(reply1.failure_type(),
            rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_UNSCHEDULABLE);
  ASSERT_EQ(reply2.failure_type(),
            rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_UNSCHEDULABLE);
  ASSERT_EQ(num_callbacks_called, 2);
  ASSERT_EQ(local_task_manager_->GetTaskToDispatch().size(), 0);
}

int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
Expand Down