
Commit
Implement #425
In spirit, at least
stijn-uva committed Sep 19, 2024
1 parent 591c84f commit bff0a4d
Showing 154 changed files with 704 additions and 582 deletions.
6 changes: 5 additions & 1 deletion backend/bootstrap.py
@@ -8,6 +8,7 @@
 
 from common.lib.queue import JobQueue
 from common.lib.database import Database
+from common.lib.module_loader import ModuleCollector
 from backend.lib.manager import WorkerManager
 from common.lib.logger import Logger
 
@@ -66,9 +67,12 @@ def run(as_daemon=True, log_level="INFO"):
     config.with_db(db)
     config.ensure_database()
 
+    # load 4CAT modules and cache the results
+    modules = ModuleCollector(config=config, write_cache=True)
+
     # make it happen
     # this is blocking until the back-end is shut down
-    WorkerManager(logger=log, database=db, queue=queue, as_daemon=as_daemon)
+    WorkerManager(logger=log, database=db, queue=queue, modules=modules, as_daemon=as_daemon)
 
     # clean up pidfile, if running as daemon
     if as_daemon:
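The change above builds the module cache once at startup and injects it into the WorkerManager, rather than having downstream components construct their own ModuleCollector. A minimal sketch of that injection pattern, using stub classes rather than 4CAT's real ones (all names and values below are illustrative):

class StubModuleCollector:
    """Stand-in for common.lib.module_loader.ModuleCollector."""
    def __init__(self, config, write_cache=False):
        self.config = config   # workers reach the config manager via modules.config
        self.datasources = {}  # the real collector discovers processors/datasources
        if write_cache:
            pass               # ...and would persist the discovery results here

class StubWorkerManager:
    """Stand-in for backend.lib.manager.WorkerManager."""
    def __init__(self, logger, database, queue, modules, as_daemon=True):
        self.modules = modules  # shared by every worker this manager spawns

config = {"PATH_ROOT": "/opt/4cat"}  # placeholder for the real config manager
modules = StubModuleCollector(config=config, write_cache=True)
manager = StubWorkerManager(logger=None, database=None, queue=None, modules=modules)
assert manager.modules.config is config  # one module cache, one config, shared everywhere
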
6 changes: 3 additions & 3 deletions backend/lib/manager.py
@@ -4,7 +4,6 @@
 import signal
 import time
 
-from common.lib.module_loader import ModuleCollector
 from common.lib.exceptions import JobClaimedException
 
 
@@ -22,19 +21,20 @@ class WorkerManager:
     pool = []
     looping = True
 
-    def __init__(self, queue, database, logger, as_daemon=True):
+    def __init__(self, queue, database, logger, modules, as_daemon=True):
         """
         Initialize manager
 
         :param queue: Job queue
         :param database: Database handler
         :param logger: Logger object
+        :param modules: Modules cache, via ModuleCollector()
         :param bool as_daemon: Whether the manager is being run as a daemon
         """
         self.queue = queue
         self.db = database
         self.log = logger
-        self.modules = ModuleCollector(write_config=True)
+        self.modules = modules
 
         if as_daemon:
             signal.signal(signal.SIGTERM, self.abort)
36 changes: 17 additions & 19 deletions backend/lib/processor.py
@@ -19,7 +19,7 @@
 from common.lib.helpers import get_software_commit, remove_nuls, send_email
 from common.lib.exceptions import (WorkerInterruptedException, ProcessorInterruptedException, ProcessorException,
                                    DataSetException, MapItemException)
-from common.config_manager import config, ConfigWrapper
+from common.config_manager import ConfigWrapper
 from common.lib.user import User
 
 
@@ -37,14 +37,14 @@ class BasicProcessor(FourcatModule, BasicWorker, metaclass=abc.ABCMeta):
     useful is another question).
 
     To determine whether a processor can process a given dataset, you can
-    define a `is_compatible_with(FourcatModule module=None, str user=None) -> bool` class
+    define a `is_compatible_with(FourcatModule module=None, config=None) -> bool` class
     method which takes a dataset as argument and returns a bool that determines
     if this processor is considered compatible with that dataset. For example:
 
     .. code-block:: python
 
         @classmethod
-        def is_compatible_with(cls, module=None, user=None):
+        def is_compatible_with(cls, module=None, config=None):
             return module.type == "linguistic-features"
 
@@ -109,11 +109,10 @@ def work(self):
             self.job.finish()
             return
 
-        # set up config reader using the worker's DB connection and the dataset
-        # creator. This ensures that if a value has been overriden for the owner,
-        # the overridden value is used instead.
-        config.with_db(self.db)
-        self.config = ConfigWrapper(config=config, user=User.get_by_name(self.db, self.owner))
+        # set up config reader wrapping the worker's config manager, which is
+        # in turn the one passed to it by the WorkerManager, which is the one
+        # originally loaded in bootstrap
+        self.config = ConfigWrapper(config=self.config, user=User.get_by_name(self.db, self.owner))
 
         if self.dataset.data.get("key_parent", None):
             # search workers never have parents (for now), so we don't need to
@@ -170,7 +169,7 @@ def work(self):
         # get parameters
         # if possible, fill defaults where parameters are not provided
         given_parameters = self.dataset.parameters.copy()
-        all_parameters = self.get_options(self.dataset)
+        all_parameters = self.get_options(self.dataset, config=self.config)
         self.parameters = {
             param: given_parameters.get(param, all_parameters.get(param, {}).get("default"))
             for param in [*all_parameters.keys(), *given_parameters.keys()]
@@ -179,7 +178,7 @@ def work(self):
         # now the parameters have been loaded into memory, clear any sensitive
         # ones. This has a side-effect that a processor may not run again
         # without starting from scratch, but this is the price of progress
-        options = self.get_options(self.dataset.get_parent())
+        options = self.get_options(self.dataset.get_parent(), config=self.config)
         for option, option_settings in options.items():
             if option_settings.get("sensitive"):
                 self.dataset.delete_parameter(option)
@@ -241,7 +240,7 @@ def after_process(self):
             next_parameters = next.get("parameters", {})
             next_type = next.get("type", "")
             try:
-                available_processors = self.dataset.get_available_processors(user=self.dataset.creator)
+                available_processors = self.dataset.get_available_processors(config=self.config)
             except ValueError:
                 self.log.info("Trying to queue next processor, but parent dataset no longer exists, halting")
                 break
@@ -329,7 +328,7 @@ def after_process(self):
 
         self.job.finish()
 
-        if config.get('mail.server') and self.dataset.get_parameters().get("email-complete", False):
+        if self.config.get('mail.server') and self.dataset.get_parameters().get("email-complete", False):
             owner = self.dataset.get_parameters().get("email-complete", False)
             # Check that username is email address
             if re.match(r"[^@]+\@.*?\.[a-zA-Z]+", owner):
@@ -340,8 +339,8 @@ def after_process(self):
                 import html2text
 
                 self.log.debug("Sending email to %s" % owner)
-                dataset_url = ('https://' if config.get('flask.https') else 'http://') + config.get('flask.server_name') + '/results/' + self.dataset.key
-                sender = config.get('mail.noreply')
+                dataset_url = ('https://' if self.config.get('flask.https') else 'http://') + self.config.get('flask.server_name') + '/results/' + self.dataset.key
+                sender = self.config.get('mail.noreply')
                 message = MIMEMultipart("alternative")
                 message["From"] = sender
                 message["To"] = owner
@@ -778,7 +777,7 @@ def is_filter(cls):
         return hasattr(cls, "category") and cls.category and "filter" in cls.category.lower()
 
     @classmethod
-    def get_options(cls, parent_dataset=None, user=None):
+    def get_options(cls, parent_dataset=None, config=None):
         """
         Get processor options
 
@@ -787,12 +786,11 @@
         fine-grained options, e.g. in cases where the availability of options
         is partially determined by the parent dataset's parameters.
 
+        :param config:
         :param DataSet parent_dataset: An object representing the dataset that
             the processor would be run on
-        :param User user: Flask user the options will be displayed for, in
-            case they are requested for display in the 4CAT web interface. This can
-            be used to show some options only to privileges users.
         """
         return cls.options if hasattr(cls, "options") else {}
 
     @classmethod
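With get_options() now taking a config reader instead of a user, option availability can depend on the configuration context in which the options are requested. A sketch of how a processor might use the new parameter, with a hypothetical processor and an invented setting name (not code from this commit):

class ExampleProcessor:
    """Hypothetical processor overriding the new get_options() signature."""
    options = {
        "amount": {"type": "string", "default": "100", "help": "Items to process"}
    }

    @classmethod
    def get_options(cls, parent_dataset=None, config=None):
        options = dict(cls.options)
        # config may be a ConfigWrapper carrying user- or tag-level overrides,
        # so the limit below can differ per user without the processor knowing
        if config and config.get("example.max_items", 0) == 0:
            options.pop("amount")  # no limit configured, so nothing to ask
        return options

# anything with a dict-style .get() works for the sketch, e.g. a plain dict
print(ExampleProcessor.get_options(config={"example.max_items": 500}))
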
3 changes: 2 additions & 1 deletion backend/lib/search.py
@@ -63,7 +63,8 @@ def process(self):
         query_parameters = self.dataset.get_parameters()
         results_file = self.dataset.get_results_path()
 
-        self.log.info("Querying: %s" % str({k: v for k, v in query_parameters.items() if not self.get_options().get(k, {}).get("sensitive", False)}))
+        self.log.info("Querying: %s" % str({k: v for k, v in query_parameters.items() if not self.get_options(
+            config=self.config).get(k, {}).get("sensitive", False)}))
 
         # Execute the relevant query (string-based, random, countryflag-based)
         try:
2 changes: 1 addition & 1 deletion backend/lib/worker.py
@@ -86,7 +86,7 @@ def __init__(self, logger, job, queue=None, manager=None, modules=None):
         self.manager = manager
         self.job = job
         self.init_time = int(time.time())
-        self.config = ConfigDummy()
+        self.config = modules.config
 
         # ModuleCollector cannot be easily imported into a worker because it itself
         # imports all workers, so you get a recursive import that Python (rightly) blocks
7 changes: 5 additions & 2 deletions backend/workers/api.py
@@ -15,8 +15,8 @@ class InternalAPI(BasicWorker):
 
     ensure_job = {"remote_id": "localhost"}
 
-    host = config.get('API_HOST')
-    port = config.get('API_PORT')
+    host = None
+    port = None
 
     def work(self):
         """
@@ -27,6 +27,9 @@ def work(self):
 
         :return:
         """
+        self.host = self.config.get('API_HOST')
+        self.port = self.config.get('API_PORT')
+
         if self.port == 0:
             # if configured not to listen, just loop until the backend shuts
             # down we can't return here immediately, since this is a worker,
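The class attributes move to None because a class body executes at import time, before any worker instance (and therefore any injected self.config) exists; the values are now resolved when work() runs. A stub illustration of the deferred-read pattern (stand-in classes and invented values):

class StubConfig:
    """Minimal stand-in for the injected config manager."""
    def get(self, key, default=None):
        return {"API_HOST": "localhost", "API_PORT": 4444}.get(key, default)

class StubWorker:
    host = None  # resolved per instance in work(), not at import time
    port = None

    def __init__(self, config):
        self.config = config  # injected, like modules.config in the real worker

    def work(self):
        self.host = self.config.get("API_HOST")
        self.port = self.config.get("API_PORT")
        return self.host, self.port

assert StubWorker(StubConfig()).work() == ("localhost", 4444)
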
7 changes: 3 additions & 4 deletions backend/workers/check_updates.py
@@ -2,7 +2,6 @@
 import requests
 import json
 
-from common.config_manager import config
 from common.lib.helpers import add_notification, get_github_version
 from backend.lib.worker import BasicWorker
 from pathlib import Path
@@ -22,11 +21,11 @@ class UpdateChecker(BasicWorker):
     max_workers = 1
 
     # check once every three hours
-    ensure_job = {"remote_id": config.get("4cat.github_url"), "interval": 10800}
+    ensure_job = {"remote_id": self.config.get("4cat.github_url"), "interval": 10800}
 
     def work(self):
-        versionfile = Path(config.get("PATH_ROOT"), "config/.current-version")
-        repo_url = config.get("4cat.github_url")
+        versionfile = Path(self.config.get("PATH_ROOT"), "config/.current-version")
+        repo_url = self.config.get("4cat.github_url")
 
         if not versionfile.exists() or not repo_url:
             # need something to compare against...
3 changes: 1 addition & 2 deletions backend/workers/cleanup_tempfiles.py
@@ -6,7 +6,6 @@
 
 from pathlib import Path
 
-from common.config_manager import config
 from backend.lib.worker import BasicWorker
 from common.lib.dataset import DataSet
 from common.lib.exceptions import WorkerInterruptedException, DataSetException
@@ -34,7 +33,7 @@ def work(self):
 
         :return:
         """
-        result_files = Path(config.get('PATH_DATA')).glob("*")
+        result_files = Path(self.config.get('PATH_DATA')).glob("*")
         for file in result_files:
             if file.stem.startswith("."):
                 # skip hidden files
11 changes: 5 additions & 6 deletions backend/workers/datasource_metrics.py
@@ -13,7 +13,6 @@
 from datetime import datetime, time, timezone
 
 from backend.lib.worker import BasicWorker
-from common.config_manager import config
 
 
 class DatasourceMetrics(BasicWorker):
@@ -52,9 +51,9 @@ def general_stats(self):
         this worker instead of on demand.
         """
         metrics = {
-            "size_data": DatasourceMetrics.folder_size(config.get("PATH_DATA")),
-            "size_logs": DatasourceMetrics.folder_size(config.get("PATH_LOGS")),
-            "size_db": self.db.fetchone("SELECT pg_database_size(%s) AS num", (config.get("DB_NAME"),))["num"]
+            "size_data": DatasourceMetrics.folder_size(self.config.get("PATH_DATA")),
+            "size_logs": DatasourceMetrics.folder_size(self.config.get("PATH_LOGS")),
+            "size_db": self.db.fetchone("SELECT pg_database_size(%s) AS num", (self.config.get("DB_NAME"),))["num"]
         }
 
         for metric, value in metrics.items():
@@ -95,7 +94,7 @@ def data_stats(self):
         """)
 
         added_datasources = [row["datasource"] for row in self.db.fetchall("SELECT DISTINCT(datasource) FROM metrics")]
-        enabled_datasources = config.get("datasources.enabled", {})
+        enabled_datasources = self.config.get("datasources.enabled", {})
 
         for datasource_id in self.modules.datasources:
             if datasource_id not in enabled_datasources:
@@ -121,7 +120,7 @@ def data_stats(self):
             elif datasource_id == "8chan":
                 settings_id = "eightchan"
 
-            boards = [b for b in config.get(settings_id + "-search.boards", [])]
+            boards = [b for b in self.config.get(settings_id + "-search.boards", [])]
 
             # If a datasource is static (so not updated) and it
             # is already present in the metrics table, we don't
6 changes: 4 additions & 2 deletions backend/workers/expire_items.py
@@ -3,14 +3,14 @@
 """
 import datetime
 import time
-import json
 import re
 
 from backend.lib.worker import BasicWorker
 from common.lib.dataset import DataSet
 from common.lib.exceptions import DataSetNotFoundException, WorkerInterruptedException
 
 from common.lib.user import User
+from common.config_manager import ConfigWrapper
 
 

class ThingExpirer(BasicWorker):
Expand Down Expand Up @@ -58,9 +58,11 @@ def expire_datasets(self):
if self.interrupted:
raise WorkerInterruptedException("Interrupted while expiring datasets")

# the dataset creator's configuration context determines expiration
wrapper = ConfigWrapper(self.config, user=dataset["creator"])
try:
dataset = DataSet(key=dataset["key"], db=self.db)
if dataset.is_expired():
if dataset.is_expired(config=wrapper):
self.log.info(f"Deleting dataset {dataset.key} (expired)")
dataset.delete()

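Wrapping the worker's config in the dataset creator's context means per-user retention overrides decide expiration, not just the global setting. A sketch of why that matters, with stub classes and an assumed settings layout rather than 4CAT's real implementation:

GLOBAL_SETTINGS = {"expire.timeout": 0}                # 0 = datasets never expire
USER_OVERRIDES = {"alice": {"expire.timeout": 86400}}  # alice's datasets expire daily

class StubConfigWrapper:
    """Stand-in for common.config_manager.ConfigWrapper."""
    def __init__(self, config, user=None):
        self.config = config
        self.user = user

    def get(self, key, default=None):
        # a user-level override takes precedence over the global value
        user_settings = USER_OVERRIDES.get(self.user, {})
        return user_settings.get(key, self.config.get(key, default))

wrapper = StubConfigWrapper(GLOBAL_SETTINGS, user="alice")
assert wrapper.get("expire.timeout") == 86400  # the creator's override wins
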
19 changes: 9 additions & 10 deletions backend/workers/restart_4cat.py
@@ -14,7 +14,6 @@
 
 from backend.lib.worker import BasicWorker
 from common.lib.exceptions import WorkerInterruptedException
-from common.config_manager import config
 
 
 class FourcatRestarterAndUpgrader(BasicWorker):
@@ -50,11 +49,11 @@ def work(self):
 
         # prevent multiple restarts running at the same time which could blow
         # up really fast
-        lock_file = Path(config.get("PATH_ROOT")).joinpath("config/restart.lock")
+        lock_file = Path(self.config.get("PATH_ROOT")).joinpath("config/restart.lock")
 
         # this file has the log of the restart worker itself and is checked by
         # the frontend to see how far we are
-        log_file_restart = Path(config.get("PATH_ROOT")).joinpath(config.get("PATH_LOGS")).joinpath("restart.log")
+        log_file_restart = Path(self.config.get("PATH_ROOT")).joinpath(self.config.get("PATH_LOGS")).joinpath("restart.log")
         log_stream_restart = log_file_restart.open("a")
 
         if not is_resuming:
@@ -74,7 +73,7 @@ def work(self):
 
             if self.job.data["remote_id"].startswith("upgrade"):
                 command = sys.executable + " helper-scripts/migrate.py --repository %s --yes --restart --output %s" % \
-                          (shlex.quote(config.get("4cat.github_url")), shlex.quote(str(log_file_restart)))
+                          (shlex.quote(self.config.get("4cat.github_url")), shlex.quote(str(log_file_restart)))
                 if self.job.details and self.job.details.get("branch"):
                     # migrate to code in specific branch
                     command += f" --branch {shlex.quote(self.job.details['branch'])}"
@@ -100,7 +99,7 @@ def work(self):
                 # restarts and we re-attempt to make a daemon, it will fail
                 # when trying to close the stdin file descriptor of the
                 # subprocess (man, that was a fun bug to hunt down)
-                process = subprocess.Popen(shlex.split(command), cwd=str(config.get("PATH_ROOT")),
+                process = subprocess.Popen(shlex.split(command), cwd=str(self.config.get("PATH_ROOT")),
                                            stdout=log_stream_restart, stderr=log_stream_restart,
                                            stdin=subprocess.DEVNULL)
 
@@ -143,20 +142,20 @@ def work(self):
             # front-end restart or upgrade too
             self.log.info("Restart worker resumed after restarting 4CAT, restart successful.")
             log_stream_restart.write("4CAT restarted.\n")
-            with Path(config.get("PATH_ROOT")).joinpath("config/.current-version").open() as infile:
+            with Path(self.config.get("PATH_ROOT")).joinpath("config/.current-version").open() as infile:
                 log_stream_restart.write(f"4CAT is now running version {infile.readline().strip()}.\n")
 
             # we're gonna use some specific Flask routes to trigger this, i.e.
             # we're interacting with the front-end through HTTP
-            api_host = "https://" if config.get("flask.https") else "http://"
-            if config.get("USING_DOCKER"):
+            api_host = "https://" if self.config.get("flask.https") else "http://"
+            if self.config.get("USING_DOCKER"):
                 import os
                 docker_exposed_port = os.environ['PUBLIC_PORT']
                 api_host += f"host.docker.internal{':' + docker_exposed_port if docker_exposed_port != '80' else ''}"
             else:
-                api_host += config.get("flask.server_name")
+                api_host += self.config.get("flask.server_name")
 
-            if self.job.data["remote_id"].startswith("upgrade") and config.get("USING_DOCKER"):
+            if self.job.data["remote_id"].startswith("upgrade") and self.config.get("USING_DOCKER"):
                 # when using Docker, the front-end needs to update separately
                 log_stream_restart.write("Telling front-end Docker container to upgrade...\n")
                 log_stream_restart.close()  # close, because front-end will be writing to it
7 changes: 6 additions & 1 deletion common/config_manager.py
@@ -432,7 +432,12 @@ def __init__(self, config, user=None, tags=None, request=None):
         serve 4CAT with a different configuration based on the proxy server
         used.
         """
-        self.config = config
+        if type(config) is ConfigWrapper:
+            # let's not do nested wrappers
+            self.config = config.config
+        else:
+            self.config = config
+
         self.user = user
         self.tags = tags
         self.request = request
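The guard keeps wrappers one level deep: callers such as BasicProcessor.work() wrap self.config, which may itself already be a ConfigWrapper, and without the check each re-wrap would add another layer of indirection. A runnable sketch of the same logic with a stub class:

class StubConfigWrapper:
    """Stand-in mirroring the guard added to ConfigWrapper.__init__()."""
    def __init__(self, config, user=None):
        if type(config) is StubConfigWrapper:
            # let's not do nested wrappers
            self.config = config.config
        else:
            self.config = config
        self.user = user

core = {"mail.server": "smtp.example.com"}    # stands in for the core config manager
inner = StubConfigWrapper(core, user="alice")
outer = StubConfigWrapper(inner, user="bob")  # re-wrapping, as work() may do
assert outer.config is core                   # still only one level deep
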
(Diff truncated: the remaining changed files were not loaded.)