Skip to content

Commit

Permalink
Validate queue options
Browse files Browse the repository at this point in the history
If the config contains queue-system-specific queue options that are unknown
to the driver, a ConfigValidationError will be raised.
  • Loading branch information
berland committed Aug 23, 2023
1 parent 7f4108a commit d47dd6c
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 52 deletions.
7 changes: 4 additions & 3 deletions src/clib/lib/include/ert/job_queue/lsf_driver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ typedef struct lsf_driver_struct lsf_driver_type;
typedef struct lsf_job_struct lsf_job_type;

const std::vector<std::string> LSF_DRIVER_OPTIONS = {
LSF_QUEUE, LSF_RESOURCE, LSF_SERVER, LSF_RSH_CMD,
LSF_LOGIN_SHELL, LSF_BSUB_CMD, LSF_BJOBS_CMD, LSF_BKILL_CMD,
LSF_BHIST_CMD, LSF_BJOBS_TIMEOUT};
LSF_QUEUE, LSF_RESOURCE, LSF_SERVER, LSF_RSH_CMD,
LSF_LOGIN_SHELL, LSF_BSUB_CMD, LSF_BJOBS_CMD, LSF_BKILL_CMD,
LSF_BHIST_CMD, LSF_BJOBS_TIMEOUT, LSF_DEBUG_OUTPUT, LSF_SUBMIT_SLEEP,
LSF_EXCLUDE_HOST, LSF_PROJECT_CODE};

void lsf_job_free(lsf_job_type *job);

Expand Down
5 changes: 5 additions & 0 deletions src/clib/lib/job_queue/lsf_driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <ert/job_queue/lsf_job_stat.hpp>
#include <ert/job_queue/queue_driver.hpp>
#include <ert/job_queue/spawn.hpp>
#include <ert/python.hpp>

namespace fs = std::filesystem;
static auto logger = ert::get_logger("job_queue.lsf_driver");
Expand Down Expand Up @@ -1039,3 +1040,7 @@ void *lsf_driver_alloc() {
lsf_driver_set_option(lsf_driver, LSF_BJOBS_TIMEOUT, BJOBS_REFRESH_TIME);
return lsf_driver;
}

// Python-facing part of the LSF driver: publishes the LSF_DRIVER_OPTIONS
// vector as the attribute "LSF_DRIVER_OPTIONS" of the "lsf_driver" submodule,
// so Python code can read the option names the driver accepts.
ERT_CLIB_SUBMODULE("lsf_driver", m) {
m.add_object("LSF_DRIVER_OPTIONS", py::cast(LSF_DRIVER_OPTIONS));
}
5 changes: 5 additions & 0 deletions src/clib/lib/job_queue/slurm_driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <ert/job_queue/queue_driver.hpp>
#include <ert/job_queue/slurm_driver.hpp>
#include <ert/job_queue/spawn.hpp>
#include <ert/python.hpp>

static auto logger = ert::get_logger("job_queue.slurm_driver");

Expand Down Expand Up @@ -585,3 +586,7 @@ void slurm_driver_free_job(void *__job) {
SlurmJob *job = static_cast<SlurmJob *>(__job);
delete job;
}

