Skip to content
Merged
Empty file removed src/__init__.py
Empty file.
10 changes: 10 additions & 0 deletions src/jobflow_remote/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,13 @@
from jobflow_remote.jobs.submit import submit_flow

SETTINGS = JobflowRemoteSettings()

# Public names re-exported by ``from jobflow_remote import *``.
# Kept in ASCII-sorted order for consistency with the ``__all__``
# declarations in the package's other ``__init__`` modules.
__all__ = (
    "ConfigManager",
    "JobController",
    "JobflowRemoteSettings",
    "__version__",
    "get_jobstore",
    "set_run_config",
    "submit_flow",
)
6 changes: 3 additions & 3 deletions src/jobflow_remote/cli/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def flows_list(
hours: hours_opt = None,
verbosity: verbosity_opt = 0,
max_results: max_results_opt = 100,
sort: sort_opt = SortOption.UPDATED_ON.value,
sort: sort_opt = SortOption.UPDATED_ON,
reverse_sort: reverse_sort_flag_opt = False,
):
"""
Expand All @@ -72,7 +72,7 @@ def flows_list(

start_date = get_start_date(start_date, days, hours)

sort = [(sort.value, 1 if reverse_sort else -1)]
db_sort: list[tuple[str, int]] = [(sort.value, 1 if reverse_sort else -1)]

with loading_spinner():
flows_info = jc.get_flows_info(
Expand All @@ -84,7 +84,7 @@ def flows_list(
end_date=end_date,
name=name,
limit=max_results,
sort=sort,
sort=db_sort,
full=verbosity > 0,
)

Expand Down
20 changes: 11 additions & 9 deletions src/jobflow_remote/cli/job.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from __future__ import annotations

import io
from pathlib import Path
from typing import Annotated, Optional
from typing import Annotated

import typer
from monty.json import jsanitize
Expand Down Expand Up @@ -74,7 +76,7 @@ def jobs_list(
hours: hours_opt = None,
verbosity: verbosity_opt = 0,
max_results: max_results_opt = 100,
sort: sort_opt = SortOption.UPDATED_ON.value,
sort: sort_opt = SortOption.UPDATED_ON,
reverse_sort: reverse_sort_flag_opt = False,
locked: locked_opt = False,
custom_query: query_opt = None,
Expand All @@ -92,14 +94,14 @@ def jobs_list(

start_date = get_start_date(start_date, days, hours)

sort = [(sort.value, 1 if reverse_sort else -1)]
db_sort: list[tuple[str, int]] = [(sort.value, 1 if reverse_sort else -1)]

with loading_spinner():
if custom_query:
jobs_info = jc.get_jobs_info_query(
query=custom_query,
limit=max_results,
sort=sort,
sort=db_sort,
)
else:
jobs_info = jc.get_jobs_info(
Expand All @@ -113,7 +115,7 @@ def jobs_list(
name=name,
metadata=metadata_dict,
limit=max_results,
sort=sort,
sort=db_sort,
)

table = get_job_info_table(jobs_info, verbosity=verbosity)
Expand Down Expand Up @@ -685,7 +687,7 @@ def resources(
typer.Option(
"--qresources",
"-qr",
help="If present the values will be interpreted as arguments for a QResources object",
help="If present the values in `resources_value` will be interpreted as arguments for a QResources object",
),
] = False,
job_id: job_ids_indexes_opt = None,
Expand All @@ -705,10 +707,10 @@ def resources(
Set the resources for the selected Jobs. Only READY or WAITING Jobs.
"""

resources_value = str_to_dict(resources_value)
resources: dict | QResources = str_to_dict(resources_value)

if qresources:
resources_value = QResources(**resources_value)
resources = QResources(**resources)

