Merged
2 changes: 2 additions & 0 deletions doc/source/cluster/kubernetes/k8s-ecosystem.md
@@ -13,6 +13,7 @@ k8s-ecosystem/volcano
k8s-ecosystem/yunikorn
k8s-ecosystem/kueue
k8s-ecosystem/istio
k8s-ecosystem/scheduler-plugins
```

* {ref}`kuberay-ingress`
@@ -23,3 +24,4 @@ k8s-ecosystem/istio
* {ref}`kuberay-yunikorn`
* {ref}`kuberay-kueue`
* {ref}`kuberay-istio`
* {ref}`kuberay-scheduler-plugins`
60 changes: 60 additions & 0 deletions doc/source/cluster/kubernetes/k8s-ecosystem/scheduler-plugins.md
@@ -0,0 +1,60 @@
(kuberay-scheduler-plugins)=
# KubeRay integration with scheduler plugins

The [kubernetes-sigs/scheduler-plugins](https://github.com/kubernetes-sigs/scheduler-plugins) repository provides out-of-tree scheduler plugins based on the Kubernetes scheduler framework.

Starting with KubeRay v1.4.0, KubeRay integrates with the [PodGroup API](https://github.com/kubernetes-sigs/scheduler-plugins/blob/93126eabdf526010bf697d5963d849eab7e8e898/site/content/en/docs/plugins/coscheduling.md) provided by scheduler plugins to support gang scheduling for RayCluster custom resources.

## Step 1: Create a Kubernetes cluster with Kind

```sh
kind create cluster --image=kindest/node:v1.26.0
```

## Step 2: Install scheduler plugins

Follow the [installation guide](https://scheduler-plugins.sigs.k8s.io/docs/user-guide/installation/) in the scheduler-plugins repository to install the scheduler plugins.

:::{note}

There are two modes for installing the scheduler plugins: *single scheduler mode* and *second scheduler mode*.

KubeRay v1.4.0 only supports the *single scheduler mode*.
You need access to configure the Kubernetes control plane so that you can replace the default scheduler with the scheduler plugins.

:::

## Step 3: Install KubeRay operator with scheduler plugins enabled

KubeRay v1.4.0 and later versions support scheduler plugins.

```sh
helm install kuberay-operator kuberay/kuberay-operator --version 1.4.0 --set batchScheduler.name=scheduler-plugins
```

## Step 4: Deploy a RayCluster with gang scheduling

```sh
# Configure the RayCluster with label `ray.io/gang-scheduling-enabled: "true"`
# to enable gang scheduling.
kubectl apply -f https://raw.githubusercontent.com/ray-project/kuberay/release-1.4/ray-operator/config/samples/ray-cluster.scheduler-plugins.yaml
```
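
For reference, a RayCluster opts in to gang scheduling through a single metadata label. The following is a minimal sketch, not a complete manifest; only the label comes from the sample above, and the rest of the spec is elided:

```yaml
apiVersion: ray.io/v1
kind: RayCluster
metadata:
  name: test-podgroup-0
  labels:
    # Opting in to gang scheduling: KubeRay creates a PodGroup that covers
    # the head Pod and all worker Pods of this RayCluster.
    ray.io/gang-scheduling-enabled: "true"
spec:
  # headGroupSpec and workerGroupSpecs omitted; see the full sample manifest.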

## Step 5: Verify Ray Pods and PodGroup

Note that if you use the *second scheduler mode*, which KubeRay currently doesn't support, the following commands still show similar results; however, the Ray Pods aren't scheduled as a gang.
Make sure to use the *single scheduler mode* to enable gang scheduling.

```sh
kubectl get podgroups.scheduling.x-k8s.io
# NAME PHASE MINMEMBER RUNNING SUCCEEDED FAILED AGE
# test-podgroup-0 Running 3 3 2m25s

# All Ray Pods (1 head and 2 workers) belong to the same PodGroup.
kubectl get pods -L scheduling.x-k8s.io/pod-group
# NAME READY STATUS RESTARTS AGE POD-GROUP
# test-podgroup-0-head 1/1 Running 0 3m30s test-podgroup-0
# test-podgroup-0-worker-worker-4vc6j 1/1 Running 0 3m30s test-podgroup-0
# test-podgroup-0-worker-worker-ntm9f 1/1 Running 0 3m30s test-podgroup-0
```
10 changes: 10 additions & 0 deletions doc/source/serve/production-guide/fault-tolerance.md
@@ -37,6 +37,16 @@ You can also use the deployment options to customize how frequently Serve runs t
:language: python
```

In this example, `check_health` raises an error if the connection to an external database is lost. The Serve controller periodically calls this method on each replica of the deployment. If the method raises an exception for a replica, Serve marks that replica as unhealthy and restarts it. Health checks are configured and performed on a per-replica basis.

:::{note}
You shouldn't call ``check_health`` directly through a deployment handle (e.g., ``await deployment_handle.check_health.remote()``). This would invoke the health check on a single, arbitrary replica. The ``check_health`` method is designed as an interface for the Serve controller, not for direct user calls.
:::

:::{note}
In a composable deployment graph, each deployment is responsible for its own health, independent of the other deployments it's bound to. For example, in an application defined by ``app = ParentDeployment.bind(ChildDeployment.bind())``, ``ParentDeployment`` doesn't restart if ``ChildDeployment`` replicas fail their health checks. When the ``ChildDeployment`` replicas recover, the handle in ``ParentDeployment`` updates automatically to route requests to the healthy replicas.
:::
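
The per-replica contract can be sketched in plain Python with no Ray dependency. ``Replica`` and ``controller_probe`` below are illustrative stand-ins, not Serve internals; the point is that any exception raised from ``check_health`` is what marks a replica unhealthy:

```python
class Replica:
    """Stand-in for a Serve replica that owns an external DB connection."""

    def __init__(self):
        self.db_connected = True  # pretend the connection starts healthy

    def check_health(self):
        # The Serve controller calls this periodically; raising any
        # exception marks the replica unhealthy and triggers a restart.
        if not self.db_connected:
            raise RuntimeError("lost connection to the database")


def controller_probe(replica) -> str:
    """Simplified version of the controller-side health probe."""
    try:
        replica.check_health()
        return "HEALTHY"
    except Exception:
        return "UNHEALTHY"


replica = Replica()
print(controller_probe(replica))  # HEALTHY
replica.db_connected = False
print(controller_probe(replica))  # UNHEALTHY
```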

### Worker node recovery

:::{admonition} KubeRay Required
@@ -37,6 +37,7 @@
},
"outputs": [],
"source": [
"%%bash\n",
"pip install torch torchvision"
]
},
@@ -16,7 +16,8 @@ In this step you train a PyTorch VisionTransformer model to recognize objects us
First, install and import the required Python modules.


-```python
+```bash
+%%bash
pip install torch torchvision
```

40 changes: 36 additions & 4 deletions python/ray/_private/telemetry/open_telemetry_metric_recorder.py
@@ -57,14 +57,46 @@ def callback(options):
callbacks=[callback],
)
self._registered_instruments[name] = instrument
self._observations_by_name[name] = {}

def register_counter_metric(self, name: str, description: str) -> None:
"""
Register a counter metric with the given name and description.
"""
with self._lock:
if name in self._registered_instruments:
# Counter with the same name is already registered.
return

instrument = self.meter.create_counter(
name=f"{NAMESPACE}_{name}",
description=description,
unit="1",
)
self._registered_instruments[name] = instrument

def set_metric_value(self, name: str, tags: dict, value: float):
"""
-Set the value of a metric with the given name and tags.
-This will create a gauge if it does not exist.
+Set the value of a metric with the given name and tags. If the metric is not
+registered, the value is recorded lazily for observable metrics; for
+synchronous metrics, the call is a no-op.
"""
with self._lock:
-self._observations_by_name[name][frozenset(tags.items())] = value
+if self._observations_by_name.get(name) is not None:
+# Observable metric: record the value lazily by storing it in a
+# dictionary until OpenTelemetry actually exports it.
+self._observations_by_name[name][frozenset(tags.items())] = value
+else:
+# Synchronous metric: apply the value immediately. This is a
+# no-op if the metric is not registered.
+instrument = self._registered_instruments.get(name)
+if isinstance(instrument, metrics.Counter):
+instrument.add(value, attributes=tags)
+else:
+logger.warning(
+f"Unsupported synchronous instrument type for metric: {name}."
+)

def record_and_export(self, records: List[Record], global_tags=None):
"""
@@ -84,7 +116,7 @@ def record_and_export(self, records: List[Record], global_tags=None):
f"Failed to record metric {gauge.name} with value {value} with tags {tags!r} and global tags {global_tags!r} due to: {e!r}"
)

-def _get_metric_value(self, name: str, tags: dict) -> Optional[float]:
+def _get_observable_metric_value(self, name: str, tags: dict) -> Optional[float]:
"""
Get the value of a metric with the given name and tags. This method is mainly
used for testing purposes.
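
The dispatch that `set_metric_value` performs can be modeled with a small self-contained sketch. This is a simplification with illustrative names; the real recorder stores OpenTelemetry instruments, not plain floats:

```python
class MetricRecorderSketch:
    """Simplified model of the recorder's dispatch in set_metric_value."""

    def __init__(self):
        self.observations_by_name = {}  # observable (gauge) metrics
        self.counters = {}              # synchronous counter totals

    def register_gauge(self, name):
        self.observations_by_name[name] = {}

    def register_counter(self, name):
        self.counters[name] = 0.0

    def set_metric_value(self, name, tags, value):
        if name in self.observations_by_name:
            # Observable: store the latest value; the exporter's callback
            # reads it out later.
            self.observations_by_name[name][frozenset(tags.items())] = value
        elif name in self.counters:
            # Synchronous counter: apply the delta immediately.
            self.counters[name] += value
        # Unregistered synchronous metrics are a no-op (the real code warns).


rec = MetricRecorderSketch()
rec.register_gauge("memory_used")
rec.register_counter("tasks_finished")
rec.set_metric_value("memory_used", {"node": "a"}, 123.0)
rec.set_metric_value("tasks_finished", {}, 2)
rec.set_metric_value("tasks_finished", {}, 3)
print(rec.counters["tasks_finished"])  # 5.0
```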
18 changes: 14 additions & 4 deletions python/ray/dashboard/modules/reporter/reporter_agent.py
@@ -570,10 +570,20 @@ async def Export(
for resource_metrics in request.resource_metrics:
for scope_metrics in resource_metrics.scope_metrics:
for metric in scope_metrics.metrics:
-self._open_telemetry_metric_recorder.register_gauge_metric(
-metric.name, metric.description or ""
-)
-for data_point in metric.gauge.data_points:
+data_points = []
+# gauge metrics
+if metric.WhichOneof("data") == "gauge":
+self._open_telemetry_metric_recorder.register_gauge_metric(
+metric.name, metric.description or ""
+)
+data_points = metric.gauge.data_points
+# counter metrics
+if metric.WhichOneof("data") == "sum" and metric.sum.is_monotonic:
+self._open_telemetry_metric_recorder.register_counter_metric(
+metric.name, metric.description or ""
+)
+data_points = metric.sum.data_points
+for data_point in data_points:
self._open_telemetry_metric_recorder.set_metric_value(
metric.name,
{
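
The oneof-based routing in the `Export` handler above can be illustrated with a protobuf-free stand-in. `FakeMetric.which_oneof` mimics `metric.WhichOneof("data")`; all names here are illustrative:

```python
class FakeMetric:
    """Protobuf-free stand-in for an OTLP metric message."""

    def __init__(self, name, kind, is_monotonic=False):
        self.name = name
        self._kind = kind  # "gauge" or "sum"
        self.is_monotonic = is_monotonic

    def which_oneof(self):
        # Mirrors protobuf's metric.WhichOneof("data").
        return self._kind


def classify(metric) -> str:
    """Route a metric the way the Export handler routes OTLP data."""
    kind = metric.which_oneof()
    if kind == "gauge":
        return "gauge"
    if kind == "sum" and metric.is_monotonic:
        return "counter"
    # Everything else (e.g. non-monotonic sums) is currently ignored.
    return "unsupported"


print(classify(FakeMetric("mem_used", "gauge")))                     # gauge
print(classify(FakeMetric("tasks_done", "sum", is_monotonic=True)))  # counter
print(classify(FakeMetric("inflight", "sum")))                       # unsupported
```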