Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
523 changes: 420 additions & 103 deletions lockfiles/st2.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion requirements-pants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ flex
# gitpython & gitdb are used for pack management
gitdb
gitpython
# st2common/tests/integration/test_util_green.py requires greenlet (as does eventlet)
# st2common/tests/integration/test_util_green.py requires greenlet (as does eventlet and gevent)
gevent
greenlet
gunicorn
jinja2
Expand Down
9 changes: 4 additions & 5 deletions st2auth/st2auth/cmd/api.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2020 The StackStorm Authors.
# Copyright 2020-2026 The StackStorm Authors.
# Copyright 2019 Extreme Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -17,12 +17,11 @@

monkey_patch()

import eventlet
from st2common.util import concurrency
import os
import sys

from oslo_config import cfg
from eventlet import wsgi

from st2common import log as logging
from st2common.service_setup import setup as common_setup
Expand Down Expand Up @@ -81,7 +80,7 @@ def _run_server():
if use_ssl and not os.path.isfile(key_file_path):
raise ValueError('Private key file "%s" doesn\'t exist' % (key_file_path))

socket = eventlet.listen((host, port))
socket = concurrency.listen(host, port)

if use_ssl:
socket = eventlet.wrap_ssl(
Expand All @@ -96,7 +95,7 @@ def _run_server():
host,
port,
)

wsgi = concurrency.get_wsgi_module()
wsgi.server(socket, app.setup_app(), log=LOG, log_output=False)
return 0

Expand Down
33 changes: 12 additions & 21 deletions st2common/st2common/service_setup.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2020 The StackStorm Authors.
# Copyright 2020-2026 The StackStorm Authors.
# Copyright 2019 Extreme Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -26,7 +26,7 @@
import logging as stdlib_logging

import six
import eventlet.debug

from oslo_config import cfg
from tooz.coordination import GroupAlreadyExist
from tooz.coordination import GroupNotCreated
Expand All @@ -38,6 +38,7 @@
from st2common.transport.bootstrap_utils import register_kombu_serializers
from st2common.bootstrap import runnersregistrar
from st2common.signal_handlers import register_common_signal_handlers
from st2common.util.concurrency import blocking_detection
from st2common.util.debugging import enable_debugging
from st2common.models.utils.profiling import enable_profiling
from st2common import triggers
Expand Down Expand Up @@ -182,8 +183,7 @@ def setup(
or encoding.lower() not in VALID_UTF8_ENCODINGS
):
LOG.warning(
NON_UTF8_LOCALE_WARNING_MSG
% (fs_encoding, default_encoding, used_locale.strip())
NON_UTF8_LOCALE_WARNING_MSG % (fs_encoding, default_encoding, used_locale.strip())
)

