Skip to content

Commit

Permalink
Changing TaskQueue constructor to accept the kill_switch threading.Ev…
Browse files Browse the repository at this point in the history
…ent as an argument rather than an entire RuntimeContext, per @mr-c
  • Loading branch information
AlexTate authored and mr-c committed Nov 13, 2024
1 parent 20fd703 commit e8f9284
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 8 deletions.
7 changes: 6 additions & 1 deletion cwltool/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ def check_for_abstract_op(tool: CWLObjectType) -> None:
runtime_context.mutation_manager = MutationManager()
runtime_context.toplevel = True
runtime_context.workflow_eval_lock = threading.Condition(threading.RLock())
runtime_context.kill_switch = threading.Event()

job_reqs: Optional[list[CWLObjectType]] = None
if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
Expand Down Expand Up @@ -439,9 +440,13 @@ def run_jobs(
logger: logging.Logger,
runtime_context: RuntimeContext,
) -> None:
if runtime_context.kill_switch is None:
runtime_context.kill_switch = threading.Event()

Check warning on line 444 in cwltool/executors.py

View check run for this annotation

Codecov / codecov/patch

cwltool/executors.py#L444

Added line #L444 was not covered by tests

self.taskqueue: TaskQueue = TaskQueue(
threading.Lock(), int(math.ceil(self.max_cores)), runtime_context
threading.Lock(), int(math.ceil(self.max_cores)), runtime_context.kill_switch
)

try:
jobiter = process.job(job_order_object, self.output_callback, runtime_context)

Expand Down
9 changes: 2 additions & 7 deletions cwltool/task_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import threading
from typing import Callable, Optional

from .context import RuntimeContext
from .errors import WorkflowKillSwitch
from .loghandler import _logger

Expand Down Expand Up @@ -35,7 +34,7 @@ class TaskQueue:
in_flight: int = 0
"""The number of tasks in the queue."""

def __init__(self, lock: threading.Lock, thread_count: int, runtime_context: RuntimeContext):
def __init__(self, lock: threading.Lock, thread_count: int, kill_switch: threading.Event):
"""Create a new task queue using the specified lock and number of threads."""
self.thread_count = thread_count
self.task_queue: queue.Queue[Optional[Callable[[], None]]] = queue.Queue(
Expand All @@ -44,11 +43,7 @@ def __init__(self, lock: threading.Lock, thread_count: int, runtime_context: Run
self.task_queue_threads = []
self.lock = lock
self.error: Optional[BaseException] = None

if runtime_context.kill_switch is None:
self.kill_switch = runtime_context.kill_switch = threading.Event()
else:
self.kill_switch = runtime_context.kill_switch
self.kill_switch = kill_switch

for _r in range(0, self.thread_count):
t = threading.Thread(target=self._task_queue_func)
Expand Down

0 comments on commit e8f9284

Please sign in to comment.