Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file removed src/__init__.py
Empty file.
10 changes: 10 additions & 0 deletions src/jobflow_remote/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,13 @@
from jobflow_remote.jobs.submit import submit_flow

SETTINGS = JobflowRemoteSettings()

__all__ = (
"__version__",
"set_run_config",
"ConfigManager",
"JobflowRemoteSettings",
"JobController",
"get_jobstore",
"submit_flow",
)
6 changes: 3 additions & 3 deletions src/jobflow_remote/cli/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def flows_list(
hours: hours_opt = None,
verbosity: verbosity_opt = 0,
max_results: max_results_opt = 100,
sort: sort_opt = SortOption.UPDATED_ON.value,
sort: sort_opt = SortOption.UPDATED_ON.value, # type: ignore[assignment]
reverse_sort: reverse_sort_flag_opt = False,
):
"""
Expand All @@ -72,7 +72,7 @@ def flows_list(

start_date = get_start_date(start_date, days, hours)

sort = [(sort.value, 1 if reverse_sort else -1)]
db_sort: list[tuple[str, int]] = [(sort.value, 1 if reverse_sort else -1)]

with loading_spinner():
flows_info = jc.get_flows_info(
Expand All @@ -84,7 +84,7 @@ def flows_list(
end_date=end_date,
name=name,
limit=max_results,
sort=sort,
sort=db_sort,
full=verbosity > 0,
)

Expand Down
21 changes: 11 additions & 10 deletions src/jobflow_remote/cli/job.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import io
from pathlib import Path
from typing import Annotated, Optional
from typing import Annotated

import typer
from monty.json import jsanitize
Expand All @@ -13,6 +13,7 @@
from jobflow_remote.cli.jf import app
from jobflow_remote.cli.jfr_typer import JFRTyper
from jobflow_remote.cli.types import (
OptionalStr,
break_lock_opt,
days_opt,
db_ids_opt,
Expand Down Expand Up @@ -74,7 +75,7 @@ def jobs_list(
hours: hours_opt = None,
verbosity: verbosity_opt = 0,
max_results: max_results_opt = 100,
sort: sort_opt = SortOption.UPDATED_ON.value,
sort: sort_opt = SortOption.UPDATED_ON.value, # type: ignore[assignment]
reverse_sort: reverse_sort_flag_opt = False,
locked: locked_opt = False,
custom_query: query_opt = None,
Expand All @@ -92,14 +93,14 @@ def jobs_list(

start_date = get_start_date(start_date, days, hours)

sort = [(sort.value, 1 if reverse_sort else -1)]
db_sort: list[tuple[str, int]] = [(sort.value, 1 if reverse_sort else -1)]

with loading_spinner():
if custom_query:
jobs_info = jc.get_jobs_info_query(
query=custom_query,
limit=max_results,
sort=sort,
sort=db_sort,
)
else:
jobs_info = jc.get_jobs_info(
Expand All @@ -113,7 +114,7 @@ def jobs_list(
name=name,
metadata=metadata_dict,
limit=max_results,
sort=sort,
sort=db_sort,
)

table = get_job_info_table(jobs_info, verbosity=verbosity)
Expand Down Expand Up @@ -685,7 +686,7 @@ def resources(
typer.Option(
"--qresources",
"-qr",
help="If present the values will be interpreted as arguments for a QResources object",
help="If present the values in `resources_value` will be interpreted as arguments for a QResources object",
),
] = False,
job_id: job_ids_indexes_opt = None,
Expand All @@ -705,10 +706,10 @@ def resources(
Set the resources for the selected Jobs. Only READY or WAITING Jobs.
"""

resources_value = str_to_dict(resources_value)
resources = str_to_dict(resources_value)

if qresources:
resources_value = QResources(**resources_value)
resources = QResources(**resources)

