Skip to content

Commit d8b44ed

Browse files
committed
Add --listener-pool-klass, --acceptor-pool-klass, --threadless-pool-klass
1 parent f3d19ff commit d8b44ed

File tree

9 files changed

+104
-35
lines changed

9 files changed

+104
-35
lines changed
+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add `--listener-pool-klass`, `--acceptor-pool-klass`, `--threadless-pool-klass`

proxy/common/constants.py

+3
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,8 @@ def _env_threadless_compliant() -> bool:
114114
'{response_bytes} bytes - {connection_time_ms}ms'
115115
DEFAULT_REVERSE_PROXY_ACCESS_LOG_FORMAT = '{client_ip}:{client_port} - ' + \
116116
'{request_method} {request_path} -> {upstream_proxy_pass} - {connection_time_ms}ms'
117+
DEFAULT_LISTENER_POOL_KLASS = 'proxy.core.listener.ListenerPool'
118+
DEFAULT_ACCEPTOR_POOL_KLASS = 'proxy.core.acceptor.AcceptorPool'
117119
DEFAULT_NUM_ACCEPTORS = 0
118120
DEFAULT_NUM_WORKERS = 0
119121
DEFAULT_OPEN_FILE_LIMIT = 1024
@@ -127,6 +129,7 @@ def _env_threadless_compliant() -> bool:
127129
DEFAULT_STATIC_SERVER_DIR = os.path.join(PROXY_PY_DIR, "public")
128130
DEFAULT_MIN_COMPRESSION_LENGTH = 20 # In bytes
129131
DEFAULT_THREADLESS = _env_threadless_compliant()
132+
DEFAULT_THREADLESS_POOL_KLASS = 'proxy.core.work.ThreadlessPool'
130133
DEFAULT_LOCAL_EXECUTOR = True
131134
DEFAULT_TIMEOUT = 10.0
132135
DEFAULT_VERSION = False

proxy/common/flag.py

