Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions docs/running-distributed.rst
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,17 @@ to localhost)

Optionally used together with ``--worker`` to set the port number of the master node (defaults to 5557).

``--master-ipv4-only``
----------------------

Optionally used together with ``--worker`` or ``--master`` to force the use of IPv4 (defaults to preferring Ipv6)

.. note::
If a hostname is provided in ``--msater-host`` and this can be resolved via getaddrinfo to an IPv6 address
this will be preferred by default. Some environments might have an IPv6 hosts file entry in place which
doesn't work but is difficult to remove so this allows you to pick.


``--master-bind-host <ip>``
---------------------------

Expand Down
22 changes: 19 additions & 3 deletions locust/argument_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,9 @@ def get_empty_argument_parser(add_help=True, default_config_files=DEFAULT_CONFIG
return parser


def download_locustfile_from_master(master_host: str, master_port: int) -> str:
def download_locustfile_from_master(master_host: str, master_port: int, ipv4_only: bool) -> str:
client_id = socket.gethostname() + "_download_locustfile_" + uuid4().hex
tempclient = zmqrpc.Client(master_host, master_port, client_id)
tempclient = zmqrpc.Client(master_host, master_port, client_id, ipv4_only)
got_reply = False

def ask_for_locustfile():
Expand Down Expand Up @@ -338,6 +338,13 @@ def parse_locustfile_option(args=None) -> tuple[argparse.Namespace, list[str]]:
default=5557,
env_var="LOCUST_MASTER_NODE_PORT",
)
parser.add_argument(
"--master-ipv4-only",
action="store_true",
default=False,
help="Only use IPv4 when connecting to the master node",
env_var="LOCUST_MASTER_NODE_IPV4_ONLY",
)

options, unknown = parser.parse_known_args(args=args)

Expand Down Expand Up @@ -382,7 +389,9 @@ def retrieve_locustfiles_from_master(options) -> list[str]:
)
sys.exit(1)
# having this in argument_parser module is a bit weird, but it needs to be done early
locustfile_sources = download_locustfile_from_master(options.master_host, options.master_port)
locustfile_sources = download_locustfile_from_master(
options.master_host, options.master_port, options.master_ipv4_only
)
return parse_locustfiles_from_master(locustfile_sources)


Expand Down Expand Up @@ -702,6 +711,13 @@ def setup_parser_arguments(parser):
help="Port to connect to on master node. Defaults to 5557.",
env_var="LOCUST_MASTER_NODE_PORT",
)
worker_group.add_argument(
"--master-ipv4-only",
action="store_true",
default=False,
help="Only use IPv4 when connecting to the master node",
env_var="LOCUST_MASTER_NODE_IPV4_ONLY",
)

web_ui_group.add_argument(
"--web-base-path",
Expand Down
9 changes: 7 additions & 2 deletions locust/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,9 @@ def create_local_runner(self) -> LocalRunner:
"""
return self._create_runner(LocalRunner)

def create_master_runner(self, master_bind_host="*", master_bind_port=5557) -> MasterRunner:
def create_master_runner(
self, master_bind_host: str = "*", master_bind_port: int = 5557, master_ipv4_only: bool = False
) -> MasterRunner:
"""
Create a :class:`MasterRunner <locust.runners.MasterRunner>` instance for this Environment

Expand All @@ -147,14 +149,16 @@ def create_master_runner(self, master_bind_host="*", master_bind_port=5557) -> M
MasterRunner,
master_bind_host=master_bind_host,
master_bind_port=master_bind_port,
master_ipv4_only=master_ipv4_only,
)

