Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Archive Cancelled Workflows #939

Merged
merged 7 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion beeflow/client/bee_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -643,17 +643,19 @@ def dag(wf_id: str = typer.Argument(..., callback=match_short_id),
# Check if the workflow is archived
wf_status = get_wf_status(wf_id)
if wf_status == 'Archived':
copy_dag_in_archive = False
bee_workdir = wf_utils.get_bee_workdir()
mount_dir = os.path.join(bee_workdir, 'gdb_mount')
graphmls_dir = mount_dir + '/graphmls'
typer.secho("Workflow has been archived. All new DAGs will look the same as the one "
"in the archive directory.",
fg=typer.colors.MAGENTA)
else:
copy_dag_in_archive = True
wf_dir = wf_utils.get_workflow_dir(wf_id)
graphmls_dir = wf_dir + '/graphmls'
os.makedirs(graphmls_dir, exist_ok=True)
wf_utils.export_dag(wf_id, output_dir, graphmls_dir, no_dag_dir)
wf_utils.export_dag(wf_id, output_dir, graphmls_dir, no_dag_dir, copy_dag_in_archive)
typer.secho(f"DAG for workflow {_short_id(wf_id)} has been exported successfully.",
fg=typer.colors.GREEN)

Expand Down
41 changes: 31 additions & 10 deletions beeflow/common/gdb/generate_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
import networkx as nx
import graphviz

from beeflow.wf_manager.resources import wf_utils

def generate_viz(wf_id, output_dir, graphmls_dir, no_dag_dir):

def generate_viz(wf_id, output_dir, graphmls_dir, no_dag_dir, copy_dag_in_archive):
"""Generate a PNG of a workflow graph from a GraphML file."""
short_id = wf_id[:6]
graphml_path = graphmls_dir + "/" + short_id + ".graphml"
Expand All @@ -18,13 +20,7 @@ def generate_viz(wf_id, output_dir, graphmls_dir, no_dag_dir):
os.makedirs(dags_dir, exist_ok=True)

output_path = dags_dir + "/" + short_id + ".png"
if os.path.exists(output_path):
i = 1
backup_path = f'{dags_dir}/{short_id}_v{i}.png'
while os.path.exists(backup_path):
i += 1
backup_path = f'{dags_dir}/{short_id}_v{i}.png'
shutil.copy(output_path, backup_path)
backup_dag(output_path, dags_dir, short_id)

# Load the GraphML file using NetworkX
graph = nx.read_graphml(graphml_path)
Expand All @@ -38,8 +34,27 @@ def generate_viz(wf_id, output_dir, graphmls_dir, no_dag_dir):

# Render the graph and save as PNG
png_data = dot.pipe(format='png')
with open(output_path, "wb") as png_file:
png_file.write(png_data)
save_png(output_path, png_data)

if copy_dag_in_archive:
# Save and backup DAGs in the workflow_dir which will be archived
workflow_dir = wf_utils.get_workflow_dir(wf_id)
archive_dag_dir = workflow_dir + "/dags"
os.makedirs(archive_dag_dir, exist_ok=True)
archive_dag_path = archive_dag_dir + "/" + short_id + ".png"
backup_dag(archive_dag_path, archive_dag_dir, short_id)
save_png(archive_dag_path, png_data)


def backup_dag(path, dags_dir, short_id):
    """Copy an existing DAG image to the next unused versioned file name.

    Nothing happens when ``path`` does not exist. Otherwise the file at
    ``path`` is copied to ``<dags_dir>/<short_id>_v<N>.png``, where ``N`` is
    the smallest positive integer for which no such file exists yet. The
    original file at ``path`` is left in place.
    """
    # Guard clause: there is nothing to back up on the first export.
    if not os.path.exists(path):
        return

    # Probe _v1, _v2, ... until a name that is not taken is found.
    version = 1
    while os.path.exists(f'{dags_dir}/{short_id}_v{version}.png'):
        version += 1
    shutil.copy(path, f'{dags_dir}/{short_id}_v{version}.png')


def add_nodes_to_dot(graph, dot):
Expand Down Expand Up @@ -91,3 +106,9 @@ def add_edges_to_dot(graph, dot):
fontsize="10", fontname="times-bold")
else:
dot.edge(target, source, label=edge_label, fontsize="10")


