Skip to content

Commit

Permalink
[wip] add nested workflow test
Browse files Browse the repository at this point in the history
  • Loading branch information
fmigneault committed Nov 6, 2024
1 parent 471713f commit d66f07a
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 24 deletions.
105 changes: 82 additions & 23 deletions tests/functional/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ class WorkflowProcesses(enum.Enum):
APP_ECHO = "Echo"
APP_ECHO_OPTIONAL = "EchoOptional"
APP_ECHO_SECRETS = "EchoSecrets"
APP_ECHO_RESULTS_TESTER = "EchoResultsTester"
APP_ICE_DAYS = "Finch_IceDays"
APP_READ_FILE = "ReadFile"
APP_SUBSET_BBOX = "ColibriFlyingpigeon_SubsetBbox"
Expand Down Expand Up @@ -166,7 +167,8 @@ def __init__(self,
application_package=None, # type: Optional[CWL]
): # type: (...) -> None
self.pid = WorkflowProcesses(process_id) # type: WorkflowProcesses
self.id = self.pid.value # type: Optional[str] # noqa
self.id = self.pid.value # type: str
self.path = f"/processes/{self.id}" # type: str
self.test_id = test_id # type: Optional[str]
self.deploy_payload = deploy_payload # type: Optional[ProcessDeployment]
self.execute_payload = execute_payload # type: Optional[ProcessExecution]
Expand Down Expand Up @@ -208,7 +210,7 @@ class WorkflowTestRunnerBase(ResourcesUtil, TestCase):
"""
Used between various TestCase runs.
"""
logger_level = logging.INFO # type: int
logger_level = logging.INFO # type: AnyLogLevel
logger_enabled = True # type: bool
logger = None # type: Optional[logging.Logger]
# setting indent to `None` disables pretty-printing of JSON payload
Expand Down Expand Up @@ -820,6 +822,7 @@ def workflow_runner(
log_full_trace, # type: bool
requests_mock_callback=None, # type: Optional[Callable[[RequestsMock], None]]
override_execute_body=None, # type: Optional[ProcessExecution]
override_execute_path=None, # type: Optional[str]
): # type: (...) -> ExecutionResults
...

