Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@
from collections import defaultdict
from dataclasses import dataclass
from logging import getLogger
from typing import TYPE_CHECKING, Dict, Optional, Tuple
from typing import TYPE_CHECKING, Dict, Optional

import ray
from .default_autoscaling_coordinator import (
DefaultAutoscalingCoordinator,
)
from ray.data._internal.average_calculator import TimeWindowAverageCalculator
from .resource_utilization_gauge import (
ResourceUtilizationGauge,
RollingLogicalUtilizationGauge,
)
from ray.data._internal.cluster_autoscaler import ClusterAutoscaler
from ray.data._internal.execution.interfaces.execution_options import ExecutionResources

Expand All @@ -25,22 +28,26 @@
class _NodeResourceSpec:

cpu: int
gpu: int
mem: int

def __post_init__(self):
assert isinstance(self.cpu, int)
assert self.cpu >= 0
assert isinstance(self.gpu, int)
assert self.gpu >= 0
assert isinstance(self.mem, int)
assert self.mem >= 0

@classmethod
def of(cls, cpu, mem):
def of(cls, *, cpu=0, gpu=0, mem=0):
cpu = math.floor(cpu)
gpu = math.floor(gpu)
mem = math.floor(mem)
return cls(cpu, mem)
return cls(cpu=cpu, gpu=gpu, mem=mem)

def to_bundle(self):
return {"CPU": self.cpu, "memory": self.mem}
return {"CPU": self.cpu, "GPU": self.gpu, "memory": self.mem}


class DefaultClusterAutoscalerV2(ClusterAutoscaler):
Expand All @@ -58,9 +65,6 @@ class DefaultClusterAutoscalerV2(ClusterAutoscaler):
termination.

Notes:
* For now, we assume GPUs are only used by actor pools. So cluster autoscaling
doesn't need to consider GPU nodes. GPU nodes will scale up as the actor
pools that require GPUs scale up.
* It doesn't consider multiple concurrent Datasets for now, as the cluster
utilization is calculated by "dataset_usage / global_resources".
"""
Expand All @@ -86,25 +90,24 @@ def __init__(
topology: "Topology",
resource_manager: "ResourceManager",
execution_id: str,
resource_utilization_calculator: Optional[ResourceUtilizationGauge] = None,
cluster_scaling_up_util_threshold: float = DEFAULT_CLUSTER_SCALING_UP_UTIL_THRESHOLD, # noqa: E501
cluster_scaling_up_delta: float = DEFAULT_CLUSTER_SCALING_UP_DELTA,
cluster_util_avg_window_s: float = DEFAULT_CLUSTER_UTIL_AVG_WINDOW_S,
cluster_util_check_interval_s: float = DEFAULT_CLUSTER_UTIL_CHECK_INTERVAL_S,
):
if resource_utilization_calculator is None:
assert cluster_util_check_interval_s >= 0, cluster_util_check_interval_s
resource_utilization_calculator = RollingLogicalUtilizationGauge(
resource_manager, cluster_util_avg_window_s=cluster_util_avg_window_s
)

self._resource_utilization_calculator = resource_utilization_calculator
# Threshold of cluster utilization to trigger scaling up.
self._cluster_scaling_up_util_threshold = cluster_scaling_up_util_threshold
assert cluster_scaling_up_delta > 0
self._cluster_scaling_up_delta = cluster_scaling_up_delta
assert cluster_util_avg_window_s > 0
# Calculator to calculate the average of cluster CPU utilization.
self._cluster_cpu_util_calculator = TimeWindowAverageCalculator(
window_s=cluster_util_avg_window_s,
)
# Calculator to calculate the average of cluster memory utilization.
self._cluster_mem_util_calculator = TimeWindowAverageCalculator(
window_s=cluster_util_avg_window_s,
)
assert cluster_util_check_interval_s >= 0
self._cluster_util_check_interval_s = cluster_util_check_interval_s
# Last time when the cluster utilization was checked.
self._last_cluster_util_check_time = 0
Expand All @@ -118,37 +121,26 @@ def __init__(
super().__init__(topology, resource_manager, execution_id)

def _get_node_resource_spec_and_count(self) -> Dict[_NodeResourceSpec, int]:
"""Get the unique node resource specs and their count in the cluster.