def save_png(output_path, png_data):
    """Write the bytes in ``png_data`` to the file at ``output_path``.

    The file is opened in binary write mode, so an existing file at that
    path is overwritten.
    """
    with open(output_path, mode="wb") as handle:
        handle.write(png_data)
4 changes: 4 additions & 0 deletions beeflow/tests/mocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ def execute_workflow(self):
"""Fake executing a workflow."""
pass # noqa

def export_graphml(self):
    """Fake exporting a graphml of a workflow.

    This stub performs no work and returns ``None``.
    """
    pass # noqa


class MockGDBDriver:
"""A mock GDB driver.
Expand Down
1 change: 1 addition & 0 deletions beeflow/tests/test_wf_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ def test_cancel_workflow(client, mocker, setup_teardown_workflow, temp_db):
return_value=MockWFI())
mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', temp_db.db_file)
mocker.patch('beeflow.wf_manager.resources.wf_actions.db_path', temp_db.db_file)
mocker.patch('beeflow.wf_manager.resources.wf_actions.archive_workflow', return_value=None)

wf_name = 'wf'
workdir = 'dir'
Expand Down
3 changes: 3 additions & 0 deletions beeflow/wf_manager/resources/wf_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from flask_restful import Resource, reqparse
from beeflow.common import log as bee_logging
from beeflow.wf_manager.resources import wf_utils
from beeflow.wf_manager.resources.wf_update import archive_workflow

from beeflow.common.db import wfm_db
from beeflow.common.db.bdb import connect_db
Expand Down Expand Up @@ -64,6 +65,8 @@ def delete(self, wf_id):
wf_utils.update_wf_status(wf_id, 'Cancelled')
db.workflows.update_workflow_state(wf_id, 'Cancelled')
log.info(f"Workflow {wf_id} cancelled")
# Archive cancelled workflow
archive_workflow(db, wf_id, final_state='Cancelled')
resp = make_response(jsonify(status='Cancelled'), 202)
elif option == "remove":
log.info(f"Removing workflow {wf_id}.")
Expand Down
4 changes: 3 additions & 1 deletion beeflow/wf_manager/resources/wf_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ def archive_workflow(db, wf_id, final_state=None):
# Archive Completed DAG
graphmls_dir = workflow_dir + "/graphmls"
os.makedirs(graphmls_dir, exist_ok=True)
wf_utils.export_dag(wf_id, workflow_dir, graphmls_dir, no_dag_dir=True)
dags_dir = workflow_dir + "/dags"
os.makedirs(dags_dir, exist_ok=True)
wf_utils.export_dag(wf_id, dags_dir, graphmls_dir, no_dag_dir=True, copy_dag_in_archive=False)

wf_state = f'Archived/{final_state}' if final_state is not None else 'Archived'
db.workflows.update_workflow_state(wf_id, wf_state)
Expand Down
4 changes: 2 additions & 2 deletions beeflow/wf_manager/resources/wf_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,12 +294,12 @@ def setup_workflow(wf_id, wf_name, wf_dir, wf_workdir, no_start, workflow=None,
start_workflow(wf_id)


def export_dag(wf_id, output_dir, graphmls_dir, no_dag_dir):
def export_dag(wf_id, output_dir, graphmls_dir, no_dag_dir, copy_dag_in_archive):
"""Export the DAG of the workflow."""
wfi = get_workflow_interface(wf_id)
wfi.export_graphml()
update_graphml(wf_id, graphmls_dir)
generate_viz(wf_id, output_dir, graphmls_dir, no_dag_dir)
generate_viz(wf_id, output_dir, graphmls_dir, no_dag_dir, copy_dag_in_archive)


def start_workflow(wf_id):
Expand Down