// Python-facing part of the Slurm driver: publishes the SLURM_DRIVER_OPTIONS
// vector as the attribute "SLURM_DRIVER_OPTIONS" of the "slurm_driver"
// submodule, so Python code can read the option names the driver accepts.
ERT_CLIB_SUBMODULE("slurm_driver", m) {
m.add_object("SLURM_DRIVER_OPTIONS", py::cast(SLURM_DRIVER_OPTIONS));
}
3 changes: 3 additions & 0 deletions src/clib/lib/job_queue/torque_driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,9 @@ ERT_CLIB_SUBMODULE("torque_driver", m) {
auto py_path = pathlib.attr("Path")(DEFAULT_QSTAT_CMD.c_str());
m.add_object("DEFAULT_QSTAT_CMD", py_path);

py::bind_vector<std::vector<std::string>>(m, "List");
m.add_object("TORQUE_DRIVER_OPTIONS", py::cast(TORQUE_DRIVER_OPTIONS));

py::enum_<job_status_type>(m, "JobStatusType", py::arithmetic())
.export_values();

Expand Down
33 changes: 26 additions & 7 deletions src/ert/config/queue_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,22 @@
import shutil
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, List, Tuple, Union, no_type_check
from typing import Any, Dict, List, Tuple, Union, no_type_check

from ert import _clib

from .parsing import ConfigDict, ConfigValidationError, ErrorInfo
from .queue_system import QueueSystem

# Option names accepted for every queue system, independent of the driver.
GENERIC_QUEUE_OPTIONS: List[str] = ["MAX_RUNNING"]

# Allowed QUEUE_OPTION names per queue system: the names published by the
# matching driver submodule in _clib, followed by the generic options.
VALID_QUEUE_OPTIONS: Dict[Any, List[str]] = {
    QueueSystem.TORQUE: [
        *_clib.torque_driver.TORQUE_DRIVER_OPTIONS,
        *GENERIC_QUEUE_OPTIONS,
    ],
    # No specific options in driver
    QueueSystem.LOCAL: [*GENERIC_QUEUE_OPTIONS],
    QueueSystem.SLURM: [
        *_clib.slurm_driver.SLURM_DRIVER_OPTIONS,
        *GENERIC_QUEUE_OPTIONS,
    ],
    QueueSystem.LSF: [
        *_clib.lsf_driver.LSF_DRIVER_OPTIONS,
        *GENERIC_QUEUE_OPTIONS,
    ],
}


@dataclass
class QueueConfig:
Expand Down Expand Up @@ -62,7 +73,7 @@ def from_dict(cls, config_dict: ConfigDict) -> QueueConfig:
f"QUEUE_SYSTEM are {valid_queue_systems!r}"
)

queue_system = QueueSystem.from_string(queue_system)
selected_queue_system = QueueSystem.from_string(queue_system)
job_script: str = config_dict.get(
"JOB_SCRIPT", shutil.which("job_dispatch.py") or "job_dispatch.py"
)
Expand All @@ -72,16 +83,24 @@ def from_dict(cls, config_dict: ConfigDict) -> QueueConfig:
QueueSystem, List[Union[Tuple[str, str], str]]
] = defaultdict(list)
for system, option_name, *values in config_dict.get("QUEUE_OPTION", []):
queue_driver_type = QueueSystem.from_string(system)
queue_system = QueueSystem.from_string(system)
if option_name not in VALID_QUEUE_OPTIONS[queue_system]:
raise ConfigValidationError(
f"Invalid QUEUE_OPTION for {queue_system.name}: '{option_name}'. "
f"Valid choices are {sorted(VALID_QUEUE_OPTIONS[queue_system])}."
)
if values:
queue_options[queue_driver_type].append((option_name, values[0]))
queue_options[queue_system].append((option_name, values[0]))
else:
queue_options[queue_driver_type].append(option_name)
queue_options[queue_system].append(option_name)

if queue_system == QueueSystem.TORQUE and queue_options[QueueSystem.TORQUE]:
if (
selected_queue_system == QueueSystem.TORQUE
and queue_options[QueueSystem.TORQUE]
):
_validate_torque_options(queue_options[QueueSystem.TORQUE])

return QueueConfig(job_script, max_submit, queue_system, queue_options)
return QueueConfig(job_script, max_submit, selected_queue_system, queue_options)

