diff --git a/python/xoscar/backends/indigen/tests/test_pool.py b/python/xoscar/backends/indigen/tests/test_pool.py index 96881671..f83509d4 100644 --- a/python/xoscar/backends/indigen/tests/test_pool.py +++ b/python/xoscar/backends/indigen/tests/test_pool.py @@ -542,6 +542,135 @@ async def test_create_actor_pool(): assert len(global_router._mapping) == 0 +@pytest.mark.asyncio +async def test_create_actor_pool_elastic_ip(): + start_method = ( + os.environ.get("POOL_START_METHOD", "forkserver") + if sys.platform != "win32" + else None + ) + addr = f"111.111.111.111:{get_next_port()}" + pool = await create_actor_pool( + addr, + pool_cls=MainActorPool, + n_process=0, + subprocess_start_method=start_method, + extra_conf={"listen_elastic_ip": True}, + ) + async with pool: + # test global router + global_router = Router.get_instance() + # global router should not be the identical one with pool's router + assert global_router is not pool.router + assert pool.external_address in global_router._curr_external_addresses + assert pool.external_address in global_router._mapping + assert pool.external_address == addr + + ctx = get_context() + + # actor on main pool + actor_ref = await ctx.create_actor( + TestActor, uid="test-1", address=pool.external_address + ) + assert await actor_ref.add(3) == 3 + await ctx.destroy_actor(actor_ref) + assert (await ctx.has_actor(actor_ref)) is False + + assert pool.stopped + # after pool shutdown, global router must has been cleaned + global_router = Router.get_instance() + assert len(global_router._curr_external_addresses) == 0 + assert len(global_router._mapping) == 0 + + +@pytest.mark.asyncio +async def test_create_actor_pool_fix_all_zero_ip(): + start_method = ( + os.environ.get("POOL_START_METHOD", "forkserver") + if sys.platform != "win32" + else None + ) + port = get_next_port() + addr = f"0.0.0.0:{port}" + pool = await create_actor_pool( + addr, + pool_cls=MainActorPool, + n_process=0, + subprocess_start_method=start_method, + ) + async with pool: + # test global router + global_router = Router.get_instance() + # global router should not be the identical one with pool's router + assert global_router is not pool.router + assert pool.external_address in global_router._curr_external_addresses + assert pool.external_address in global_router._mapping + assert pool.external_address == addr + + ctx = get_context() + + # actor on main pool + actor_ref = await ctx.create_actor( + TestActor, uid="test-1", address=pool.external_address + ) + assert await actor_ref.add(3) == 3 + connect_addr = f"127.0.0.1:{port}" + actor_ref2 = await create_actor_ref(connect_addr, uid="test-1") + # test fix_all_zero_ip, the result is not 0.0.0.0 + assert actor_ref2.address == connect_addr + assert await actor_ref.add(3) == 6 + + await ctx.destroy_actor(actor_ref) + assert (await ctx.has_actor(actor_ref)) is False + + assert pool.stopped + # after pool shutdown, global router must has been cleaned + global_router = Router.get_instance() + assert len(global_router._curr_external_addresses) == 0 + assert len(global_router._mapping) == 0 + + +@pytest.mark.asyncio +async def test_create_actor_pool_ipv6(): + start_method = ( + os.environ.get("POOL_START_METHOD", "forkserver") + if sys.platform != "win32" + else None + ) + port = get_next_port() + addr = f":::{port}" + pool = await create_actor_pool( + addr, + pool_cls=MainActorPool, + n_process=0, + subprocess_start_method=start_method, + ) + async with pool: + # test global router + global_router = Router.get_instance() + # global router should not be the identical one with pool's router + assert global_router is not pool.router + assert pool.external_address in global_router._curr_external_addresses + assert pool.external_address in global_router._mapping + assert pool.external_address == addr + + ctx = get_context() + + # actor on main pool + actor_ref = await ctx.create_actor( + TestActor, uid="test-1", address=pool.external_address + ) + assert await actor_ref.add(3) == 3 + await ctx.destroy_actor(actor_ref) + assert (await ctx.has_actor(actor_ref)) is False + + assert pool.stopped + # after pool shutdown, global router must has been cleaned + global_router = Router.get_instance() + assert len(global_router._curr_external_addresses) == 0 + assert len(global_router._mapping) == 0 + + @pytest.mark.asyncio async def test_errors(): with pytest.raises(ValueError): @@ -1010,6 +1139,37 @@ async def test_ucx(enable_internal_addr: bool): assert await ref1.foo(ref2, 3) == 6 +@require_ucx +@pytest.mark.asyncio +async def test_ucx_elastic_ip(): + start_method = ( + os.environ.get("POOL_START_METHOD", "forkserver") + if sys.platform != "win32" + else None + ) + port = get_next_port() + pool = await create_actor_pool( # type: ignore + f"111.111.111.111:{get_next_port()}", + pool_cls=MainActorPool, + n_process=0, + subprocess_start_method=start_method, + external_address_schemes=["ucx"], + extra_conf={"listen_elastic_ip": True}, + ) + + async with pool: + ctx = get_context() + ref1 = await ctx.create_actor( + TestUCXActor, + 0, + address=pool.external_address, + allocate_strategy=ProcessIndex(0), + ) + assert await ref1.add(1) == 1 + ref2 = await create_actor_ref(f"ucx://127.0.0.1:{port}", uid="") + assert await ref2.add(1) == 2 + + @pytest.mark.asyncio async def test_append_sub_pool(): start_method = (