Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -480,3 +480,27 @@ SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

--------------------------------------------------------------------------------

IPv6 support code in python/ray/_private/net.py provided by

Copyright 2024 Sam Gleske https://github.com/samrocketman

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the “Software”), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
3 changes: 2 additions & 1 deletion doc/source/ray-core/examples/lm/ray_train.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from fairseq_cli.train import main

import ray
from ray._private import net

_original_save_checkpoint = fairseq.checkpoint_utils.save_checkpoint

Expand Down Expand Up @@ -78,7 +79,7 @@ def get_node_ip(self):

def find_free_port(self):
"""Finds a free port on the current node."""
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
with closing(net._get_socket_dualstack_fallback_single_stack_laddr()) as s:
s.bind(("", 0))
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
return s.getsockname()[1]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import com.google.common.base.Preconditions;
import io.ray.runtime.config.RayConfig;
import java.net.UnknownHostException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Options to create GCS Client. */
public class GcsClientOptions {
Expand All @@ -10,8 +13,30 @@ public class GcsClientOptions {
public String username;
public String password;

/**
 * Splits an address into its host and port parts.
 *
 * <p>Written as a function in case code reuse is needed later. Similar to
 * python/ray/_private/net.py: accepts {@code ip:port} or url-ish
 * ({@code *://ip:port/*}) input where the host may be a DNS name, an IPv4
 * address, or an IPv6 address (optionally wrapped in square brackets).
 *
 * @param address the address to parse
 * @return a two element array of {host, port}; brackets around an IPv6 host are removed and the
 *     port is returned as the matched digit string
 * @throws UnknownHostException if the address is not ip:port or url-ish
 */
private static String[] getIpAndPortForAddress(String address) throws UnknownHostException {
  // The optional leading "*/" prefix is matched lazily and the host may not
  // contain '/', so the host:port pair is taken from the first path segment
  // that has one. A greedy prefix would let a colon inside the trailing path
  // (e.g. "http://host:80/a:1") be mistaken for the host/port separator.
  Pattern pattern =
      Pattern.compile("^(?:.*?/)??\\[?([^/]*[^:/\\]])\\]?:([0-9]+)(?:/.*)?$");
  Matcher matcher = pattern.matcher(address);
  if (!matcher.matches()) {
    throw new UnknownHostException("Must be ip:port or url-ish (*://ip:port/*)");
  }
  return new String[] {matcher.group(1), matcher.group(2)};
}

public GcsClientOptions(RayConfig rayConfig) {
String[] ipAndPort = rayConfig.getBootstrapAddress().split(":");
String[] ipAndPort;
try {
ipAndPort = getIpAndPortForAddress(rayConfig.getBootstrapAddress());
} catch(UnknownHostException ignored) {
// fall back to old behavior
ipAndPort = rayConfig.getBootstrapAddress().split(":");
}
Preconditions.checkArgument(ipAndPort.length == 2, "Invalid bootstrap address.");
ip = ipAndPort[0];
port = Integer.parseInt(ipAndPort[1]);
Expand Down
175 changes: 175 additions & 0 deletions python/ray/_private/net.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
#
# This is supposed to centralize all internal ray networking construction so
# that we can more easily support IPv6 along with IPv4.
#
import re
import socket
from contextlib import closing


def _get_addrinfo_from_sock_kind(address, kind, family=0):
    """
    Get a list of valid IP addresses compatible with requested filters.

    Args:
        address: Host name or IP address literal to resolve.
        kind: Socket kind such as socket.SOCK_STREAM or socket.SOCK_DGRAM.
        family: INET address family.  0 is both IPv4 and IPv6.
            socket.AF_INET is IPv4.  socket.AF_INET6 is IPv6.

    Returns:
        A List of tuples with (family, ip) as items, in the order the
        resolver reported them.

    Raises:
        socket.gaierror: If the resolver cannot resolve ``address``.

    Example return value for
    _get_addrinfo_from_sock_kind("localhost", socket.SOCK_STREAM):

        [(socket.AF_INET6, "::1"), (socket.AF_INET, "127.0.0.1")]
    """
    # Hand the socket kind to the resolver as a hint instead of relying only
    # on filtering afterwards.
    # NOTE(review): without a hint some platforms (reportedly Windows) report
    # socktype 0 for every entry, which would make the filter below discard
    # all results -- confirm on the platforms that matter.
    resolved = socket.getaddrinfo(address, None, family, type=kind)
    return [
        (entry_family, sockaddr[0])
        for entry_family, entry_kind, _proto, _canonname, sockaddr in resolved
        # kept as a guard so the result never contains another socket kind
        if entry_kind == kind
    ]


def _get_addrinfo_from_sock_kind_ipv4_fallback_ipv6(address, kind):
    """
    Same as _get_addrinfo_from_sock_kind() but favors IPv4 if it is available.

    If IPv4 is not available it will fall back to searching for IPv6.

    Args:
        address: Host name or IP address literal to resolve.
        kind: Socket kind such as socket.SOCK_STREAM or socket.SOCK_DGRAM.

    Returns:
        A List of tuples with (family, ip) as items.  Only IPv4 entries are
        returned when at least one exists; otherwise the result of an
        unrestricted (IPv4 and IPv6) lookup is returned.

    Raises:
        socket.gaierror: If the unrestricted lookup fails as well.
    """
    try:
        ipv4_addresses = _get_addrinfo_from_sock_kind(address, kind, socket.AF_INET)
    except (OSError, UnicodeError):
        # A failed IPv4-only lookup (socket.gaierror is an OSError) is not
        # fatal here: the unrestricted lookup below gets a chance and reports
        # the real error if the host cannot be resolved at all.
        ipv4_addresses = []
    if ipv4_addresses:
        return ipv4_addresses
    # this call expands to both IPv4 and IPv6 addresses for the given host
    return _get_addrinfo_from_sock_kind(address, kind)


def _get_sock_from_host(address, kind):
    """
    Get a socket.socket for a provided inet address.

    Favor IPv4 but expand to IPv6 if IPv4 not available.

    Args:
        address: Host name or IP address literal the socket is meant for.
        kind: Socket kind such as socket.SOCK_STREAM or socket.SOCK_DGRAM.

    Returns:
        A newly created socket.socket whose address family matches the first
        address found for ``address``.  It is the caller's job to close it.

    Raises:
        ValueError: If ``address`` cannot be resolved or resolves to no
            address usable with ``kind``.
    """
    try:
        # each item is a tuple with
        # (socket.AF_INET or socket.AF_INET6, ip_address)
        candidates = _get_addrinfo_from_sock_kind_ipv4_fallback_ipv6(address, kind)
    except socket.gaierror as e:
        # This is an undesirable workaround for some existing tests.
        raise ValueError(
            f"Address invalid or not reachable: {address}\n\n" + repr(e)
        ) from e
    if not candidates:
        # Report "nothing usable" the same way as a failed lookup instead of
        # leaking an IndexError from indexing an empty list.
        raise ValueError(
            f"Address invalid or not reachable: {address}\n\n"
            + f"no addresses found for socket kind {kind!r}"
        )
    # obtain the first valid address for use
    family, _ip_address = candidates[0]
    return socket.socket(family, kind)


def _get_sock_stream_from_host(address):
    """
    Build a socket.SOCK_STREAM socket.socket for ``address``.

    The address family (IPv4 or IPv6) is chosen by _get_sock_from_host(),
    which favors IPv4 and expands to IPv6 if IPv4 is not available.
    """
    return _get_sock_from_host(address, kind=socket.SOCK_STREAM)


def _get_sock_dgram_from_host(address):
    """
    Build a socket.SOCK_DGRAM socket.socket for ``address``.

    The address family (IPv4 or IPv6) is chosen by _get_sock_from_host(),
    which favors IPv4 and expands to IPv6 if IPv4 is not available.
    """
    return _get_sock_from_host(address, kind=socket.SOCK_DGRAM)


def _get_private_ip_addresses():
    """
    Discover the local IP addresses used to reach well-known public hosts.

    A datagram socket is connected to the Google DNS address over IPv6 and
    over IPv4; the local address each socket ends up with is collected.

    Returns:
        A List of local IP address strings.  It may be empty; every failure
        while probing a host is ignored (best effort).
    """
    discovered = []
    for probe_host in ("2001:4860:4860::8888", "8.8.8.8"):
        is_ipv6_probe = ":" in probe_host
        if is_ipv6_probe and not socket.has_ipv6:
            continue
        try:
            with closing(_get_sock_dgram_from_host(probe_host)) as probe:
                # IPv4 may not be available so skip
                if not is_ipv6_probe and probe.family != socket.AF_INET:
                    continue
                probe.connect((probe_host, 80))
                # the local iface IP used to connect against the probe host
                local_ip = probe.getsockname()[0]
                discovered.append(local_ip)
        except Exception:
            # best effort: a host that cannot be probed contributes nothing
            pass
    return discovered


def _get_socket_dualstack_fallback_single_stack_laddr(kind=socket.SOCK_STREAM):
    """
    Create a socket that covers as many address families as the host allows.

    When socket.has_dualstack_ipv6() reports support, an AF_INET6 socket with
    IPV6_V6ONLY switched off is returned so that a single socket can serve
    both IPv4 and IPv6.

    Otherwise the host is single-stack and "localhost" is resolved to find out
    which stack (IPv4 or IPv6) is allowed.

    Args:
        kind: Socket kind, socket.SOCK_STREAM by default.

    Returns:
        A newly created socket.socket.
    """
    if not socket.has_dualstack_ipv6():
        # fall back to supporting the host single network stack.
        # (IPv4 or IPv6 only)
        return _get_sock_from_host("localhost", kind)

    # one socket for all IPv4 and IPv6 interfaces
    dual_stack_sock = socket.socket(socket.AF_INET6, kind)
    dual_stack_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
    return dual_stack_sock


def _parse_ip_port(address):
    """
    Parses a str of ip:port and returns a List with (ip, port).

    Supports DNS names, IPv4 and IPv6 hosts, either as plain ``ip:port`` or
    url-ish (``*://ip:port/*``).  Square brackets around an IPv6 host are
    removed.  Inside the host:port segment the LAST colon separates host from
    port, so an unbracketed IPv6 host such as ``::1:6379`` still works.

    Args:
        address: str (or ASCII bytes) holding the address.

    Returns:
        ``[ip, port]`` with ``port`` as a str of digits, or ``[address]``
        unchanged when ``address`` contains no colon at all.

    Raises:
        ValueError: If ``address`` is neither str nor bytes, or if it
            contains a colon but is not ip:port or url-ish.
    """
    # dashboard passes bytes instead of str so coerce
    if isinstance(address, bytes):
        address = address.decode("ascii")

    if not isinstance(address, str):
        raise ValueError(
            "address type not str or bytes:\n\n"
            + f" type: {repr(type(address))}\n"
            + f" value: {repr(address)}"
        )

    # If no colon is in the address then assume a List with the original value
    # is desired.
    #
    # This is necessary, for example, in
    # ray._private.services.resolve_ip_for_localhost()
    #
    # Note: this does not handle IPv6 well at all.  A bare IPv6 address
    #       without a port (e.g. ::1) contains colons and therefore raises
    #       ValueError below; IPv6 is always required to have ip:port.
    if ":" not in address:
        return [address]

    # One regex both validates and extracts.  The optional "*/" prefix is
    # matched lazily and the host may not contain "/", so host:port is taken
    # from the first path segment that has one.  Validating and then splitting
    # on the last colon of the whole string would mistake a colon inside the
    # trailing path (e.g. "http://host:80/a:1") for the host/port separator.
    parsed = re.match(
        r"^(?:.*?/)??\[?([^/]*[^:/\]])\]?:([0-9]+)(?:/.*)?$", address
    )
    if parsed is None:
        raise ValueError(
            "Malformed address (expected address to "
            + "include IP or DNS host and port "
            + f"e.g. ip:port): {address}"
        )
    ip, port = parsed.groups()
    return [ip, port]
9 changes: 4 additions & 5 deletions python/ray/_private/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import ray
import ray._private.ray_constants as ray_constants
import ray._private.services
from ray._private import storage
from ray._private import net, storage
from ray._raylet import GcsClient, get_session_key_from_storage
from ray._private.resource_spec import ResourceSpec
from ray._private.services import serialize_config, get_address
Expand Down Expand Up @@ -390,7 +390,7 @@ def check_persisted_session_name(self):
@staticmethod
def validate_ip_port(ip_port):
"""Validates the address is in the ip:port format"""
_, _, port = ip_port.rpartition(":")
_, port = net._parse_ip_port(ip_port)
if port == ip_port:
raise ValueError(f"Port is not specified for address {ip_port}")
try:
Expand Down Expand Up @@ -913,8 +913,7 @@ def _get_log_file_name(
def _get_unused_port(self, allocated_ports=None):
if allocated_ports is None:
allocated_ports = set()

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s = net._get_socket_dualstack_fallback_single_stack_laddr()
s.bind(("", 0))
port = s.getsockname()[1]

Expand All @@ -927,7 +926,7 @@ def _get_unused_port(self, allocated_ports=None):
# This port is allocated for other usage already,
# so we shouldn't use it even if it's not in use right now.
continue
new_s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
new_s = net._get_socket_dualstack_fallback_single_stack_laddr()
try:
new_s.bind(("", new_port))
except OSError:
Expand Down
28 changes: 17 additions & 11 deletions python/ray/_private/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import ray._private.ray_constants as ray_constants
from ray._raylet import GcsClient, GcsClientOptions
from ray.core.generated.common_pb2 import Language
from ray._private import net
from ray._private.ray_constants import RAY_NODE_IP_FILENAME

resource = None
Expand Down Expand Up @@ -600,7 +601,7 @@ def extract_ip_port(bootstrap_address: str):
raise ValueError(
f"Malformed address {bootstrap_address}. " f"Expected '<host>:<port>'."
)
ip, _, port = bootstrap_address.rpartition(":")
ip, port = net._parse_ip_port(bootstrap_address)
try:
port = int(port)
except ValueError:
Expand All @@ -627,8 +628,13 @@ def resolve_ip_for_localhost(address: str):
"""
if not address:
raise ValueError(f"Malformed address: {address}")
address_parts = address.split(":")
if address_parts[0] == "127.0.0.1" or address_parts[0] == "localhost":
if address == "::1":
# edge-case of localhost IPv6 loopback not handled by
# net._parse_ip_port()
address_parts = [address]
else:
address_parts = net._parse_ip_port(address)
if address_parts[0] in ["127.0.0.1", "::1", "localhost"]:
# Make sure localhost isn't resolved to the loopback ip
ip_address = get_node_ip_address()
return ":".join([ip_address] + address_parts[1:])
Expand All @@ -646,8 +652,8 @@ def node_ip_address_from_perspective(address: str):
Returns:
The IP address by which the local node can be reached from the address.
"""
ip_address, port = address.split(":")
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
ip_address, port = net._parse_ip_port(address)
s = net._get_sock_dgram_from_host(ip_address)
try:
# This command will raise an exception if there is no internet
# connection.
Expand All @@ -660,7 +666,9 @@ def node_ip_address_from_perspective(address: str):
try:
# try get node ip address from host name
host_name = socket.getfqdn(socket.gethostname())
node_ip_address = socket.gethostbyname(host_name)
node_ip_address = net._get_addrinfo_from_sock_kind_ipv4_fallback_ipv6(
host_name, socket.SOCK_DGRAM
)[0][1]
except Exception:
pass
finally:
Expand Down Expand Up @@ -1228,7 +1236,7 @@ def start_api_server(
port = ray_constants.DEFAULT_DASHBOARD_PORT
else:
port_retries = 0
port_test_socket = socket.socket()
port_test_socket = net._get_sock_stream_from_host(host)
port_test_socket.setsockopt(
socket.SOL_SOCKET,
socket.SO_REUSEADDR,
Expand Down Expand Up @@ -1426,17 +1434,15 @@ def read_log(filename, lines_to_read):
def get_address(redis_address):
parts = redis_address.split("://", 1)
enable_redis_ssl = False
if len(parts) == 1:
redis_ip_address, redis_port = parts[0].rsplit(":", 1)
else:
redis_ip_address, redis_port = net._parse_ip_port(redis_address)
if len(parts) > 1:
# rediss for SSL
if len(parts) != 2 or parts[0] not in ("redis", "rediss"):
raise ValueError(
f"Invalid redis address {redis_address}."
"Expected format is ip:port or redis://ip:port, "
"or rediss://ip:port for SSL."
)
redis_ip_address, redis_port = parts[1].rsplit(":", 1)
if parts[0] == "rediss":
enable_redis_ssl = True
return redis_ip_address, redis_port, enable_redis_ssl
Expand Down
Loading