def create_local_copy(self) -> QueueConfig:
return QueueConfig(
Expand Down
54 changes: 12 additions & 42 deletions tests/unit_tests/config/config_dict_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from py import path as py_path
from pydantic import PositiveInt

from ert import _clib
from ert.config import QueueSystem
from ert.config.field import TRANSFORM_FUNCTIONS
from ert.config.parsing import ConfigKeys
Expand Down Expand Up @@ -85,56 +86,19 @@ def memory_with_unit(draw):
def valid_queue_options(queue_system: str):
valids = ["MAX_RUNNING"]
if queue_system == QueueSystem.LSF.name:
valids += [
"LSF_RESOURCE",
"LSF_SERVER",
"LSF_QUEUE",
"LSF_LOGIN_SHELL",
"LSF_RSH_CMD",
"BSUB_CMD",
"BJOBS_CMD",
"BKILL_CMD",
"BHIST_CMD",
"BJOBS_TIMEOUT",
"DEBUG_OUTPUT",
"EXCLUDE_HOST",
"PROJECT_CODE",
"SUBMIT_SLEEP",
]
valids += _clib.lsf_driver.LSF_DRIVER_OPTIONS
elif queue_system == QueueSystem.SLURM.name:
valids += [
"SBATCH",
"SCANCEL",
"SCONTROL",
"MEMORY",
"MEMORY_PER_CPU",
"EXCLUDE_HOST",
"INCLUDE_HOST",
"SQUEUE_TIMEOUT",
"MAX_RUNTIME",
]
valids += _clib.slurm_driver.SLURM_DRIVER_OPTIONS
elif queue_system == QueueSystem.TORQUE.name:
valids += [
"QSUB_CMD",
"QSTAT_CMD",
"QDEL_CMD",
"QUEUE",
"MEMORY_PER_JOB",
"NUM_CPUS_PER_NODE",
"NUM_NODES",
"KEEP_QSUB_OUTPUT",
"CLUSTER_LABEL",
"JOB_PREFIX",
"DEBUG_OUTPUT",
"SUBMIT_SLEEP",
]
valids += _clib.torque_driver.TORQUE_DRIVER_OPTIONS
return valids


def valid_queue_values(option_name):
if option_name in [
"QSUB_CMD",
"QSTAT_CMD",
"QSTAT_OPTIONS",
"QDEL_CMD",
"QUEUE",
"CLUSTER_LABEL",
Expand All @@ -151,10 +115,12 @@ def valid_queue_values(option_name):
"BHIST_CMD",
"BJOBS_TIMEOUT",
"EXCLUDE_HOST",
"PARTITION",
"PROJECT_CODE",
"SBATCH",
"SCANCEL",
"SCONTROL",
"SQUEUE",
"MEMORY",
"MEMORY_PER_CPU",
"EXCLUDE_HOST",
Expand All @@ -173,6 +139,7 @@ def valid_queue_values(option_name):
"NUM_NODES",
"MAX_RUNNING",
"MAX_RUNTIME",
"QUEUE_QUERY_TIMEOUT",
]:
return st.builds(str, positives)
if option_name in [
Expand All @@ -181,7 +148,10 @@ def valid_queue_values(option_name):
]:
return st.builds(str, st.booleans())
else:
raise ValueError(f"unknown option {option_name}")
raise ValueError(
"config_dict_generator does not know how to "
f"generate values for {option_name}"
)


@st.composite
Expand Down
18 changes: 18 additions & 0 deletions tests/unit_tests/config/test_queue_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,24 @@ def test_queue_config_invalid_queue_system_provided(num_real):
_ = ErtConfig.from_file(filename)


@pytest.mark.usefixtures("use_tmpdir", "set_site_config")
@pytest.mark.parametrize(
    "queue_system, invalid_option", [("LOCAL", "BSUB_CMD"), ("TORQUE", "BOGUS")]
)
def test_queue_config_invalid_queue_option_provided(queue_system, invalid_option):
    """A QUEUE_OPTION name unknown to the given queue system fails config loading."""
    config_path = "config.ert"
    # Minimal config: select the queue system, then pass it the bad option.
    config_text = (
        f"NUM_REALIZATIONS 1\nQUEUE_SYSTEM {queue_system}\n"
        f"QUEUE_OPTION {queue_system} {invalid_option}"
    )
    with open(config_path, "w", encoding="utf-8") as config_file:
        config_file.write(config_text)

    expected_message = f"Invalid QUEUE_OPTION for {queue_system}: '{invalid_option}'"
    with pytest.raises(ConfigValidationError, match=expected_message):
        ErtConfig.from_file(config_path)


@pytest.mark.usefixtures("use_tmpdir", "set_site_config")
@given(memory_with_unit())
def test_torque_queue_config_memory_pr_job(memory_with_unit_str):
Expand Down

0 comments on commit d47dd6c

Please sign in to comment.