Skip to content
This repository was archived by the owner on Nov 30, 2022. It is now read-only.

Commit 5a5b213

Browse files
Add asynchrone job handler (#752)
* Add first draft asynchrone_job_handler * Adjust background job manager to use asynchronous job handler * Fix test * Use google sytle docstrings * Use threads instead of processes * fix typos
1 parent 31d5103 commit 5a5b213

6 files changed

+503
-195
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
"""Asynchronous job handler."""
2+
3+
from multiprocessing import Value
4+
from threading import Thread
5+
from typing import Dict
6+
7+
from hyrisecockpit.database_manager.cursor import ConnectionFactory
8+
9+
from .job.activate_plugin import activate_plugin as activate_plugin_job
10+
from .job.deactivate_plugin import deactivate_plugin as deactivate_plugin_job
11+
from .job.delete_tables import delete_tables as delete_tables_job
12+
from .job.load_tables import load_tables as load_tables_job
13+
14+
15+
class AsynchronousJobHandler:
16+
"""Handles asynchronous jobs.
17+
18+
All jobs in this class are executed with python threads in the background.
19+
If a method from this class is called it will only response if the job
20+
was started successfully or not. All jobs in this class do not need to return a
21+
result.
22+
"""
23+
24+
def __init__(
25+
self,
26+
database_blocked: Value,
27+
connection_factory: ConnectionFactory,
28+
workload_drivers: Dict,
29+
):
30+
"""Initialize asynchronous job handler object.
31+
32+
Args:
33+
database_blocked: Flag stored in a shared memory map. This flag
34+
stores if the Hyrise instance is blocked or not.
35+
connection_factory: An object to create a connection to the Hyrise
36+
database. All connection relevant information (port, host) is
37+
saved in this object.
38+
workload_drivers: A dictionary containing all workload drivers (TPCC,...)
39+
"""
40+
self._database_blocked: Value = database_blocked
41+
self._connection_factory: ConnectionFactory = connection_factory
42+
self._workload_drivers: Dict = workload_drivers
43+
44+
def load_tables(self, workload_type: str, scalefactor: float) -> bool:
45+
"""Start load tabled job.
46+
47+
This function will check if the database is blocked, if not it will
48+
create a thread that is executing the job that will load the tables.
49+
50+
Returns:
51+
bool: True if job was successful started, False if database was
52+
blocked and job couldn't be started.
53+
"""
54+
if not self._database_blocked.value:
55+
self._database_blocked.value = True
56+
job_thread = Thread(
57+
target=load_tables_job,
58+
args=(
59+
self._database_blocked,
60+
workload_type,
61+
scalefactor,
62+
self._connection_factory,
63+
self._workload_drivers,
64+
),
65+
)
66+
job_thread.start()
67+
return True
68+
else:
69+
return False
70+
71+
def delete_tables(self, workload_type: str, scalefactor: float) -> bool:
72+
"""Start delete tabled job.
73+
74+
This function will check if the database is blocked. If not it will
75+
create a thread that is executing the job that will delete the tables.
76+
77+
Returns:
78+
bool: True if job was successful started, False if database was
79+
blocked and job couldn't be started.
80+
"""
81+
if not self._database_blocked.value:
82+
self._database_blocked.value = True
83+
job_thread = Thread(
84+
target=delete_tables_job,
85+
args=(
86+
self._database_blocked,
87+
workload_type,
88+
scalefactor,
89+
self._connection_factory,
90+
self._workload_drivers,
91+
),
92+
)
93+
job_thread.start()
94+
return True
95+
else:
96+
return False
97+
98+
def activate_plugin(self, plugin: str) -> bool:
99+
"""Start activate plug-in job.
100+
101+
This function will check if the database is blocked. If not it will
102+
create a thread that is executing the job that will activate the plug-in.
103+
104+
Returns:
105+
bool: True if job was successful started, False if database was
106+
blocked and job couldn't be started.
107+
"""
108+
if not self._database_blocked.value:
109+
job_thread = Thread(
110+
target=activate_plugin_job, args=(self._connection_factory, plugin,)
111+
)
112+
job_thread.start()
113+
return True
114+
else:
115+
return False
116+
117+
def deactivate_plugin(self, plugin: str) -> bool:
118+
"""Start deactivate plug-in job.
119+
120+
This function will check if the database is blocked. If not it will
121+
create a thread that is executing the job that will deactivate the plug-in.
122+
123+
Returns:
124+
bool: True if job was successful started, False if database was
125+
blocked and job couldn't be started.
126+
"""
127+
if not self._database_blocked.value:
128+
job_thread = Thread(
129+
target=deactivate_plugin_job, args=(self._connection_factory, plugin,)
130+
)
131+
job_thread.start()
132+
return True
133+
else:
134+
return False

hyrisecockpit/database_manager/background_scheduler.py

+8-48
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,8 @@
44

55
from apscheduler.schedulers.background import BackgroundScheduler
66

7+
from .asynchronous_job_handler import AsynchronousJobHandler
78
from .cursor import ConnectionFactory, StorageConnectionFactory
8-
from .job.activate_plugin import activate_plugin as activate_plugin_job
9-
from .job.deactivate_plugin import deactivate_plugin as deactivate_plugin_job
10-
from .job.delete_tables import delete_tables as delete_tables_job
11-
from .job.load_tables import load_tables as load_tables_job
129
from .job.ping_hyrise import ping_hyrise
1310
from .job.update_chunks_data import update_chunks_data
1411
from .job.update_plugin_log import update_plugin_log
@@ -54,6 +51,9 @@ def __init__(
5451
self._previous_chunk_data = {
5552
"value": None,
5653
}
54+
self._asynchronous_job_handler = AsynchronousJobHandler(
55+
self._database_blocked, self._connection_factory, self._workload_drivers,
56+
)
5757

5858
self._init_jobs()
5959

@@ -162,56 +162,16 @@ def close(self) -> None:
162162

163163
def load_tables(self, workload_type: str, scalefactor: float) -> bool:
164164
"""Load tables."""
165-
if not self._database_blocked.value:
166-
self._database_blocked.value = True
167-
self._scheduler.add_job(
168-
func=load_tables_job,
169-
args=(
170-
self._database_blocked,
171-
workload_type,
172-
scalefactor,
173-
self._connection_factory,
174-
self._workload_drivers,
175-
),
176-
)
177-
return True
178-
else:
179-
return False
165+
return self._asynchronous_job_handler.load_tables(workload_type, scalefactor)
180166

181167
def delete_tables(self, workload_type: str, scalefactor: float) -> bool:
182168
"""Delete tables."""
183-
if not self._database_blocked.value:
184-
self._database_blocked.value = True
185-
self._scheduler.add_job(
186-
func=delete_tables_job,
187-
args=(
188-
self._database_blocked,
189-
workload_type,
190-
scalefactor,
191-
self._connection_factory,
192-
self._workload_drivers,
193-
),
194-
)
195-
return True
196-
else:
197-
return False
169+
return self._asynchronous_job_handler.delete_tables(workload_type, scalefactor)
198170

199171
def activate_plugin(self, plugin: str) -> bool:
200172
"""Activate plugin."""
201-
if not self._database_blocked.value:
202-
self._scheduler.add_job(
203-
func=activate_plugin_job, args=(self._connection_factory, plugin,)
204-
)
205-
return True
206-
else:
207-
return False
173+
return self._asynchronous_job_handler.activate_plugin(plugin)
208174

209175
def deactivate_plugin(self, plugin: str) -> bool:
210176
"""Dectivate plugin."""
211-
if not self._database_blocked.value:
212-
self._scheduler.add_job(
213-
func=deactivate_plugin_job, args=(self._connection_factory, plugin,)
214-
)
215-
return True
216-
else:
217-
return False
177+
return self._asynchronous_job_handler.deactivate_plugin(plugin)

hyrisecockpit/database_manager/job/execute_queries_parallel.py

+7-8
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
"""This job execute queries parallel in separate processes."""
2-
from multiprocessing import Process
2+
from threading import Thread
33
from typing import Any, List, Optional, Tuple
44

55
from psycopg2 import DatabaseError, InterfaceError, ProgrammingError
@@ -36,12 +36,11 @@ def _execute_table_query(
3636

3737
def execute_queries_parallel(queries, connection_factory: ConnectionFactory) -> None:
3838
"""Start processes for query execution."""
39-
processes: List[Process] = [
40-
Process(target=_execute_table_query, args=(query, connection_factory),)
39+
threads: List[Thread] = [
40+
Thread(target=_execute_table_query, args=(query, connection_factory),)
4141
for query in queries
4242
]
43-
for process in processes:
44-
process.start()
45-
for process in processes:
46-
process.join()
47-
process.terminate()
43+
for thread in threads:
44+
thread.start()
45+
for thread in threads:
46+
thread.join()

tests/database_manager/job/test_execute_queries_parallel.py

+4-5
Original file line numberDiff line numberDiff line change
@@ -105,16 +105,16 @@ def raise_exception(*args):
105105
("keep", "hyriseDown", "keep",),
106106
)
107107

108-
@patch("hyrisecockpit.database_manager.job.execute_queries_parallel.Process")
108+
@patch("hyrisecockpit.database_manager.job.execute_queries_parallel.Thread")
109109
@patch(
110110
"hyrisecockpit.database_manager.job.execute_queries_parallel._execute_table_query"
111111
)
112112
def test_execute_queries_parallel(
113-
self, mock_execute_table_query: MagicMock, mock_process_constructor: MagicMock
113+
self, mock_execute_table_query: MagicMock, mock_thread_constructor: MagicMock
114114
) -> None:
115115
"""Test start processes for table loading queries in parallel."""
116116
mock_process: MagicMock = MagicMock()
117-
mock_process_constructor.return_value = mock_process
117+
mock_thread_constructor.return_value = mock_process
118118
mock_cursor = MagicMock()
119119
mock_connection_factory = MagicMock()
120120
mock_connection_factory.create_cursor.return_value.__enter__.return_value = (
@@ -126,7 +126,7 @@ def test_execute_queries_parallel(
126126

127127
execute_queries_parallel(queries, mock_connection_factory)
128128

129-
mock_process_constructor.assert_called_once_with(
129+
mock_thread_constructor.assert_called_once_with(
130130
target=mock_execute_table_query,
131131
args=(
132132
"Why does dragons like the sun? Because they are scared of (k)nights.",
@@ -135,4 +135,3 @@ def test_execute_queries_parallel(
135135
)
136136
mock_process.start.assert_called_once()
137137
mock_process.join.assert_called_once()
138-
mock_process.terminate.assert_called_once()

0 commit comments

Comments
 (0)