Similar to `_get_cluster_cpu_and_mem_util`, we only consider CPU and memory
resources.
"""
# Filter out the head node and GPU nodes.
"""Get the unique node resource specs and their count in the cluster."""
# Filter out the head node.
node_resources = [
node["Resources"]
for node in ray.nodes()
if node["Alive"]
and "node:__internal_head__" not in node["Resources"]
and "GPU" not in node["Resources"]
if node["Alive"] and "node:__internal_head__" not in node["Resources"]
]

nodes_resource_spec_count = defaultdict(int)
for r in node_resources:
node_resource_spec = _NodeResourceSpec.of(r["CPU"], r["memory"])
node_resource_spec = _NodeResourceSpec.of(
cpu=r["CPU"], gpu=r.get("GPU", 0), mem=r["memory"]
)
nodes_resource_spec_count[node_resource_spec] += 1

return nodes_resource_spec_count

def _get_cluster_cpu_and_mem_util(self) -> Tuple[Optional[float], Optional[float]]:
"""Return CPU and memory utilization of the cluster, or None if
no data was reported in the last `cluster_util_avg_window_s` seconds or
`_cluster_util_check_interval_s` seconds have not yet passed since the
last check.

We only consider CPU and memory utilization. Because for now we assume GPUs are
only used by actor pools. GPU node scaling will be handled by
`try_scale_up_or_down_actor_pool`.
"""
def try_trigger_scaling(self):
# Note, should call this method before checking `_last_request_time`,
# in order to update the average cluster utilization.
now = time.time()
if (
now - self._last_cluster_util_check_time
Expand All @@ -157,48 +149,21 @@ def _get_cluster_cpu_and_mem_util(self) -> Tuple[Optional[float], Optional[float
# Update observed resource utilization
self._last_cluster_util_check_time = now

cur_resource_usage = self._resource_manager.get_global_usage()
global_limits = self._resource_manager.get_global_limits()

if global_limits.cpu:
cpu_util = cur_resource_usage.cpu / global_limits.cpu
else:
cpu_util = 0
if global_limits.object_store_memory:
mem_util = (
cur_resource_usage.object_store_memory
/ global_limits.object_store_memory
)
else:
mem_util = 0

self._cluster_cpu_util_calculator.report(cpu_util)
self._cluster_mem_util_calculator.report(mem_util)

avg_cpu_util = self._cluster_cpu_util_calculator.get_average()
avg_mem_util = self._cluster_mem_util_calculator.get_average()

return avg_cpu_util, avg_mem_util

def try_trigger_scaling(self):
# Note, should call this method before checking `_last_request_time`,
# in order to update the average cluster utilization.
cpu_util, mem_util = self._get_cluster_cpu_and_mem_util()
self._resource_utilization_calculator.observe()

# Limit the frequency of autoscaling requests.
now = time.time()
if now - self._last_request_time < self.MIN_GAP_BETWEEN_AUTOSCALING_REQUESTS:
return

cpu_util = cpu_util or 0
mem_util = mem_util or 0
util = self._resource_utilization_calculator.get()
if (
cpu_util < self._cluster_scaling_up_util_threshold
and mem_util < self._cluster_scaling_up_util_threshold
util.cpu < self._cluster_scaling_up_util_threshold
and util.gpu < self._cluster_scaling_up_util_threshold
and util.object_store_memory < self._cluster_scaling_up_util_threshold
):
logger.debug(
"Cluster utilization is below threshold: "
f"CPU={cpu_util:.2f}, memory={mem_util:.2f}."
f"CPU={util.cpu:.2f}, GPU={util.gpu:.2f}, memory={util.object_store_memory:.2f}."
)
# Still send an empty request when upscaling is not needed,
# to renew our registration on AutoscalingCoordinator.
Expand All @@ -211,10 +176,10 @@ def try_trigger_scaling(self):
if logger.isEnabledFor(logging.DEBUG):
debug_msg = (
"Scaling up cluster. Current utilization: "
f"CPU={cpu_util:.2f}, memory={mem_util:.2f}."
f"CPU={util.cpu:.2f}, GPU={util.gpu:.2f}, object_store_memory={util.object_store_memory:.2f}."
" Requesting resources:"
)
# TODO(hchen): We scale up all CPU nodes by the same delta for now.
# TODO(hchen): We scale up all nodes by the same delta for now.
# We may want to distinguish different node types based on their individual
# utilization.
for node_resource_spec, count in node_resource_spec_count.items():
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import abc

from ray.data._internal.average_calculator import TimeWindowAverageCalculator
from ray.data._internal.execution.interfaces import ExecutionResources
from ray.data._internal.execution.resource_manager import ResourceManager

# Alias: cluster utilization is expressed per-resource (cpu / gpu /
# object_store_memory) using the same container type as execution resources.
ClusterUtil = ExecutionResources


class ResourceUtilizationGauge(abc.ABC):
    """Interface for sampling and reporting cluster resource utilization.

    Implementations record utilization snapshots via ``observe`` and expose
    an aggregate view of those snapshots via ``get``.
    """

    @abc.abstractmethod
    def observe(self):
        """Record a snapshot of the current cluster utilization."""

    @abc.abstractmethod
    def get(self) -> ClusterUtil:
        """Return the (possibly aggregated) cluster utilization."""


class RollingLogicalUtilizationGauge(ResourceUtilizationGauge):
    """Tracks cluster utilization as rolling time-window averages.

    Each ``observe`` call computes the instantaneous utilization of CPU, GPU,
    and object-store memory as ``global_usage / global_limits`` and reports it
    to a per-resource ``TimeWindowAverageCalculator``; ``get`` returns the
    windowed averages packed into an ``ExecutionResources``.
    """

    # Default time window in seconds to calculate the average of cluster utilization.
    DEFAULT_CLUSTER_UTIL_AVG_WINDOW_S: int = 10

    def __init__(
        self,
        resource_manager: ResourceManager,
        *,
        cluster_util_avg_window_s: float = DEFAULT_CLUSTER_UTIL_AVG_WINDOW_S,
    ):
        """
        Args:
            resource_manager: Source of the global usage and global limits
                used to compute utilization ratios.
            cluster_util_avg_window_s: Width, in seconds, of the rolling
                window used to average utilization samples.
        """
        self._resource_manager = resource_manager

        self._cluster_cpu_util_calculator = TimeWindowAverageCalculator(
            cluster_util_avg_window_s
        )
        self._cluster_gpu_util_calculator = TimeWindowAverageCalculator(
            cluster_util_avg_window_s
        )
        self._cluster_obj_mem_util_calculator = TimeWindowAverageCalculator(
            cluster_util_avg_window_s
        )

    def observe(self):
        """Report the cluster utilization based on global usage / global limits."""

        # Fixed typo: was `save_div`.
        def safe_div(numerator, denominator):
            # Treat a zero/absent limit as 0% utilization rather than
            # dividing by zero (e.g. a cluster with no GPUs).
            if not denominator:
                return 0
            return numerator / denominator

        global_usage = self._resource_manager.get_global_usage()
        global_limits = self._resource_manager.get_global_limits()

        self._cluster_cpu_util_calculator.report(
            safe_div(global_usage.cpu, global_limits.cpu)
        )
        self._cluster_gpu_util_calculator.report(
            safe_div(global_usage.gpu, global_limits.gpu)
        )
        self._cluster_obj_mem_util_calculator.report(
            safe_div(
                global_usage.object_store_memory, global_limits.object_store_memory
            )
        )

    def get(self) -> ExecutionResources:
        """Get the average cluster utilization based on global usage / global limits."""
        return ExecutionResources(
            cpu=self._cluster_cpu_util_calculator.get_average(),
            gpu=self._cluster_gpu_util_calculator.get_average(),
            object_store_memory=self._cluster_obj_mem_util_calculator.get_average(),
        )
Loading