jc = get_job_controller()
execute_multi_jobs_cmd(
Expand Down Expand Up @@ -791,7 +793,7 @@ def output(
job_db_id: job_db_id_arg,
job_index: job_index_arg = None,
file_path: Annotated[
Optional[str],
str | None,
typer.Option(
"--path",
"-p",
Expand Down
5 changes: 3 additions & 2 deletions src/jobflow_remote/cli/project.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from collections.abc import Iterable
from typing import Annotated

import typer
Expand Down Expand Up @@ -96,7 +97,7 @@ def current_project(ctx: typer.Context):
@app_project.command()
def generate(
name: Annotated[str, typer.Argument(help="Name of the project")],
file_format: serialize_file_format_opt = SerializeFileFormat.YAML.value,
file_format: serialize_file_format_opt = SerializeFileFormat.YAML,
full: Annotated[
bool,
typer.Option(
Expand Down Expand Up @@ -171,7 +172,7 @@ def check(

check_all = all(not v for v in (jobstore, worker, queue))

workers_to_test = []
workers_to_test: Iterable[str] = []
if check_all:
workers_to_test = project.workers.keys()
elif worker:
Expand Down
4 changes: 2 additions & 2 deletions src/jobflow_remote/cli/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

@app_runner.command()
def run(
log_level: log_level_opt = LogLevel.INFO.value,
log_level: log_level_opt = LogLevel.INFO,
set_pid: Annotated[
bool,
typer.Option(
Expand Down Expand Up @@ -108,7 +108,7 @@ def start(
help="Use a single process for the runner",
),
] = False,
log_level: log_level_opt = LogLevel.INFO.value,
log_level: log_level_opt = LogLevel.INFO,
):
"""
Start the Runner as a daemon
Expand Down
10 changes: 10 additions & 0 deletions src/jobflow_remote/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,13 @@
RunnerOptions,
)
from jobflow_remote.config.manager import ConfigManager, ProjectData

# Public names re-exported by ``from jobflow_remote.config import *``.
__all__ = (
    "ConfigError",
    "ConfigManager",
    "LocalWorker",
    "Project",
    "ProjectData",
    "RemoteWorker",
    "RunnerOptions",
)
3 changes: 3 additions & 0 deletions src/jobflow_remote/config/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ def generate_dummy_worker(
)
return RemoteWorker(**d)

else:
raise ValueError(f"Unknown/unhandled host type: {host_type}")


def generate_dummy_jobstore() -> dict:
jobstore_dict = {
Expand Down
8 changes: 4 additions & 4 deletions src/jobflow_remote/jobs/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,16 @@ def get_terminated(self) -> list[tuple[str, int, str]]:
"""
terminated = []
for i in self.host.listdir(self.terminated_dir):
job_id, index, process_uuid = i.split("_")
index = int(index)
job_id, _index, process_uuid = i.split("_")
index = int(_index)
terminated.append((job_id, index, process_uuid))
return terminated

def get_running(self) -> list[tuple[str, int, str]]:
    """Return one ``(job_id, index, process_uuid)`` tuple per entry of the
    running directory on the host.

    File names are expected to follow the ``<job_id>_<index>_<process_uuid>``
    convention; a name with a different number of ``_``-separated parts
    raises ``ValueError``, as in the original unpacking.
    """
    names = self.host.listdir(self.running_dir)
    return [
        (job_id, int(idx), proc_uuid)
        for job_id, idx, proc_uuid in (name.split("_") for name in names)
    ]

Expand Down
34 changes: 27 additions & 7 deletions src/jobflow_remote/jobs/jobcontroller.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ def get_jobs_info(
name: str | None = None,
metadata: dict | None = None,
locked: bool = False,
sort: list[tuple] | None = None,
sort: list[tuple[str, int]] | None = None,
limit: int = 0,
) -> list[JobInfo]:
"""
Expand Down Expand Up @@ -915,7 +915,7 @@ def _full_rerun(
"""
job_id = doc["uuid"]
job_index = doc["index"]
modified_jobs = []
modified_jobs: list[int] = []

flow_filter = {"jobs": job_id}
with self.lock_flow(
Expand Down Expand Up @@ -1546,6 +1546,9 @@ def stop_job(
flow_lock_kwargs=flow_lock_kwargs,
) as (job_lock, flow_lock):
job_doc = job_lock.locked_document
if job_doc is None:
raise RuntimeError("No job document found in lock")

job_state = JobState(job_doc["state"])
if job_state in [JobState.SUBMITTED.value, JobState.RUNNING.value]:
# try cancelling the job submitted to the remote queue
Expand All @@ -1559,14 +1562,20 @@ def stop_job(
job_id = job_doc["uuid"]
job_index = job_doc["index"]
updated_states = {job_id: {job_index: JobState.USER_STOPPED}}
if flow_lock.locked_document is None:
raise RuntimeError("No document found in flow lock")
self.update_flow_state(
flow_uuid=flow_lock.locked_document["uuid"],
updated_states=updated_states,
)
job_lock.update_on_release = {
"$set": {"state": JobState.USER_STOPPED.value}
}
return [job_lock.locked_document["db_id"]]
return_doc = job_lock.locked_document
if return_doc is None:
raise RuntimeError("No document found in final job lock")

return [return_doc["db_id"]]

def pause_job(
self,
Expand Down Expand Up @@ -1612,15 +1621,24 @@ def pause_job(
flow_lock_kwargs=flow_lock_kwargs,
) as (job_lock, flow_lock):
job_doc = job_lock.locked_document
if job_doc is None:
raise RuntimeError("No job document found in lock")
job_id = job_doc["uuid"]
job_index = job_doc["index"]
updated_states = {job_id: {job_index: JobState.PAUSED}}
flow_doc = flow_lock.locked_document
if flow_doc is None:
raise RuntimeError("No flow document found in lock")
self.update_flow_state(
flow_uuid=flow_lock.locked_document["uuid"],
flow_uuid=flow_doc["uuid"],
updated_states=updated_states,
)
job_lock.update_on_release = {"$set": {"state": JobState.PAUSED.value}}
return [job_lock.locked_document["db_id"]]
return_doc = job_lock.locked_document
if return_doc is None:
raise RuntimeError("No document found in final job lock")

return [return_doc["db_id"]]

def play_jobs(
self,
Expand Down Expand Up @@ -1745,6 +1763,8 @@ def play_job(
flow_lock_kwargs=flow_lock_kwargs,
) as (job_lock, flow_lock):
job_doc = job_lock.locked_document
if job_doc is None:
raise RuntimeError("No job document found in lock")
job_id = job_doc["uuid"]
job_index = job_doc["index"]
on_missing = job_doc["job"]["config"]["on_missing_references"]
Expand Down Expand Up @@ -1836,7 +1856,7 @@ def set_job_run_properties(
list
List of db_ids of the updated Jobs.
"""
set_dict = {}
set_dict: dict[str, Any] = {}
if worker:
if worker not in self.project.workers:
raise ValueError(f"worker {worker} is not present in the project")
Expand All @@ -1851,7 +1871,7 @@ def set_job_run_properties(
f"exec_config {exec_config} is not present in the project"
)
elif isinstance(exec_config, ExecutionConfig):
exec_config = exec_config.dict()
exec_config = exec_config.model_dump()

if update and isinstance(exec_config, dict):
for k, v in exec_config.items():
Expand Down
4 changes: 2 additions & 2 deletions src/jobflow_remote/jobs/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ def run_batch_jobs(
else:
wait = 0
count += 1
job_id, index = job_str.split("_")
index = int(index)
job_id, _index = job_str.split("_")
index: int = int(_index)
logger.info(f"Starting job with id {job_id} and index {index}")
job_path = get_job_path(job_id=job_id, index=index, base_path=base_run_dir)
try:
Expand Down
6 changes: 6 additions & 0 deletions src/jobflow_remote/jobs/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,9 @@ def upload(self, lock: MongoLock):
The MongoLock with the locked Job document.
"""
doc = lock.locked_document
if doc is None:
raise RuntimeError("No document found in the lock.")

db_id = doc["db_id"]
logger.debug(f"upload db_id: {db_id}")

Expand Down Expand Up @@ -449,6 +452,9 @@ def submit(self, lock: MongoLock):
The MongoLock with the locked Job document.
"""
doc = lock.locked_document
if doc is None:
raise RuntimeError("No document found in the lock.")

logger.debug(f"submit db_id: {doc['db_id']}")

job_dict = doc["job"]
Expand Down
2 changes: 2 additions & 0 deletions src/jobflow_remote/remote/host/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from jobflow_remote.remote.host.base import BaseHost
from jobflow_remote.remote.host.local import LocalHost
from jobflow_remote.remote.host.remote import RemoteHost

# Host implementations re-exported as the package's public API.
__all__ = ("BaseHost", "LocalHost", "RemoteHost")