def create_worker_runner(self, master_host: str, master_port: int) -> WorkerRunner:
def create_worker_runner(self, master_host: str, master_port: int, master_ipv4_only: bool = False) -> WorkerRunner:
"""
Create a :class:`WorkerRunner <locust.runners.WorkerRunner>` instance for this Environment

:param master_host: Host/IP of a running master node
:param master_port: Port on master node to connect to
:param master_ipv4_only: Only use ipv4 when connecting to master
"""
# Create a new RequestStats with use_response_times_cache set to False to save some memory
# and CPU cycles, since the response_times_cache is not needed for Worker nodes
Expand All @@ -163,6 +167,7 @@ def create_worker_runner(self, master_host: str, master_port: int) -> WorkerRunn
WorkerRunner,
master_host=master_host,
master_port=master_port,
master_ipv4_only=master_ipv4_only,
)

def create_web_ui(
Expand Down
5 changes: 4 additions & 1 deletion locust/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,10 +419,13 @@ def kill_workers(children):
runner = environment.create_master_runner(
master_bind_host=options.master_bind_host,
master_bind_port=options.master_bind_port,
master_ipv4_only=options.master_ipv4_only,
)
elif options.worker:
try:
runner = environment.create_worker_runner(options.master_host, options.master_port)
runner = environment.create_worker_runner(
options.master_host, options.master_port, options.master_ipv4_only
)
logger.debug(
"Connected to locust master: %s:%s%s", options.master_host, options.master_port, options.web_base_path
)
Expand Down
12 changes: 7 additions & 5 deletions locust/rpc/zmqrpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ def recv_from_client(self):
def close(self, linger=None):
self.socket.close(linger=linger)

def ipv4_only(self, host, port) -> bool:
def ipv4_only(self, host, port, force=False) -> bool:
if force:
return True
try:
if host == "*":
return False
Expand All @@ -75,8 +77,8 @@ def ipv4_only(self, host, port) -> bool:


class Server(BaseSocket):
def __init__(self, host, port):
BaseSocket.__init__(self, zmq.ROUTER, self.ipv4_only(host, port))
def __init__(self, host, port, ipv4_only):
BaseSocket.__init__(self, zmq.ROUTER, self.ipv4_only(host, port, ipv4_only))
if port == 0:
self.port = self.socket.bind_to_random_port(f"tcp://{host}")
else:
Expand All @@ -88,7 +90,7 @@ def __init__(self, host, port):


class Client(BaseSocket):
def __init__(self, host, port, identity):
BaseSocket.__init__(self, zmq.DEALER, self.ipv4_only(host, port))
def __init__(self, host, port, identity, ipv4_only):
BaseSocket.__init__(self, zmq.DEALER, self.ipv4_only(host, port, ipv4_only))
self.socket.setsockopt(zmq.IDENTITY, identity.encode())
self.socket.connect("tcp://%s:%i" % (host, port))
14 changes: 8 additions & 6 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ class MasterRunner(DistributedRunner):
:class:`WorkerRunners <WorkerRunner>` will aggregated.
"""

def __init__(self, environment, master_bind_host, master_bind_port) -> None:
def __init__(self, environment, master_bind_host, master_bind_port, master_ipv4_only) -> None:
"""
:param environment: Environment instance
:param master_bind_host: Host/interface to use for incoming worker connections
Expand All @@ -651,14 +651,15 @@ def __init__(self, environment, master_bind_host, master_bind_port) -> None:
self.worker_cpu_warning_emitted = False
self.master_bind_host = master_bind_host
self.master_bind_port = master_bind_port
self.master_ipv4_only = master_ipv4_only
self.spawn_rate: float = 0.0
self.spawning_completed = False
self.worker_indexes: dict[str, int] = {}
self.worker_index_max = 0

self.clients = WorkerNodes()
try:
self.server = rpc.Server(master_bind_host, master_bind_port)
self.server = rpc.Server(master_bind_host, master_bind_port, master_ipv4_only)
except RPCError as e:
if e.args[0] == "Socket bind failure: Address already in use":
port_string = (
Expand Down Expand Up @@ -951,7 +952,7 @@ def reset_connection(self) -> None:
logger.info("Resetting RPC server and all worker connections.")
try:
self.server.close(linger=0)
self.server = rpc.Server(self.master_bind_host, self.master_bind_port)
self.server = rpc.Server(self.master_bind_host, self.master_bind_port, self.master_ipv4_only)
self.connection_broken = False
except RPCError as e:
logger.error(f"Temporary failure when resetting connection: {e}, will retry later.")
Expand Down Expand Up @@ -1201,7 +1202,7 @@ class WorkerRunner(DistributedRunner):
# the worker index is set on ACK, if master provided it (masters <= 2.10.2 do not provide it)
worker_index = -1

def __init__(self, environment: Environment, master_host: str, master_port: int) -> None:
def __init__(self, environment: Environment, master_host: str, master_port: int, master_ipv4_only: bool) -> None:
"""
:param environment: Environment instance
:param master_host: Host/IP to use for connection to the master
Expand All @@ -1216,11 +1217,12 @@ def __init__(self, environment: Environment, master_host: str, master_port: int)
self.client_id = socket.gethostname() + "_" + uuid4().hex
self.master_host = master_host
self.master_port = master_port
self.master_ipv4_only = master_ipv4_only
self.web_base_path = environment.parsed_options.web_base_path if environment.parsed_options else ""
self.logs: list[str] = []
self.worker_cpu_warning_emitted = False
self._users_dispatcher: UsersDispatcher | None = None
self.client = rpc.Client(master_host, master_port, self.client_id)
self.client = rpc.Client(master_host, master_port, self.client_id, self.master_ipv4_only)
self.greenlet.spawn(self.worker).link_exception(greenlet_exception_handler)
self.connect_to_master()
self.greenlet.spawn(self.heartbeat).link_exception(greenlet_exception_handler)
Expand Down Expand Up @@ -1324,7 +1326,7 @@ def reset_connection(self) -> None:
logger.info("Reset connection to master")
try:
self.client.close()
self.client = rpc.Client(self.master_host, self.master_port, self.client_id)
self.client = rpc.Client(self.master_host, self.master_port, self.client_id, self.master_ipv4_only)
except RPCError as e:
logger.error(f"Temporary failure when resetting connection: {e}, will retry later.")

Expand Down
2 changes: 1 addition & 1 deletion locust/test/test_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -3436,7 +3436,7 @@ def get_runner(self, client, environment=None, user_classes=None, auto_connect=T
environment = self.environment
user_classes = user_classes or []
environment.user_classes = user_classes
return WorkerRunner(environment, master_host="localhost", master_port=5557)
return WorkerRunner(environment, master_host="localhost", master_port=5557, master_ipv4_only=False)

def test_worker_stop_timeout(self):
class MyTestUser(User):
Expand Down
10 changes: 5 additions & 5 deletions locust/test/test_zmqrpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
class ZMQRPC_tests(LocustTestCase):
def setUp(self):
super().setUp()
self.server = zmqrpc.Server("*", 0)
self.client = zmqrpc.Client("localhost", self.server.port, "identity")
self.server = zmqrpc.Server("*", 0, False)
self.client = zmqrpc.Client("localhost", self.server.port, "identity", False)

def tearDown(self):
self.server.close()
Expand Down Expand Up @@ -44,15 +44,15 @@ def test_client_recv(self):
self.assertEqual(msg.node_id, "identity")

def test_client_retry(self):
server = zmqrpc.Server("*", 0)
server = zmqrpc.Server("*", 0, False)
server.socket.close()
with self.assertRaises(RPCError):
server.recv_from_client()

def test_rpc_error(self):
server = zmqrpc.Server("*", 0)
server = zmqrpc.Server("*", 0, False)
with self.assertRaises(RPCError):
server = zmqrpc.Server("*", server.port)
server = zmqrpc.Server("*", server.port, False)
server.close()
with self.assertRaises(RPCSendError):
server.send_to_client(Message("test", "message", "identity"))
Loading