jc = get_job_controller()
execute_multi_jobs_cmd(
Expand All @@ -728,7 +729,7 @@ def resources(
hours=hours,
verbosity=verbosity,
raise_on_error=raise_on_error,
resources=resources_value,
resources=resources,
update=not replace,
)

Expand Down Expand Up @@ -791,7 +792,7 @@ def output(
job_db_id: job_db_id_arg,
job_index: job_index_arg = None,
file_path: Annotated[
Optional[str],
OptionalStr,
typer.Option(
"--path",
"-p",
Expand Down
5 changes: 3 additions & 2 deletions src/jobflow_remote/cli/project.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from collections.abc import Iterable
from typing import Annotated

import typer
Expand Down Expand Up @@ -96,7 +97,7 @@ def current_project(ctx: typer.Context):
@app_project.command()
def generate(
name: Annotated[str, typer.Argument(help="Name of the project")],
file_format: serialize_file_format_opt = SerializeFileFormat.YAML.value,
file_format: serialize_file_format_opt = SerializeFileFormat.YAML.value, # type: ignore[assignment]
full: Annotated[
bool,
typer.Option(
Expand Down Expand Up @@ -171,7 +172,7 @@ def check(

check_all = all(not v for v in (jobstore, worker, queue))

workers_to_test = []
workers_to_test: Iterable[str] = []
if check_all:
workers_to_test = project.workers.keys()
elif worker:
Expand Down
4 changes: 2 additions & 2 deletions src/jobflow_remote/cli/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

@app_runner.command()
def run(
log_level: log_level_opt = LogLevel.INFO.value,
log_level: log_level_opt = LogLevel.INFO.value, # type: ignore[assignment]
set_pid: Annotated[
bool,
typer.Option(
Expand Down Expand Up @@ -108,7 +108,7 @@ def start(
help="Use a single process for the runner",
),
] = False,
log_level: log_level_opt = LogLevel.INFO.value,
log_level: log_level_opt = LogLevel.INFO.value, # type: ignore[assignment]
):
"""
Start the Runner as a daemon
Expand Down
10 changes: 9 additions & 1 deletion src/jobflow_remote/cli/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,14 @@ class DictType(dict):
pass


# Similarly, Python 3.10 union types are not supported,
# e.g., `str | None` vs `Optional[str]`
# pyupgraade really likes enforcing this rule (PEP 604)
# but will leave things in if they are explicit type aliases
OptionalStr = Optional[str]
OptionalDictType = Optional[DictType]


class DictTypeParser(click.ParamType):
name = "DictType"

Expand All @@ -316,7 +324,7 @@ def convert(self, value, param, ctx):


query_opt = Annotated[
Optional[DictType],
OptionalDictType,
typer.Option(
"--query",
"-q",
Expand Down
10 changes: 10 additions & 0 deletions src/jobflow_remote/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,13 @@
RunnerOptions,
)
from jobflow_remote.config.manager import ConfigManager, ProjectData

__all__ = (
"ConfigError",
"ConfigManager",
"LocalWorker",
"Project",
"ProjectData",
"RemoteWorker",
"RunnerOptions",
)
3 changes: 3 additions & 0 deletions src/jobflow_remote/config/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ def generate_dummy_worker(
)
return RemoteWorker(**d)

else:
raise ValueError(f"Unknown/unhandled host type: {host_type}")


def generate_dummy_jobstore() -> dict:
jobstore_dict = {
Expand Down
8 changes: 4 additions & 4 deletions src/jobflow_remote/jobs/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,16 @@ def get_terminated(self) -> list[tuple[str, int, str]]:
"""
terminated = []
for i in self.host.listdir(self.terminated_dir):
job_id, index, process_uuid = i.split("_")
index = int(index)
job_id, _index, process_uuid = i.split("_")
index = int(_index)
terminated.append((job_id, index, process_uuid))
return terminated

def get_running(self) -> list[tuple[str, int, str]]:
running = []
for filename in self.host.listdir(self.running_dir):
job_id, index, process_uuid = filename.split("_")
index = int(index)
job_id, _index, process_uuid = filename.split("_")
index = int(_index)
running.append((job_id, index, process_uuid))
return running

Expand Down
34 changes: 27 additions & 7 deletions src/jobflow_remote/jobs/jobcontroller.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ def get_jobs_info(
name: str | None = None,
metadata: dict | None = None,
locked: bool = False,
sort: list[tuple] | None = None,
sort: list[tuple[str, int]] | None = None,
limit: int = 0,
) -> list[JobInfo]:
"""
Expand Down Expand Up @@ -915,7 +915,7 @@ def _full_rerun(
"""
job_id = doc["uuid"]
job_index = doc["index"]
modified_jobs = []
modified_jobs: list[int] = []

flow_filter = {"jobs": job_id}
with self.lock_flow(
Expand Down Expand Up @@ -1546,6 +1546,9 @@ def stop_job(
flow_lock_kwargs=flow_lock_kwargs,
) as (job_lock, flow_lock):
job_doc = job_lock.locked_document
if job_doc is None:
raise RuntimeError("No job document found in lock")

job_state = JobState(job_doc["state"])
if job_state in [JobState.SUBMITTED.value, JobState.RUNNING.value]:
# try cancelling the job submitted to the remote queue
Expand All @@ -1559,14 +1562,20 @@ def stop_job(
job_id = job_doc["uuid"]
job_index = job_doc["index"]
updated_states = {job_id: {job_index: JobState.USER_STOPPED}}
if flow_lock.locked_document is None:
raise RuntimeError("No document found in flow lock")
self.update_flow_state(
flow_uuid=flow_lock.locked_document["uuid"],
updated_states=updated_states,
)
job_lock.update_on_release = {
"$set": {"state": JobState.USER_STOPPED.value}
}
return [job_lock.locked_document["db_id"]]
return_doc = job_lock.locked_document
if return_doc is None:
raise RuntimeError("No document found in final job lock")

return [return_doc["db_id"]]

def pause_job(
self,
Expand Down Expand Up @@ -1612,15 +1621,24 @@ def pause_job(
flow_lock_kwargs=flow_lock_kwargs,
) as (job_lock, flow_lock):
job_doc = job_lock.locked_document
if job_doc is None:
raise RuntimeError("No job document found in lock")
job_id = job_doc["uuid"]
job_index = job_doc["index"]
updated_states = {job_id: {job_index: JobState.PAUSED}}
flow_doc = flow_lock.locked_document
if flow_doc is None:
raise RuntimeError("No flow document found in lock")
self.update_flow_state(
flow_uuid=flow_lock.locked_document["uuid"],
flow_uuid=flow_doc["uuid"],
updated_states=updated_states,
)
job_lock.update_on_release = {"$set": {"state": JobState.PAUSED.value}}
return [job_lock.locked_document["db_id"]]
return_doc = job_lock.locked_document
if return_doc is None:
raise RuntimeError("No document found in final job lock")

return [return_doc["db_id"]]

def play_jobs(
self,
Expand Down Expand Up @@ -1745,6 +1763,8 @@ def play_job(
flow_lock_kwargs=flow_lock_kwargs,
) as (job_lock, flow_lock):
job_doc = job_lock.locked_document
if job_doc is None:
raise RuntimeError("No job document found in lock")
job_id = job_doc["uuid"]
job_index = job_doc["index"]
on_missing = job_doc["job"]["config"]["on_missing_references"]
Expand Down Expand Up @@ -1836,7 +1856,7 @@ def set_job_run_properties(
list
List of db_ids of the updated Jobs.
"""
set_dict = {}
set_dict: dict[str, Any] = {}
if worker:
if worker not in self.project.workers:
raise ValueError(f"worker {worker} is not present in the project")
Expand All @@ -1851,7 +1871,7 @@ def set_job_run_properties(
f"exec_config {exec_config} is not present in the project"
)
elif isinstance(exec_config, ExecutionConfig):
exec_config = exec_config.dict()
exec_config = exec_config.model_dump()

if update and isinstance(exec_config, dict):
for k, v in exec_config.items():
Expand Down
4 changes: 2 additions & 2 deletions src/jobflow_remote/jobs/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ def run_batch_jobs(
else:
wait = 0
count += 1
job_id, index = job_str.split("_")
index = int(index)
job_id, _index = job_str.split("_")
index: int = int(_index)
logger.info(f"Starting job with id {job_id} and index {index}")
job_path = get_job_path(job_id=job_id, index=index, base_path=base_run_dir)
try:
Expand Down
6 changes: 6 additions & 0 deletions src/jobflow_remote/jobs/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,9 @@ def upload(self, lock: MongoLock):
The MongoLock with the locked Job document.
"""
doc = lock.locked_document
if doc is None:
raise RuntimeError("No document found in the lock.")

db_id = doc["db_id"]
logger.debug(f"upload db_id: {db_id}")

Expand Down Expand Up @@ -449,6 +452,9 @@ def submit(self, lock: MongoLock):
The MongoLock with the locked Job document.
"""
doc = lock.locked_document
if doc is None:
raise RuntimeError("No document found in the lock.")

logger.debug(f"submit db_id: {doc['db_id']}")

job_dict = doc["job"]
Expand Down
2 changes: 2 additions & 0 deletions src/jobflow_remote/remote/host/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from jobflow_remote.remote.host.base import BaseHost
from jobflow_remote.remote.host.local import LocalHost
from jobflow_remote.remote.host.remote import RemoteHost

__all__ = ("BaseHost", "LocalHost", "RemoteHost")