Skip to content
This repository was archived by the owner on Nov 30, 2022. It is now read-only.

B/add continuous background job handler #757

Merged
merged 14 commits into from
Oct 6, 2020
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions hyrisecockpit/database_manager/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from hyrisecockpit.settings import (
DB_MANAGER_LISTENING,
DB_MANAGER_PORT,
DEFAULT_TABLES,
STORAGE_HOST,
STORAGE_PASSWORD,
STORAGE_PORT,
Expand All @@ -22,7 +21,6 @@ def main() -> None:
DB_MANAGER_PORT,
WORKLOAD_SUB_HOST,
WORKLOAD_PUBSUB_PORT,
DEFAULT_TABLES,
STORAGE_HOST,
STORAGE_PASSWORD,
STORAGE_PORT,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
"""The BackgroundJobManager is managing the background jobs for the apscheduler."""
"""This module handles the continuous jobs."""

from multiprocessing import Value

from apscheduler.schedulers.background import BackgroundScheduler

from .asynchronous_job_handler import AsynchronousJobHandler
from .cursor import ConnectionFactory, StorageConnectionFactory
from .job.ping_hyrise import ping_hyrise
from .job.update_chunks_data import update_chunks_data
Expand All @@ -22,47 +21,59 @@
from .worker_pool import WorkerPool


class BackgroundJobManager(object):
"""Manage background scheduling jobs."""
class ContinuousJobHandler:
"""Continuous Job Handler.

This class manages all jobs that are executed continuously in the background.
The jobs are executed via a background scheduler
(https://apscheduler.readthedocs.io/en/stable/).
"""

def __init__(
self,
database_id: str,
database_blocked: Value,
connection_factory: ConnectionFactory,
hyrise_active: Value,
worker_pool: WorkerPool,
storage_connection_factory: StorageConnectionFactory,
workload_drivers,
database_blocked: Value,
):
"""Initialize BackgroundJobManager object."""
self._database_id: str = database_id
"""Initialize continuous Job Handler.

Args:
connection_factory: An object to create a connection to the Hyrise
database. All connection relevant information (port, host) is
saved in this object.
hyrise_active: Flag stored in a shared memory map. This flag
stores if the Hyrise instance is responsive or not.
worker_pool: An object that manages the queue and task/queue workers. Its
api is used to get the queue length.
storage_connection_factory: An object to create a connection to the Influx
database. All connection relevant information (port, host) is
saved in this object.
database_blocked: Flag stored in a shared memory map. This flag
stores if the Hyrise instance is blocked or not.
"""
self._connection_factory = connection_factory
self._hyrise_active = hyrise_active
self._worker_pool = worker_pool
self._storage_connection_factory = storage_connection_factory
self._database_blocked: Value = database_blocked
self._connection_factory: ConnectionFactory = connection_factory
self._storage_connection_factory: StorageConnectionFactory = (
storage_connection_factory
)
self._worker_pool: WorkerPool = worker_pool
self._workload_drivers = workload_drivers
self._scheduler: BackgroundScheduler = BackgroundScheduler()
self._hyrise_active: Value = hyrise_active
self._previous_system_data = {
"previous_system_usage": None,
"previous_process_usage": None,
}
self._previous_chunk_data = {
"value": None,
}
self._asynchronous_job_handler = AsynchronousJobHandler(
self._database_blocked,
self._connection_factory,
self._workload_drivers,
)

self._scheduler: BackgroundScheduler = BackgroundScheduler()
self._init_jobs()

def _init_jobs(self) -> None:
"""Initialize basic background jobs."""
"""Initialize basic background jobs.

This function registers all continuous jobs in the background
scheduler.
"""
self._ping_hyrise_job = self._scheduler.add_job(
func=ping_hyrise,
trigger="interval",
Expand Down Expand Up @@ -153,7 +164,11 @@ def start(self) -> None:
self._scheduler.start()

def close(self) -> None:
"""Close background scheduler."""
"""Close background scheduler.

Here we remove all jobs from the background scheduler and
shut it down.
"""
self._update_workload_statement_information_job.remove()
self._update_system_data_job.remove()
self._update_chunks_data_job.remove()
Expand All @@ -163,19 +178,3 @@ def close(self) -> None:
self._update_workload_operator_information_job.remove()
self._ping_hyrise_job.remove()
self._scheduler.shutdown()

def load_tables(self, workload_type: str, scalefactor: float) -> bool:
"""Load tables."""
return self._asynchronous_job_handler.load_tables(workload_type, scalefactor)

def delete_tables(self, workload_type: str, scalefactor: float) -> bool:
"""Delete tables."""
return self._asynchronous_job_handler.delete_tables(workload_type, scalefactor)

def activate_plugin(self, plugin: str) -> bool:
"""Activate plugin."""
return self._asynchronous_job_handler.activate_plugin(plugin)

def deactivate_plugin(self, plugin: str) -> bool:
"""Dectivate plugin."""
return self._asynchronous_job_handler.deactivate_plugin(plugin)
114 changes: 93 additions & 21 deletions hyrisecockpit/database_manager/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@

from hyrisecockpit.drivers.connector import Connector

from .background_scheduler import BackgroundJobManager
from .asynchronous_job_handler import AsynchronousJobHandler
from .continuous_job_handler import ContinuousJobHandler
from .cursor import ConnectionFactory, StorageConnectionFactory
from .interfaces import SqlResultInterface
from .worker_pool import WorkerPool
Expand All @@ -19,7 +20,14 @@


class Database(object):
"""Represents database."""
"""Represents database.

The database class is a representation of a registered Hyrise instance.
All the communication with the Hyrise is managed by this class and its API.
A worker pool object that manages the queue and task/queue workers to put the
Hyrise under pressure is part of this class. This class also initializes the
influx database for the Hyrise instance.
"""

def __init__(
self,
Expand All @@ -31,16 +39,40 @@ def __init__(
dbname: str,
number_workers: int,
workload_publisher_url: str,
default_tables: str,
storage_host: str,
storage_password: str,
storage_port: str,
storage_user: str,
) -> None:
"""Initialize database object."""
"""Initialize database object.

Args:
id: A name representation of the Hyrise instance.
user: The user that connects to the Hyrise instance.
password: The password needed to connects to the Hyrise instance.
host: The address of the host machine where the Hyrise instance is running.
port: The port on which the Hyrise instance is running.
dbname: The database name of the Hyrise instance.
number_workers: Number of task workers to put the Hyrise instance under
pressure. Every worker is running in its own process.
workload_publisher_url: URL which is publishing the workload to the workers.
This URL is used by the queue worker to connect to the workload
generator via zeromq.
storage_host: Address in which the influx database is running.
storage_password: Password to connect to the influx database.
storage_port: Port of the influx database.
storage_user: User of the influx database.

Note:
The attributes user, password, host, port and dbname are the same attributes
that you would use to create a connection to the Hyrise instance with psql.

Attributes:
_connection_factory: This factory builds a Hyrise connection.
_storage_connection_factory: This factory builds an influx connection.
"""
self._id = id
self.number_workers: int = number_workers
self._default_tables: str = default_tables

self.connection_information: Dict[str, str] = {
"host": host,
Expand Down Expand Up @@ -75,20 +107,31 @@ def __init__(
self._database_blocked,
self._workload_drivers,
)
self._background_scheduler: BackgroundJobManager = BackgroundJobManager(
self._id,
self._database_blocked,
self._continuous_job_handler = ContinuousJobHandler(
self._connection_factory,
self._hyrise_active,
self._worker_pool,
self._storage_connection_factory,
self._database_blocked,
)
self._asynchronous_job_handler = AsynchronousJobHandler(
self._database_blocked,
self._connection_factory,
self._workload_drivers,
)
self._initialize_influx()
self._background_scheduler.start()
self._continuous_job_handler.start()

def _initialize_influx(self) -> None:
"""Initialize Influx database."""
"""Initialize Influx database.

We drop the database inside influx that has the same name as the
database id (Hyrise). We do that to remove all the data from previous
runs. Then we create a new database inside influx with the database id
(Hyrise). After that we create continuous queries that the influx runs
every x seconds. For example, it will automatically calculate the throughput
per second.
"""
with self._storage_connection_factory.create_cursor() as cursor:
cursor.drop_database()
cursor.create_database()
Expand Down Expand Up @@ -168,11 +211,23 @@ def _initialize_influx(self) -> None:
)

def get_queue_length(self) -> int:
"""Return queue length."""
"""Return queue length.

We return the number of tasks that still need to be sent to the
Hyrise instance.
"""
return self._worker_pool.get_queue_length()

def load_data(self, workload: Dict) -> bool:
"""Load pre-generated tables."""
"""Load pre-generated tables.

First we check if the workload and scale factor are valid.
For that the workload needs to have an equivalent driver and the
driver needs to support the scale factor. Moreover the worker pool
needs to be closed. If one of these requirements is not met the function
will return false. Otherwise the tables will be loaded via the
asynchronous job handler.
"""
workload_type = workload["workload_type"]
scale_factor = workload["scale_factor"]
if workload_type not in self._workload_drivers:
Expand All @@ -184,12 +239,20 @@ def load_data(self, workload: Dict) -> bool:
elif self._worker_pool.get_status() != "closed":
return False
else:
return self._background_scheduler.load_tables(
return self._asynchronous_job_handler.load_tables(
workload_type, float(scale_factor)
)

def delete_data(self, workload: Dict) -> bool:
"""Delete tables."""
"""Delete tables.

First we check if the workload and scale factor are valid.
For that the workload needs to have an equivalent driver and the
driver needs to support the scale factor. Moreover the worker pool
needs to be closed. If one of these requirements is not met the function
will return false. Otherwise the tables will be deleted via the
asynchronous job handler.
"""
workload_type = workload["workload_type"]
scale_factor = workload["scale_factor"]
if workload_type not in self._workload_drivers:
Expand All @@ -201,7 +264,7 @@ def delete_data(self, workload: Dict) -> bool:
elif self._worker_pool.get_status() != "closed":
return False
else:
return self._background_scheduler.delete_tables(
return self._asynchronous_job_handler.delete_tables(
workload_type, float(scale_factor)
)

Expand All @@ -210,22 +273,31 @@ def activate_plugin(self, plugin: str) -> bool:
active_plugins = self._get_plugins()
if active_plugins is None or plugin in active_plugins:
return False
return self._background_scheduler.activate_plugin(plugin)
return self._asynchronous_job_handler.activate_plugin(plugin)

def deactivate_plugin(self, plugin: str) -> bool:
"""Deactivate plugin."""
return self._background_scheduler.deactivate_plugin(plugin)
return self._asynchronous_job_handler.deactivate_plugin(plugin)

def get_database_blocked(self) -> bool:
"""Return tables loading flag."""
"""Return database blocked flag.

The database is blocked if we load/delete tables.
"""
return bool(self._database_blocked.value)

def get_worker_pool_status(self) -> str:
"""Return worker pool status."""
"""Return worker pool status.

A worker pool can have the status running or closed.
"""
return self._worker_pool.get_status()

def get_hyrise_active(self) -> bool:
"""Return status of hyrise."""
"""Return status of Hyrise.

This flag defines if the Hyrise instance is responsive or not.
"""
return bool(self._hyrise_active.value)

def get_loaded_tables_in_database(self) -> List[Dict[str, str]]:
Expand Down Expand Up @@ -380,4 +452,4 @@ def execute_sql_query(self, query) -> Optional[SqlResultInterface]:
def close(self) -> None:
"""Close the database."""
self._worker_pool.terminate()
self._background_scheduler.close()
self._continuous_job_handler.close()
3 changes: 0 additions & 3 deletions hyrisecockpit/database_manager/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ def __init__(
db_manager_port: str,
workload_sub_host: str,
workload_pubsub_port: str,
default_tables: str,
storage_host: str,
storage_password: str,
storage_port: str,
Expand All @@ -41,7 +40,6 @@ def __init__(
"""Initialize a DatabaseManager."""
self._workload_sub_host = workload_sub_host
self._workload_pubsub_port = workload_pubsub_port
self._default_tables = default_tables
self._storage_host = storage_host
self._storage_password = storage_password
self._storage_port = storage_port
Expand Down Expand Up @@ -118,7 +116,6 @@ def _call_add_database(self, body: Body) -> Response:
self._workload_sub_host,
self._workload_pubsub_port,
),
self._default_tables,
self._storage_host,
self._storage_password,
self._storage_port,
Expand Down
Loading