From 127fab2473fd3955f549932252689d2a3bf91f48 Mon Sep 17 00:00:00 2001 From: plan Date: Mon, 15 Jul 2024 02:11:38 +0800 Subject: [PATCH] create_actor_pool: Add extra_conf option listen_elastic_ip Usage: create_actor_pool(elastic_address, n_process=0, extra_conf={'listen_elastic_ip': True}, ) When an xinference worker serves on a cloud elastic IP, the addresses used in create_actor_pool() and create_actor() both have to be the elastic IP so that the ActorRef can be passed around to clients by RPC methods, but we can only listen on 0.0.0.0 (or :: for IPv6), because the elastic IP is valid only outside the host, not on it. --- python/xoscar/backends/communication/socket.py | 17 +++++++++++++++-- python/xoscar/backends/communication/ucx.py | 13 +++++++++++-- python/xoscar/utils.py | 5 +++++ 3 files changed, 31 insertions(+), 4 deletions(-) diff --git a/python/xoscar/backends/communication/socket.py b/python/xoscar/backends/communication/socket.py index 4fa7deba..92c18601 100644 --- a/python/xoscar/backends/communication/socket.py +++ b/python/xoscar/backends/communication/socket.py @@ -30,7 +30,7 @@ from ..._utils import to_binary from ...constants import XOSCAR_UNIX_SOCKET_DIR from ...serialization import AioDeserializer, AioSerializer, deserialize -from ...utils import classproperty, implements +from ...utils import classproperty, implements, is_v6_ip from .base import Channel, ChannelType, Client, Server from .core import register_client, register_server from .utils import read_buffers, write_buffers @@ -201,6 +201,10 @@ def client_type(self) -> Type["Client"]: def channel_type(self) -> int: return ChannelType.remote + @classmethod + def parse_config(cls, config: dict) -> dict: + return config + @staticmethod @implements(Server.create) async def create(config: Dict) -> "Server": @@ -212,6 +216,15 @@ async def create(config: Dict) -> "Server": else: host = config.pop("host") port = int(config.pop("port")) + # The Actor.address is not on our host, cannot actually listen on it. 
+ # But we have to keep it for announcement to clients. + _host = host + if config.pop("listen_elastic_ip", False): + if is_v6_ip(host): + _host = "::" + else: + _host = "0.0.0.0" + handle_channel = config.pop("handle_channel") if "start_serving" not in config: config["start_serving"] = False @@ -224,7 +237,7 @@ async def handle_connection(reader: StreamReader, writer: StreamWriter): port = port if port != 0 else None aio_server = await asyncio.start_server( - handle_connection, host=host, port=port, **config + handle_connection, host=_host, port=port, **config ) # get port of the socket if not specified diff --git a/python/xoscar/backends/communication/ucx.py b/python/xoscar/backends/communication/ucx.py index 360683d6..fa36d21c 100644 --- a/python/xoscar/backends/communication/ucx.py +++ b/python/xoscar/backends/communication/ucx.py @@ -28,7 +28,7 @@ from ...nvutils import get_cuda_context, get_index_and_uuid from ...serialization import deserialize from ...serialization.aio import BUFFER_SIZES_NAME, AioSerializer, get_header_length -from ...utils import classproperty, implements, is_cuda_buffer, lazy_import +from ...utils import classproperty, implements, is_cuda_buffer, is_v6_ip, lazy_import from ..message import _MessageBase from .base import Channel, ChannelType, Client, Server from .core import register_client, register_server @@ -406,6 +406,15 @@ async def create(config: Dict) -> "Server": else: host = config.pop("host") port = int(config.pop("port")) + # The Actor.address is not on our host, cannot actually listen on it. 
+ # But we have to keep it for announcement to clients. + _host = host + if config.pop("listen_elastic_ip", False): + if is_v6_ip(host): + _host = "::" + else: + _host = "0.0.0.0" + handle_channel = config.pop("handle_channel") # init @@ -414,7 +423,7 @@ async def create(config: Dict) -> "Server": async def serve_forever(client_ucp_endpoint: "ucp.Endpoint"): # type: ignore try: await server.on_connected( - client_ucp_endpoint, local_address=server.address + client_ucp_endpoint, local_address="%s:%d" % (_host, port) ) except ChannelClosed: # pragma: no cover logger.exception("Connection closed before handshake completed") diff --git a/python/xoscar/utils.py b/python/xoscar/utils.py index 9438b873..643c065d 100644 --- a/python/xoscar/utils.py +++ b/python/xoscar/utils.py @@ -480,6 +480,11 @@ def is_v6_zero_ip(ip_port_addr: str) -> bool: return True +def is_v6_ip(ip_port_addr: str) -> bool: + arr = ip_port_addr.split("://", 1)[-1].split(":") + return len(arr) > 1 + + def fix_all_zero_ip(remote_addr: str, connect_addr: str) -> str: """ Use connect_addr to fix ActorRef.address return by remote server.