is_debug_enabled = cfg.CONF.debug or cfg.CONF.system.debug
Expand All @@ -197,9 +197,7 @@ def setup(
except KeyError as e:
tb_msg = traceback.format_exc()
if "log.setLevel" in tb_msg:
msg = (
"Invalid log level selected. Log level names need to be all uppercase."
)
msg = "Invalid log level selected. Log level names need to be all uppercase."
msg += "\n\n" + getattr(e, "message", six.text_type(e))
raise KeyError(msg)
else:
Expand All @@ -214,8 +212,7 @@ def setup(
# set to "INFO" and we already log messages with level AUDIT to a special dedicated log
# file.
ignore_audit_log_messages = (
handler.level >= stdlib_logging.INFO
and handler.level < stdlib_logging.AUDIT
handler.level >= stdlib_logging.INFO and handler.level < stdlib_logging.AUDIT
)
if not is_debug_enabled and ignore_audit_log_messages:
try:
Expand All @@ -224,10 +221,7 @@ def setup(
# In case handler doesn't have name assigned, repr would throw
handler_repr = "unknown"

LOG.debug(
'Excluding log messages with level "AUDIT" for handler "%s"'
% (handler_repr)
)
LOG.debug('Excluding log messages with level "AUDIT" for handler "%s"' % (handler_repr))
handler.addFilter(LogLevelFilter(log_levels=exclude_log_levels))

if not is_debug_enabled:
Expand Down Expand Up @@ -285,11 +279,10 @@ def setup(
# modules like jinja, stevedore, etc load files from disk on init which is slow and will be
# detected as blocking operation, but this is not really an issue inside the service startup /
# init phase.
if cfg.CONF.enable_eventlet_blocking_detection:
print("Eventlet long running / blocking operation detection logic enabled")
print(cfg.CONF.eventlet_blocking_detection_resolution)
eventlet.debug.hub_blocking_detection(
state=True, resolution=cfg.CONF.eventlet_blocking_detection_resolution
if cfg.CONF.enable_concurrency_blocking_detection:
blocking_detection(
cfg.CONF.enable_concurrency_blocking_detection,
cfg.CONF.concurrency_blocking_detection_resolution,
)


Expand Down Expand Up @@ -354,9 +347,7 @@ def deregister_service(service, start_heart=True):
coordinator = coordination.get_coordinator(start_heart=start_heart)

member_id = coordination.get_member_id()
LOG.debug(
'Leaving service registry group "%s" as member_id "%s"' % (group_id, member_id)
)
LOG.debug('Leaving service registry group "%s" as member_id "%s"' % (group_id, member_id))
try:
coordinator.leave_group(group_id).get()
except (GroupNotCreated, MemberNotJoined):
Expand Down
83 changes: 76 additions & 7 deletions st2common/st2common/util/concurrency.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,21 @@ def get_subprocess_module():
from gevent import subprocess # pylint: disable=import-error

return subprocess
else:
raise ValueError(f"Unsupported concurrency library {CONCURRENCY_LIBRARY}")


def get_wsgi_module():
if CONCURRENCY_LIBRARY == "eventlet":
from eventlet import wsgi

return wsgi
elif CONCURRENCY_LIBRARY == "gevent":
from gevent import pywsgi

return pywsgi
else:
raise ValueError(f"Unsupported concurrency library {CONCURRENCY_LIBRARY}")


def subprocess_popen(*args, **kwargs):
Expand All @@ -84,6 +99,8 @@ def subprocess_popen(*args, **kwargs):
from gevent import subprocess # pylint: disable=import-error

return subprocess.Popen(*args, **kwargs)
else:
raise ValueError(f"Unsupported concurrency library {CONCURRENCY_LIBRARY}")


def spawn(func, *args, **kwargs):
Expand All @@ -92,7 +109,7 @@ def spawn(func, *args, **kwargs):
elif CONCURRENCY_LIBRARY == "gevent":
return gevent.spawn(func, *args, **kwargs)
else:
raise ValueError("Unsupported concurrency library")
raise ValueError(f"Unsupported concurrency library {CONCURRENCY_LIBRARY}")


def wait(green_thread, *args, **kwargs):
Expand All @@ -101,7 +118,7 @@ def wait(green_thread, *args, **kwargs):
elif CONCURRENCY_LIBRARY == "gevent":
return green_thread.join(*args, **kwargs)
else:
raise ValueError("Unsupported concurrency library")
raise ValueError(f"Unsupported concurrency library {CONCURRENCY_LIBRARY}")


def cancel(green_thread, *args, **kwargs):
Expand All @@ -110,7 +127,7 @@ def cancel(green_thread, *args, **kwargs):
elif CONCURRENCY_LIBRARY == "gevent":
return green_thread.kill(*args, **kwargs)
else:
raise ValueError("Unsupported concurrency library")
raise ValueError(f"Unsupported concurrency library {CONCURRENCY_LIBRARY}")


def kill(green_thread, *args, **kwargs):
Expand All @@ -119,7 +136,16 @@ def kill(green_thread, *args, **kwargs):
elif CONCURRENCY_LIBRARY == "gevent":
return green_thread.kill(*args, **kwargs)
else:
raise ValueError("Unsupported concurrency library")
raise ValueError(f"Unsupported concurrency library {CONCURRENCY_LIBRARY}")


def listen(host, port):
if CONCURRENCY_LIBRARY == "eventlet":
return eventlet.listen((host, port))
elif CONCURRENCY_LIBRARY == "gevent":
raise NotImplementedError
else:
raise ValueError(f"Unsupported concurrency library {CONCURRENCY_LIBRARY}")


def sleep(*args, **kwargs):
Expand All @@ -128,14 +154,24 @@ def sleep(*args, **kwargs):
elif CONCURRENCY_LIBRARY == "gevent":
return gevent.sleep(*args, **kwargs)
else:
raise ValueError("Unsupported concurrency library")
raise ValueError(f"Unsupported concurrency library {CONCURRENCY_LIBRARY}")


def get_greenlet_exit_exception_class():
if CONCURRENCY_LIBRARY == "eventlet":
return eventlet.support.greenlets.GreenletExit
elif CONCURRENCY_LIBRARY == "gevent":
return gevent.GreenletExit
else:
raise ValueError(f"Unsupported concurrency library {CONCURRENCY_LIBRARY}")


def get_default_green_pool_size():
if CONCURRENCY_LIBRARY == "eventlet":
return eventlet.wsgi.DEFAULT_MAX_SIMULTANEOUS_REQUESTS
elif CONCURRENCY_LIBRARY == "gevent":
# matches what DEFAULT_MAX_SIMULTANEOUS_REQUESTS is for eventlet
return 1024
else:
raise ValueError("Unsupported concurrency library")

Expand All @@ -146,7 +182,7 @@ def get_green_pool_class():
elif CONCURRENCY_LIBRARY == "gevent":
return gevent.pool.Pool
else:
raise ValueError("Unsupported concurrency library")
raise ValueError(f"Unsupported concurrency library {CONCURRENCY_LIBRARY}")


def is_green_pool_free(pool):
Expand All @@ -158,7 +194,7 @@ def is_green_pool_free(pool):
elif CONCURRENCY_LIBRARY == "gevent":
return not pool.full()
else:
raise ValueError("Unsupported concurrency library")
raise ValueError(f"Unsupported concurrency library {CONCURRENCY_LIBRARY}")


def green_pool_wait_all(pool):
Expand All @@ -173,3 +209,36 @@ def green_pool_wait_all(pool):
return all(gl.ready() for gl in pool.greenlets)
else:
raise ValueError("Unsupported concurrency library")


def listen_server(host, port, backlog=50, **kwargs):
"""
Start listening on the host:port.
:backlog: the number of unaccepted connections that the system will allow before refusing new connections.
"""
if CONCURRENCY_LIBRARY == "eventlet":
return eventlet.listen((host, port), backlog=backlog, **kwargs)
elif CONCURRENCY_LIBRARY == "gevent":
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind((host, port))
return sock.listen(backlog)
else:
raise ValueError("Unsupported concurrency library")


def blocking_detection(enable=False, timeout=1.0):
if CONCURRENCY_LIBRARY == "eventlet":
print(
f"Eventlet long running / blocking operation detection logic enabled. Block timeout ({timeout})."
)
eventlet.debug.hub_blocking_detection(enable_detection=enable, resolution=timeout)
elif CONCURRENCY_LIBRARY == "gevent":
print(
f"gEvent long running / blocking operation detection logic enabled. Block timeout ({timeout})."
)
gevent.config.monitor_thread = enable
gevent.config.max_blocking_time = timeout
else:
raise ValueError("Unsupported concurrency library")
7 changes: 5 additions & 2 deletions st2common/st2common/util/monkey_patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,11 @@ def monkey_patch(patch_thread=None):
os=True, select=True, socket=True, thread=patch_thread, time=True
)
elif concurrency_library == "gevent":
# TODO: support gevent.patch_all if .concurrency.CONCURRENCY_LIBRARY = "gevent"
raise NotImplementedError
# Match what eventlet was enabling, fallback on gevent defaults
import gevent
gevent.monkey.patch_all(
os=True, select=True, thread=True, time=True, socket=True
)
else:
raise RuntimeError(f"Unsupported concurrency library {concurrency_library}")

Expand Down
22 changes: 12 additions & 10 deletions st2stream/st2stream/cmd/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import sys

import eventlet

from oslo_config import cfg
from eventlet import wsgi

Expand All @@ -29,6 +30,7 @@
from st2common.service_setup import teardown as common_teardown
from st2common.service_setup import deregister_service
from st2common.stream.listener import get_listener_if_set
from st2common.util import concurrency
from st2common.util.wsgi import shutdown_server_kill_pending_requests
from st2stream.signal_handlers import register_stream_signal_handlers
from st2stream import config
Expand All @@ -40,13 +42,13 @@
__all__ = ["main"]


eventlet.monkey_patch(
os=True,
select=True,
socket=True,
thread=False if "--use-debugger" in sys.argv else True,
time=True,
)
#eventlet.monkey_patch(
# os=True,
# select=True,
# socket=True,
# thread=False if "--use-debugger" in sys.argv else True,
# time=True,
#)

LOG = logging.getLogger(__name__)
STREAM = "stream"
Expand Down Expand Up @@ -83,9 +85,9 @@ def _run_server():
"(PID=%s) ST2 Stream API is serving on http://%s:%s.", os.getpid(), host, port
)

max_pool_size = eventlet.wsgi.DEFAULT_MAX_SIMULTANEOUS_REQUESTS
worker_pool = eventlet.GreenPool(max_pool_size)
sock = eventlet.listen((host, port))
max_pool_size = concurrency.get_default_green_pool_size()
worker_pool = concurrency.get_green_pool_class()(max_pool_size)
sock = concurrency.listen_server(host, port)

def queue_shutdown(signal_number, stack_frame):
deregister_service(STREAM)
Expand Down