+21
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,24 @@ def initialize(
138138
if isinstance(work_klass, str) \
139139
else work_klass
140140

141+
# Load acceptor_pool_klass
142+
acceptor_pool_klass = opts.get('acceptor_pool_klass', args.acceptor_pool_klass)
143+
acceptor_pool_klass = Plugins.importer(bytes_(acceptor_pool_klass))[0] \
144+
if isinstance(acceptor_pool_klass, str) \
145+
else acceptor_pool_klass
146+
147+
# Load listener_pool_klass
148+
listener_pool_klass = opts.get('listener_pool_klass', args.listener_pool_klass)
149+
listener_pool_klass = Plugins.importer(bytes_(listener_pool_klass))[0] \
150+
if isinstance(listener_pool_klass, str) \
151+
else listener_pool_klass
152+
153+
# Load threadless_pool_klass
154+
threadless_pool_klass = opts.get('threadless_pool_klass', args.threadless_pool_klass)
155+
threadless_pool_klass = Plugins.importer(bytes_(threadless_pool_klass))[0] \
156+
if isinstance(threadless_pool_klass, str) \
157+
else threadless_pool_klass
158+
141159
# TODO: Plugin flag initialization logic must be moved within plugins.
142160
#
143161
# Generate auth_code required for basic authentication if enabled
@@ -201,6 +219,8 @@ def initialize(
201219
# def option(t: object, key: str, default: Any) -> Any:
202220
# return cast(t, opts.get(key, default))
203221
args.work_klass = work_klass
222+
args.acceptor_pool_klass = acceptor_pool_klass
223+
args.listener_pool_klass = listener_pool_klass
204224
args.plugins = plugins
205225
args.auth_code = cast(
206226
Optional[bytes],
@@ -376,6 +396,7 @@ def initialize(
376396
# evaluates to False.
377397
args.threadless = cast(bool, opts.get('threadless', args.threadless))
378398
args.threadless = is_threadless(args.threadless, args.threaded)
399+
args.threadless_pool_klass = threadless_pool_klass
379400

380401
args.pid_file = cast(
381402
Optional[str], opts.get(

proxy/common/plugins.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ def locate_klass(klass_module_name: str, klass_path: List[str]) -> Union[type, N
111111
klass_container = getattr(klass_container, klass_path_part)
112112
except AttributeError:
113113
return None
114-
if not isinstance(klass_container, type) or not inspect.isclass(klass_container):
114+
if not callable(klass_container):
115115
return None
116116
return klass_container
117117

proxy/core/acceptor/pool.py

+11-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424
from .acceptor import Acceptor
2525
from ..listener import ListenerPool
2626
from ...common.flag import flags
27-
from ...common.constants import DEFAULT_NUM_ACCEPTORS
27+
from ...common.constants import (
28+
DEFAULT_NUM_ACCEPTORS, DEFAULT_ACCEPTOR_POOL_KLASS,
29+
)
2830

2931

3032
if TYPE_CHECKING: # pragma: no cover
@@ -33,6 +35,14 @@
3335
logger = logging.getLogger(__name__)
3436

3537

38+
flags.add_argument(
39+
'--acceptor-pool-klass',
40+
type=str,
41+
default=DEFAULT_ACCEPTOR_POOL_KLASS,
42+
help='Default: ' + DEFAULT_ACCEPTOR_POOL_KLASS +
43+
'. Acceptor pool klass.',
44+
)
45+
3646
flags.add_argument(
3747
'--num-acceptors',
3848
type=int,

proxy/core/listener/pool.py

+11
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,23 @@
1313

1414
from .tcp import TcpSocketListener
1515
from .unix import UnixSocketListener
16+
from ...common.flag import flags
17+
from ...common.constants import DEFAULT_LISTENER_POOL_KLASS
1618

1719

1820
if TYPE_CHECKING: # pragma: no cover
1921
from .base import BaseListener
2022

2123

24+
flags.add_argument(
25+
'--listener-pool-klass',
26+
type=str,
27+
default=DEFAULT_LISTENER_POOL_KLASS,
28+
help='Default: ' + DEFAULT_LISTENER_POOL_KLASS +
29+
'. Listener pool klass.',
30+
)
31+
32+
2233
class ListenerPool:
2334
"""Provides abstraction around starting multiple listeners
2435
based upon flags."""

proxy/core/work/pool.py

+11-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
from multiprocessing import connection
1616

1717
from ...common.flag import flags
18-
from ...common.constants import DEFAULT_THREADLESS, DEFAULT_NUM_WORKERS
18+
from ...common.constants import (
19+
DEFAULT_THREADLESS, DEFAULT_NUM_WORKERS, DEFAULT_THREADLESS_POOL_KLASS,
20+
)
1921

2022

2123
if TYPE_CHECKING: # pragma: no cover
@@ -54,6 +56,14 @@
5456
help='Defaults to number of CPU cores.',
5557
)
5658

59+
flags.add_argument(
60+
'--threadless-pool-klass',
61+
type=str,
62+
default=DEFAULT_THREADLESS_POOL_KLASS,
63+
help='Default: ' + DEFAULT_THREADLESS_POOL_KLASS +
64+
'. Threadless pool klass.',
65+
)
66+
5767

5868
class ThreadlessPool:
5969
"""Manages lifecycle of threadless pool and delegates work to them

proxy/proxy.py

+21-12
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,10 @@ def setup(self) -> None:
199199
self._write_pid_file()
200200
# We setup listeners first because of flags.port override
201201
# in case of ephemeral port being used
202-
self.listeners = ListenerPool(flags=self.flags)
202+
self.listeners = cast(
203+
'ListenerPool',
204+
self.flags.listener_pool_klass(flags=self.flags),
205+
)
203206
self.listeners.setup()
204207
# Override flags.port to match the actual port
205208
# we are listening upon. This is necessary to preserve
@@ -234,20 +237,26 @@ def setup(self) -> None:
234237
# Setup remote executors only if
235238
# --local-executor mode isn't enabled.
236239
if self.remote_executors_enabled:
237-
self.executors = ThreadlessPool(
238-
flags=self.flags,
239-
event_queue=event_queue,
240-
executor_klass=RemoteFdExecutor,
240+
self.executors = cast(
241+
'ThreadlessPool',
242+
self.flags.threadless_pool_klass(
243+
flags=self.flags,
244+
event_queue=event_queue,
245+
executor_klass=RemoteFdExecutor,
246+
),
241247
)
242248
self.executors.setup()
243249
# Setup acceptors
244-
self.acceptors = AcceptorPool(
245-
flags=self.flags,
246-
listeners=self.listeners,
247-
executor_queues=self.executors.work_queues if self.executors else [],
248-
executor_pids=self.executors.work_pids if self.executors else [],
249-
executor_locks=self.executors.work_locks if self.executors else [],
250-
event_queue=event_queue,
250+
self.acceptors = cast(
251+
'AcceptorPool',
252+
self.flags.acceptor_pool_klass(
253+
flags=self.flags,
254+
listeners=self.listeners,
255+
executor_queues=self.executors.work_queues if self.executors else [],
256+
executor_pids=self.executors.work_pids if self.executors else [],
257+
executor_locks=self.executors.work_locks if self.executors else [],
258+
event_queue=event_queue,
259+
),
251260
)
252261
self.acceptors.setup()
253262
# Start SSH tunnel acceptor if enabled

tests/test_main.py

+24-20
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,11 @@
3232
DEFAULT_ENABLE_DASHBOARD, PLUGIN_DEVTOOLS_PROTOCOL,
3333
DEFAULT_ENABLE_SSH_TUNNEL, DEFAULT_ENABLE_WEB_SERVER,
3434
DEFAULT_DISABLE_HTTP_PROXY, PLUGIN_WEBSOCKET_TRANSPORT,
35-
DEFAULT_CA_SIGNING_KEY_FILE, DEFAULT_CLIENT_RECVBUF_SIZE,
35+
DEFAULT_ACCEPTOR_POOL_KLASS, DEFAULT_CA_SIGNING_KEY_FILE,
36+
DEFAULT_CLIENT_RECVBUF_SIZE, DEFAULT_LISTENER_POOL_KLASS,
3637
DEFAULT_SERVER_RECVBUF_SIZE, DEFAULT_CACHE_DIRECTORY_PATH,
3738
DEFAULT_ENABLE_REVERSE_PROXY, DEFAULT_ENABLE_STATIC_SERVER,
38-
_env_threadless_compliant,
39+
DEFAULT_THREADLESS_POOL_KLASS, _env_threadless_compliant,
3940
)
4041

4142

@@ -58,6 +59,8 @@ def mock_default_args(mock_args: mock.Mock) -> None:
5859
mock_args.basic_auth = DEFAULT_BASIC_AUTH
5960
mock_args.hostname = DEFAULT_IPV6_HOSTNAME
6061
mock_args.port = DEFAULT_PORT
62+
mock_args.listener_pool_klass = DEFAULT_LISTENER_POOL_KLASS
63+
mock_args.acceptor_pool_klass = DEFAULT_ACCEPTOR_POOL_KLASS
6164
mock_args.num_acceptors = DEFAULT_NUM_ACCEPTORS
6265
mock_args.num_workers = DEFAULT_NUM_WORKERS
6366
mock_args.disable_http_proxy = DEFAULT_DISABLE_HTTP_PROXY
@@ -71,6 +74,7 @@ def mock_default_args(mock_args: mock.Mock) -> None:
7174
mock_args.devtools_ws_path = DEFAULT_DEVTOOLS_WS_PATH
7275
mock_args.timeout = DEFAULT_TIMEOUT
7376
mock_args.threadless = DEFAULT_THREADLESS
77+
mock_args.threadless_pool_klass = DEFAULT_THREADLESS_POOL_KLASS
7478
mock_args.threaded = not DEFAULT_THREADLESS
7579
mock_args.enable_web_server = DEFAULT_ENABLE_WEB_SERVER
7680
mock_args.enable_static_server = DEFAULT_ENABLE_STATIC_SERVER
@@ -91,9 +95,9 @@ def mock_default_args(mock_args: mock.Mock) -> None:
9195
@mock.patch('time.sleep')
9296
@mock.patch('proxy.proxy.FlagParser.initialize')
9397
@mock.patch('proxy.proxy.EventManager')
94-
@mock.patch('proxy.proxy.AcceptorPool')
95-
@mock.patch('proxy.proxy.ThreadlessPool')
96-
@mock.patch('proxy.proxy.ListenerPool')
98+
@mock.patch(DEFAULT_ACCEPTOR_POOL_KLASS)
99+
@mock.patch(DEFAULT_THREADLESS_POOL_KLASS)
100+
@mock.patch(DEFAULT_LISTENER_POOL_KLASS)
97101
def test_entry_point(
98102
self,
99103
mock_listener_pool: mock.Mock,
@@ -147,9 +151,9 @@ def test_entry_point(
147151
@mock.patch('time.sleep')
148152
@mock.patch('proxy.proxy.FlagParser.initialize')
149153
@mock.patch('proxy.proxy.EventManager')
150-
@mock.patch('proxy.proxy.AcceptorPool')
151-
@mock.patch('proxy.proxy.ThreadlessPool')
152-
@mock.patch('proxy.proxy.ListenerPool')
154+
@mock.patch(DEFAULT_ACCEPTOR_POOL_KLASS)
155+
@mock.patch(DEFAULT_THREADLESS_POOL_KLASS)
156+
@mock.patch(DEFAULT_LISTENER_POOL_KLASS)
153157
def test_main_with_no_flags(
154158
self,
155159
mock_listener_pool: mock.Mock,
@@ -191,9 +195,9 @@ def test_main_with_no_flags(
191195
@mock.patch('time.sleep')
192196
@mock.patch('proxy.proxy.FlagParser.initialize')
193197
@mock.patch('proxy.proxy.EventManager')
194-
@mock.patch('proxy.proxy.AcceptorPool')
195-
@mock.patch('proxy.proxy.ThreadlessPool')
196-
@mock.patch('proxy.proxy.ListenerPool')
198+
@mock.patch(DEFAULT_ACCEPTOR_POOL_KLASS)
199+
@mock.patch(DEFAULT_THREADLESS_POOL_KLASS)
200+
@mock.patch(DEFAULT_LISTENER_POOL_KLASS)
197201
def test_enable_events(
198202
self,
199203
mock_listener_pool: mock.Mock,
@@ -238,9 +242,9 @@ def test_enable_events(
238242
@mock.patch('proxy.common.plugins.Plugins.load')
239243
@mock.patch('proxy.common.flag.FlagParser.parse_args')
240244
@mock.patch('proxy.proxy.EventManager')
241-
@mock.patch('proxy.proxy.AcceptorPool')
242-
@mock.patch('proxy.proxy.ThreadlessPool')
243-
@mock.patch('proxy.proxy.ListenerPool')
245+
@mock.patch(DEFAULT_ACCEPTOR_POOL_KLASS)
246+
@mock.patch(DEFAULT_THREADLESS_POOL_KLASS)
247+
@mock.patch(DEFAULT_LISTENER_POOL_KLASS)
244248
def test_enable_dashboard(
245249
self,
246250
mock_listener_pool: mock.Mock,
@@ -285,9 +289,9 @@ def test_enable_dashboard(
285289
@mock.patch('proxy.common.plugins.Plugins.load')
286290
@mock.patch('proxy.common.flag.FlagParser.parse_args')
287291
@mock.patch('proxy.proxy.EventManager')
288-
@mock.patch('proxy.proxy.AcceptorPool')
289-
@mock.patch('proxy.proxy.ThreadlessPool')
290-
@mock.patch('proxy.proxy.ListenerPool')
292+
@mock.patch(DEFAULT_ACCEPTOR_POOL_KLASS)
293+
@mock.patch(DEFAULT_THREADLESS_POOL_KLASS)
294+
@mock.patch(DEFAULT_LISTENER_POOL_KLASS)
291295
def test_enable_devtools(
292296
self,
293297
mock_listener_pool: mock.Mock,
@@ -326,9 +330,9 @@ def test_enable_devtools(
326330
@mock.patch('proxy.common.plugins.Plugins.load')
327331
@mock.patch('proxy.common.flag.FlagParser.parse_args')
328332
@mock.patch('proxy.proxy.EventManager')
329-
@mock.patch('proxy.proxy.AcceptorPool')
330-
@mock.patch('proxy.proxy.ThreadlessPool')
331-
@mock.patch('proxy.proxy.ListenerPool')
333+
@mock.patch(DEFAULT_ACCEPTOR_POOL_KLASS)
334+
@mock.patch(DEFAULT_THREADLESS_POOL_KLASS)
335+
@mock.patch(DEFAULT_LISTENER_POOL_KLASS)
332336
@mock.patch('proxy.proxy.SshHttpProtocolHandler')
333337
@mock.patch('proxy.proxy.SshTunnelListener')
334338
def test_enable_ssh_tunnel(

0 commit comments

Comments
 (0)