diff --git a/src/__init__.py b/src/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/src/jobflow_remote/__init__.py b/src/jobflow_remote/__init__.py index dbb67772..bc641ff2 100644 --- a/src/jobflow_remote/__init__.py +++ b/src/jobflow_remote/__init__.py @@ -9,3 +9,13 @@ from jobflow_remote.jobs.submit import submit_flow SETTINGS = JobflowRemoteSettings() + +__all__ = ( + "__version__", + "set_run_config", + "ConfigManager", + "JobflowRemoteSettings", + "JobController", + "get_jobstore", + "submit_flow", +) diff --git a/src/jobflow_remote/cli/flow.py b/src/jobflow_remote/cli/flow.py index 3bdae217..1186cc3f 100644 --- a/src/jobflow_remote/cli/flow.py +++ b/src/jobflow_remote/cli/flow.py @@ -59,7 +59,7 @@ def flows_list( hours: hours_opt = None, verbosity: verbosity_opt = 0, max_results: max_results_opt = 100, - sort: sort_opt = SortOption.UPDATED_ON.value, + sort: sort_opt = SortOption.UPDATED_ON.value, # type: ignore[assignment] reverse_sort: reverse_sort_flag_opt = False, ): """ @@ -72,7 +72,7 @@ def flows_list( start_date = get_start_date(start_date, days, hours) - sort = [(sort.value, 1 if reverse_sort else -1)] + db_sort: list[tuple[str, int]] = [(sort.value, 1 if reverse_sort else -1)] with loading_spinner(): flows_info = jc.get_flows_info( @@ -84,7 +84,7 @@ def flows_list( end_date=end_date, name=name, limit=max_results, - sort=sort, + sort=db_sort, full=verbosity > 0, ) diff --git a/src/jobflow_remote/cli/job.py b/src/jobflow_remote/cli/job.py index 5abd58a8..04315986 100644 --- a/src/jobflow_remote/cli/job.py +++ b/src/jobflow_remote/cli/job.py @@ -1,6 +1,6 @@ import io from pathlib import Path -from typing import Annotated, Optional +from typing import Annotated import typer from monty.json import jsanitize @@ -13,6 +13,7 @@ from jobflow_remote.cli.jf import app from jobflow_remote.cli.jfr_typer import JFRTyper from jobflow_remote.cli.types import ( + OptionalStr, break_lock_opt, days_opt, db_ids_opt, @@ -74,7 +75,7 @@ def 
jobs_list( hours: hours_opt = None, verbosity: verbosity_opt = 0, max_results: max_results_opt = 100, - sort: sort_opt = SortOption.UPDATED_ON.value, + sort: sort_opt = SortOption.UPDATED_ON.value, # type: ignore[assignment] reverse_sort: reverse_sort_flag_opt = False, locked: locked_opt = False, custom_query: query_opt = None, @@ -92,14 +93,14 @@ def jobs_list( start_date = get_start_date(start_date, days, hours) - sort = [(sort.value, 1 if reverse_sort else -1)] + db_sort: list[tuple[str, int]] = [(sort.value, 1 if reverse_sort else -1)] with loading_spinner(): if custom_query: jobs_info = jc.get_jobs_info_query( query=custom_query, limit=max_results, - sort=sort, + sort=db_sort, ) else: jobs_info = jc.get_jobs_info( @@ -113,7 +114,7 @@ def jobs_list( name=name, metadata=metadata_dict, limit=max_results, - sort=sort, + sort=db_sort, ) table = get_job_info_table(jobs_info, verbosity=verbosity) @@ -685,7 +686,7 @@ def resources( typer.Option( "--qresources", "-qr", - help="If present the values will be interpreted as arguments for a QResources object", + help="If present the values in `resources_value` will be interpreted as arguments for a QResources object", ), ] = False, job_id: job_ids_indexes_opt = None, @@ -705,10 +706,10 @@ def resources( Set the resources for the selected Jobs. Only READY or WAITING Jobs. 
""" - resources_value = str_to_dict(resources_value) + resources = str_to_dict(resources_value) if qresources: - resources_value = QResources(**resources_value) + resources = QResources(**resources) jc = get_job_controller() execute_multi_jobs_cmd( @@ -728,7 +729,7 @@ def resources( hours=hours, verbosity=verbosity, raise_on_error=raise_on_error, - resources=resources_value, + resources=resources, update=not replace, ) @@ -791,7 +792,7 @@ def output( job_db_id: job_db_id_arg, job_index: job_index_arg = None, file_path: Annotated[ - Optional[str], + OptionalStr, typer.Option( "--path", "-p", diff --git a/src/jobflow_remote/cli/project.py b/src/jobflow_remote/cli/project.py index edada488..11f6084a 100644 --- a/src/jobflow_remote/cli/project.py +++ b/src/jobflow_remote/cli/project.py @@ -1,3 +1,4 @@ +from collections.abc import Iterable from typing import Annotated import typer @@ -96,7 +97,7 @@ def current_project(ctx: typer.Context): @app_project.command() def generate( name: Annotated[str, typer.Argument(help="Name of the project")], - file_format: serialize_file_format_opt = SerializeFileFormat.YAML.value, + file_format: serialize_file_format_opt = SerializeFileFormat.YAML.value, # type: ignore[assignment] full: Annotated[ bool, typer.Option( @@ -171,7 +172,7 @@ def check( check_all = all(not v for v in (jobstore, worker, queue)) - workers_to_test = [] + workers_to_test: Iterable[str] = [] if check_all: workers_to_test = project.workers.keys() elif worker: diff --git a/src/jobflow_remote/cli/runner.py b/src/jobflow_remote/cli/runner.py index 4db0ab6c..b3db8654 100644 --- a/src/jobflow_remote/cli/runner.py +++ b/src/jobflow_remote/cli/runner.py @@ -27,7 +27,7 @@ @app_runner.command() def run( - log_level: log_level_opt = LogLevel.INFO.value, + log_level: log_level_opt = LogLevel.INFO.value, # type: ignore[assignment] set_pid: Annotated[ bool, typer.Option( @@ -108,7 +108,7 @@ def start( help="Use a single process for the runner", ), ] = False, - log_level: 
log_level_opt = LogLevel.INFO.value, + log_level: log_level_opt = LogLevel.INFO.value, # type: ignore[assignment] ): """ Start the Runner as a daemon """ diff --git a/src/jobflow_remote/cli/types.py b/src/jobflow_remote/cli/types.py index 4ca5af79..ec13844c 100644 --- a/src/jobflow_remote/cli/types.py +++ b/src/jobflow_remote/cli/types.py @@ -302,6 +302,14 @@ class DictType(dict): pass +# Similarly, Python 3.10 union types are not supported, +# e.g., `str | None` vs `Optional[str]` +# pyupgrade really likes enforcing this rule (PEP 604) +# but will leave things in if they are explicit type aliases +OptionalStr = Optional[str] +OptionalDictType = Optional[DictType] + + class DictTypeParser(click.ParamType): name = "DictType" @@ -316,7 +324,7 @@ def convert(self, value, param, ctx): query_opt = Annotated[ - Optional[DictType], + OptionalDictType, typer.Option( "--query", "-q", diff --git a/src/jobflow_remote/config/__init__.py b/src/jobflow_remote/config/__init__.py index 974fc8dc..cc34a429 100644 --- a/src/jobflow_remote/config/__init__.py +++ b/src/jobflow_remote/config/__init__.py @@ -6,3 +6,13 @@ RunnerOptions, ) from jobflow_remote.config.manager import ConfigManager, ProjectData + +__all__ = ( + "ConfigError", + "ConfigManager", + "LocalWorker", + "Project", + "ProjectData", + "RemoteWorker", + "RunnerOptions", +) diff --git a/src/jobflow_remote/config/helper.py b/src/jobflow_remote/config/helper.py index 68dd6863..f0e755d4 100644 --- a/src/jobflow_remote/config/helper.py +++ b/src/jobflow_remote/config/helper.py @@ -65,6 +65,9 @@ def generate_dummy_worker( ) return RemoteWorker(**d) + else: + raise ValueError(f"Unknown/unhandled host type: {host_type}") + def generate_dummy_jobstore() -> dict: jobstore_dict = { diff --git a/src/jobflow_remote/jobs/batch.py b/src/jobflow_remote/jobs/batch.py index 67bb7ed1..8ab0f085 100644 --- a/src/jobflow_remote/jobs/batch.py +++ b/src/jobflow_remote/jobs/batch.py @@ -101,16 +101,16 @@ def get_terminated(self) -> list[tuple[str, 
int, str]]: """ terminated = [] for i in self.host.listdir(self.terminated_dir): - job_id, index, process_uuid = i.split("_") - index = int(index) + job_id, _index, process_uuid = i.split("_") + index = int(_index) terminated.append((job_id, index, process_uuid)) return terminated def get_running(self) -> list[tuple[str, int, str]]: running = [] for filename in self.host.listdir(self.running_dir): - job_id, index, process_uuid = filename.split("_") - index = int(index) + job_id, _index, process_uuid = filename.split("_") + index = int(_index) running.append((job_id, index, process_uuid)) return running diff --git a/src/jobflow_remote/jobs/jobcontroller.py b/src/jobflow_remote/jobs/jobcontroller.py index 5ed1a5f3..c4ce45e4 100644 --- a/src/jobflow_remote/jobs/jobcontroller.py +++ b/src/jobflow_remote/jobs/jobcontroller.py @@ -375,7 +375,7 @@ def get_jobs_info( name: str | None = None, metadata: dict | None = None, locked: bool = False, - sort: list[tuple] | None = None, + sort: list[tuple[str, int]] | None = None, limit: int = 0, ) -> list[JobInfo]: """ @@ -915,7 +915,7 @@ def _full_rerun( """ job_id = doc["uuid"] job_index = doc["index"] - modified_jobs = [] + modified_jobs: list[int] = [] flow_filter = {"jobs": job_id} with self.lock_flow( @@ -1546,6 +1546,9 @@ def stop_job( flow_lock_kwargs=flow_lock_kwargs, ) as (job_lock, flow_lock): job_doc = job_lock.locked_document + if job_doc is None: + raise RuntimeError("No job document found in lock") + job_state = JobState(job_doc["state"]) if job_state in [JobState.SUBMITTED.value, JobState.RUNNING.value]: # try cancelling the job submitted to the remote queue @@ -1559,6 +1562,8 @@ def stop_job( job_id = job_doc["uuid"] job_index = job_doc["index"] updated_states = {job_id: {job_index: JobState.USER_STOPPED}} + if flow_lock.locked_document is None: + raise RuntimeError("No document found in flow lock") self.update_flow_state( flow_uuid=flow_lock.locked_document["uuid"], updated_states=updated_states, @@ -1566,7 
+1571,11 @@ def stop_job( job_lock.update_on_release = { "$set": {"state": JobState.USER_STOPPED.value} } - return [job_lock.locked_document["db_id"]] + return_doc = job_lock.locked_document + if return_doc is None: + raise RuntimeError("No document found in final job lock") + + return [return_doc["db_id"]] def pause_job( self, @@ -1612,15 +1621,24 @@ def pause_job( flow_lock_kwargs=flow_lock_kwargs, ) as (job_lock, flow_lock): job_doc = job_lock.locked_document + if job_doc is None: + raise RuntimeError("No job document found in lock") job_id = job_doc["uuid"] job_index = job_doc["index"] updated_states = {job_id: {job_index: JobState.PAUSED}} + flow_doc = flow_lock.locked_document + if flow_doc is None: + raise RuntimeError("No flow document found in lock") self.update_flow_state( - flow_uuid=flow_lock.locked_document["uuid"], + flow_uuid=flow_doc["uuid"], updated_states=updated_states, ) job_lock.update_on_release = {"$set": {"state": JobState.PAUSED.value}} - return [job_lock.locked_document["db_id"]] + return_doc = job_lock.locked_document + if return_doc is None: + raise RuntimeError("No document found in final job lock") + + return [return_doc["db_id"]] def play_jobs( self, @@ -1745,6 +1763,8 @@ def play_job( flow_lock_kwargs=flow_lock_kwargs, ) as (job_lock, flow_lock): job_doc = job_lock.locked_document + if job_doc is None: + raise RuntimeError("No job document found in lock") job_id = job_doc["uuid"] job_index = job_doc["index"] on_missing = job_doc["job"]["config"]["on_missing_references"] @@ -1836,7 +1856,7 @@ def set_job_run_properties( list List of db_ids of the updated Jobs. 
""" - set_dict = {} + set_dict: dict[str, Any] = {} if worker: if worker not in self.project.workers: raise ValueError(f"worker {worker} is not present in the project") @@ -1851,7 +1871,7 @@ def set_job_run_properties( f"exec_config {exec_config} is not present in the project" ) elif isinstance(exec_config, ExecutionConfig): - exec_config = exec_config.dict() + exec_config = exec_config.model_dump() if update and isinstance(exec_config, dict): for k, v in exec_config.items(): diff --git a/src/jobflow_remote/jobs/run.py b/src/jobflow_remote/jobs/run.py index 9f1282e6..5e121654 100644 --- a/src/jobflow_remote/jobs/run.py +++ b/src/jobflow_remote/jobs/run.py @@ -133,8 +133,8 @@ def run_batch_jobs( else: wait = 0 count += 1 - job_id, index = job_str.split("_") - index = int(index) + job_id, _index = job_str.split("_") + index: int = int(_index) logger.info(f"Starting job with id {job_id} and index {index}") job_path = get_job_path(job_id=job_id, index=index, base_path=base_run_dir) try: diff --git a/src/jobflow_remote/jobs/runner.py b/src/jobflow_remote/jobs/runner.py index 386752b3..6eb65b75 100644 --- a/src/jobflow_remote/jobs/runner.py +++ b/src/jobflow_remote/jobs/runner.py @@ -391,6 +391,9 @@ def upload(self, lock: MongoLock): The MongoLock with the locked Job document. """ doc = lock.locked_document + if doc is None: + raise RuntimeError("No document found in the lock.") + db_id = doc["db_id"] logger.debug(f"upload db_id: {db_id}") @@ -449,6 +452,9 @@ def submit(self, lock: MongoLock): The MongoLock with the locked Job document. 
""" doc = lock.locked_document + if doc is None: + raise RuntimeError("No document found in the lock.") + logger.debug(f"submit db_id: {doc['db_id']}") job_dict = doc["job"] diff --git a/src/jobflow_remote/remote/host/__init__.py b/src/jobflow_remote/remote/host/__init__.py index 67c7c21c..a1a40f78 100644 --- a/src/jobflow_remote/remote/host/__init__.py +++ b/src/jobflow_remote/remote/host/__init__.py @@ -1,3 +1,5 @@ from jobflow_remote.remote.host.base import BaseHost from jobflow_remote.remote.host.local import LocalHost from jobflow_remote.remote.host.remote import RemoteHost + +__all__ = ("BaseHost", "LocalHost", "RemoteHost")