Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import ray
from .autoscaler import Autoscaler
from .autoscaling_actor_pool import ActorPoolScalingRequest, AutoscalingActorPool
from .util import get_max_scale_up
from ray.data._internal.execution.autoscaling_requester import (
get_or_create_autoscaling_requester_actor,
)
Expand Down Expand Up @@ -97,6 +98,9 @@ def _derive_target_scaling_config(
return ActorPoolScalingRequest.no_op(
reason="operator exceeding resource quota"
)
budget = self._resource_manager.get_budget(op)
if get_max_scale_up(actor_pool, budget) == 0:
return ActorPoolScalingRequest.no_op(reason="exceeded resource limits")

return ActorPoolScalingRequest.upscale(
delta=1,
Expand Down
48 changes: 48 additions & 0 deletions python/ray/data/_internal/execution/autoscaler/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import math
from typing import Optional

from .autoscaling_actor_pool import AutoscalingActorPool
from ray.data._internal.execution.interfaces import ExecutionResources


def get_max_scale_up(
actor_pool: AutoscalingActorPool,
budget: Optional[ExecutionResources],
) -> Optional[int]:
"""Get the maximum number of actors that can be scaled up.

Args:
actor_pool: The actor pool to scale up.
budget: The budget to scale up.

Returns:
The maximum number of actors that can be scaled up, or `None` if you can
scale up infinitely.
"""
if budget is None:
return None

assert budget.cpu >= 0 and budget.gpu >= 0

num_cpus_per_actor = actor_pool.per_actor_resource_usage().cpu
num_gpus_per_actor = actor_pool.per_actor_resource_usage().gpu
assert num_cpus_per_actor >= 0 and num_gpus_per_actor >= 0

max_cpu_scale_up: float = float("inf")
if num_cpus_per_actor > 0 and not math.isinf(budget.cpu):
max_cpu_scale_up = budget.cpu // num_cpus_per_actor

max_gpu_scale_up: float = float("inf")
if num_gpus_per_actor > 0 and not math.isinf(budget.gpu):
max_gpu_scale_up = budget.gpu // num_gpus_per_actor

max_scale_up = min(max_cpu_scale_up, max_gpu_scale_up)
if math.isinf(max_scale_up):
return None
else:
assert not math.isnan(max_scale_up), (
budget,
num_cpus_per_actor,
num_gpus_per_actor,
)
return int(max_scale_up)
14 changes: 13 additions & 1 deletion python/ray/data/tests/test_autoscaler.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from ray.data._internal.execution.operators.base_physical_operator import (
InternalQueueOperatorMixin,
)
from ray.data._internal.execution.resource_manager import ResourceManager
from ray.data._internal.execution.streaming_executor_state import OpState
from ray.data.context import (
AutoscalingConfig,
Expand All @@ -26,9 +27,12 @@ def test_actor_pool_scaling():
"""Test `_actor_pool_should_scale_up` and `_actor_pool_should_scale_down`
in `DefaultAutoscaler`"""

resource_manager = MagicMock(
spec=ResourceManager, get_budget=MagicMock(return_value=None)
)
autoscaler = DefaultAutoscaler(
topology=MagicMock(),
resource_manager=MagicMock(),
resource_manager=resource_manager,
execution_id="execution_id",
config=AutoscalingConfig(
actor_pool_util_upscaling_threshold=1.0,
Expand All @@ -47,6 +51,7 @@ def test_actor_pool_scaling():
num_pending_actors=MagicMock(return_value=0),
num_free_task_slots=MagicMock(return_value=5),
num_tasks_in_flight=MagicMock(return_value=15),
per_actor_resource_usage=MagicMock(return_value=ExecutionResources(cpu=1)),
_max_actor_concurrency=1,
get_pool_util=MagicMock(
# NOTE: Unittest mocking library doesn't support proxying to actual
Expand Down Expand Up @@ -190,6 +195,13 @@ def assert_autoscaling_action(*, delta: int, expected_reason: Optional[str]):
expected_reason="pool exceeding max size",
)

# Should no-op because the op has no budget.
with patch(resource_manager, "get_budget", ExecutionResources.zero()):
assert_autoscaling_action(
delta=0,
expected_reason="exceeded resource limits",
)


def test_cluster_scaling():
"""Test `_try_scale_up_cluster` in `DefaultAutoscaler`"""
Expand Down