Expand All @@ -831,6 +834,7 @@ def workflow_runner(
log_full_trace=False, # type: bool
requests_mock_callback=None, # type: Optional[Callable[[RequestsMock], None]]
override_execute_body=None, # type: Optional[ProcessExecution]
override_execute_path=None, # type: Optional[str]
detailed_results=True, # type: Literal[True]
): # type: (...) -> DetailedExecutionResults
...
Expand All @@ -842,6 +846,7 @@ def workflow_runner(
log_full_trace=False, # type: bool
requests_mock_callback=None, # type: Optional[Callable[[RequestsMock], None]]
override_execute_body=None, # type: Optional[ProcessExecution]
override_execute_path=None, # type: Optional[str]
detailed_results=False, # type: bool
): # type: (...) -> Union[ExecutionResults, DetailedExecutionResults]
"""
Expand All @@ -867,6 +872,9 @@ def workflow_runner(
Function to add further requests mock specifications as needed by the calling test case.
:param override_execute_body:
Alternate execution request content from the default one loaded from the referenced Workflow location.
:param override_execute_path:
Alternate execution request path from the default one employed by the workflow runner.
Must be a supported endpoints (``/jobs``, ``/processes/{pID}/jobs``, ``/processes/{pID}/execution``).
:param detailed_results:
If enabled, each step involved in the :term:`Workflow` chain will provide their respective details
including the :term:`Process` ID, the :term:`Job` UUID, intermediate outputs and logs.
Expand All @@ -879,29 +887,45 @@ def workflow_runner(

# deploy processes and make them visible for workflow
has_duplicate_apps = len(set(test_application_ids)) != len(list(test_application_ids))
path_deploy = "/processes"
for process_id in test_application_ids:
path_visible = f"{path_deploy}/{self.test_processes_info[process_id].id}/visibility"
data_visible = {"value": Visibility.PUBLIC}
allowed_status = [HTTPCreated.code, HTTPConflict.code] if has_duplicate_apps else HTTPCreated.code
self.request("POST", path_deploy, status=allowed_status, headers=self.headers,
json=self.test_processes_info[process_id].deploy_payload,
message="Expect deployed application process.")
self.request("PUT", path_visible, status=HTTPOk.code, headers=self.headers, json=data_visible,
message="Expect visible application process.")
self.prepare_process(process_id, exists_ok=has_duplicate_apps)

# deploy workflow process itself and make visible
workflow_info = self.test_processes_info[test_workflow_id]
self.request("POST", path_deploy, status=HTTPCreated.code, headers=self.headers,
json=workflow_info.deploy_payload,
message="Expect deployed workflow process.")
process_path = f"{path_deploy}/{workflow_info.id}"
visible_path = f"{process_path}/visibility"
visible = {"value": Visibility.PUBLIC}
resp = self.request("PUT", visible_path, json=visible, status=HTTPOk.code, headers=self.headers)
self.assert_test(lambda: resp.json.get("value") == Visibility.PUBLIC,
message="Process should be public.")
workflow_info = self.prepare_process(test_workflow_id)
status_or_results = self.execute_monitor_process(
workflow_info,
detailed_results=detailed_results,
override_execute_body=override_execute_body,
override_execute_path=override_execute_path,
requests_mock_callback=requests_mock_callback,
)
return status_or_results

def prepare_process(self, process_id, exists_ok=False):
# type: (WorkflowProcesses, bool) -> ProcessInfo
"""
Deploys the process referenced by ID using the available :term:`Application Package` and makes it visible.
"""
proc_info = self.test_processes_info[process_id]
body_deploy = proc_info.deploy_payload
path_deploy = "/processes"
path_visible = f"{proc_info.path}/visibility"
data_visible = {"value": Visibility.PUBLIC}
allowed_status = [HTTPCreated.code, HTTPConflict.code] if exists_ok else HTTPCreated.code
self.request("POST", path_deploy, status=allowed_status, headers=self.headers, json=body_deploy,
message="Expect deployed process.")
self.request("PUT", path_visible, status=HTTPOk.code, headers=self.headers, json=data_visible,
message="Expect visible process.")
return proc_info

def execute_monitor_process(
self,
process_info, # type: ProcessInfo
requests_mock_callback=None, # type: Optional[Callable[[RequestsMock], None]]
override_execute_body=None, # type: Optional[ProcessExecution]
override_execute_path=None, # type: Optional[str]
detailed_results=True, # type: Literal[True]
):
with contextlib.ExitStack() as stack_exec:
for data_source_use in [
"weaver.processes.sources.get_data_source_from_url",
Expand All @@ -918,9 +942,9 @@ def workflow_runner(
requests_mock_callback(mock_req)

# execute workflow
execute_body = override_execute_body or workflow_info.execute_payload
execute_body = override_execute_body or process_info.execute_payload
execute_body.setdefault("mode", ExecuteMode.ASYNC)
execute_path = f"{process_path}/jobs"
execute_path = override_execute_path or f"{process_info.path}/jobs"
self.assert_test(lambda: execute_body is not None,
message="Cannot execute workflow without a request body!")
resp = self.request("POST", execute_path, status=HTTPCreated.code,
Expand Down Expand Up @@ -1603,3 +1627,38 @@ def test_workflow_optional_input_propagation(self):
with open(path, mode="r", encoding="utf-8") as out_file:
data = out_file.read().strip()
assert data == "test-message", "output from workflow should match the default resolved from input omission"

def test_workflow_ad_hoc_nested_process(self):
passthrough_process_info = self.prepare_process(WorkflowProcesses.APP_PASSTHROUGH_EXPRESSIONS)
echo_result_process_info = self.prepare_process(WorkflowProcesses.APP_ECHO_RESULTS_TESTER)

workflow_exec = {
"process": passthrough_process_info.path,
"inputs": {
"message": {
"process": passthrough_process_info.path,
"inputs": {
"process": echo_result_process_info.path,
"inputs": {
"message": "test"
},
"outputs": {"output_data": {}}
},
"outputs": {"message": {}}
},
"code": 456,
}
}
results = self.execute_monitor_process(
passthrough_process_info,
override_execute_body=workflow_exec,
override_execute_path="/jobs",
)
self.assert_test(
lambda: results == {
"message": {"value": "test"},
"code": {"value": 456},
"number": {"value": 3.1416},
"integer": {"value": 3},
}
)
2 changes: 1 addition & 1 deletion tests/functional/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ def retrieve_payload(cls, process, ref_type=None, ref_name=None, ref_found=False
with open(path_ext, mode="r", encoding="utf-8") as f:
json_payload = yaml.safe_load(f) # both JSON/YAML
return json_payload
if urlparse(path_ext).scheme != "":
if urlparse(path_ext).scheme.startswith("http"):
if ref_found:
return path
resp = cls.request("GET", path, force_requests=True, ignore_errors=True)
Expand Down

0 comments on commit d66f07a

Please sign in to comment.