Skip to content

Commit

Permalink
Doc fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
mr-c committed Jun 26, 2024
1 parent b4ee4c8 commit 0ff2d65
Show file tree
Hide file tree
Showing 10 changed files with 84 additions and 14 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,10 @@ remove_unused_imports: $(PYSOURCES)

pep257: pydocstyle
## pydocstyle : check Python docstring style
pydocstyle: $(PYSOURCES)
pydocstyle: $(PYSOURCES) FORCE
pydocstyle --add-ignore=D100,D101,D102,D103 $^ || true

pydocstyle_report.txt: $(PYSOURCES)
pydocstyle_report.txt: $(PYSOURCES) FORCE
pydocstyle $^ > $@ 2>&1 || true

## diff_pydocstyle_report : check Python docstring style for changed files only
Expand Down
12 changes: 12 additions & 0 deletions cwl_flask.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

class Job(threading.Thread):
def __init__(self, jobid: int, path: str, inputobj: bytes) -> None:
"""Initialize the execution Job."""
super().__init__()
self.jobid = jobid
self.path = path
Expand All @@ -28,6 +29,7 @@ def __init__(self, jobid: int, path: str, inputobj: bytes) -> None:
self.begin()

def begin(self) -> None:
"""Star executing using cwl-runner."""
loghandle, self.logname = tempfile.mkstemp()
with self.updatelock:
self.outdir = tempfile.mkdtemp()
Expand All @@ -49,6 +51,7 @@ def begin(self) -> None:
}

def run(self) -> None:
"""Wait for execution to finish and report the result."""
self.stdoutdata, self.stderrdata = self.proc.communicate(self.inputobj)
if self.proc.returncode == 0:
outobj = yaml.load(self.stdoutdata, Loader=yaml.FullLoader)
Expand All @@ -60,22 +63,26 @@ def run(self) -> None:
self.status["state"] = "Failed"

def getstatus(self) -> Dict[str, Any]:
"""Report the current status."""
with self.updatelock:
return self.status.copy()

def cancel(self) -> None:
"""Cancel the excution thread, if any."""
if self.status["state"] == "Running":
self.proc.send_signal(signal.SIGQUIT)
with self.updatelock:
self.status["state"] = "Canceled"

def pause(self) -> None:
"""Pause the execution thread, if any."""
if self.status["state"] == "Running":
self.proc.send_signal(signal.SIGTSTP)
with self.updatelock:
self.status["state"] = "Paused"

def resume(self) -> None:
"""If paused, then resume the execution thread."""
if self.status["state"] == "Paused":
self.proc.send_signal(signal.SIGCONT)
with self.updatelock:
Expand All @@ -84,6 +91,7 @@ def resume(self) -> None:

@app.route("/run", methods=["POST"])
def runworkflow() -> werkzeug.wrappers.response.Response:
"""Accept a workflow exection request and run it."""
path = request.args["wf"]
with jobs_lock:
jobid = len(jobs)
Expand All @@ -95,6 +103,7 @@ def runworkflow() -> werkzeug.wrappers.response.Response:

@app.route("/jobs/<int:jobid>", methods=["GET", "POST"])
def jobcontrol(jobid: int) -> Tuple[str, int]:
"""Accept a job related action and report the result."""
with jobs_lock:
job = jobs[jobid]
if request.method == "POST":
Expand All @@ -112,6 +121,7 @@ def jobcontrol(jobid: int) -> Tuple[str, int]:


def logspooler(job: Job) -> Generator[str, None, None]:
"""Yield 4 kilobytes of log text at a time."""
with open(job.logname) as f:
while True:
r = f.read(4096)
Expand All @@ -126,13 +136,15 @@ def logspooler(job: Job) -> Generator[str, None, None]:

@app.route("/jobs/<int:jobid>/log", methods=["GET"])
def getlog(jobid: int) -> Response:
"""Dump the log."""
with jobs_lock:
job = jobs[jobid]
return Response(logspooler(job))


@app.route("/jobs", methods=["GET"])
def getjobs() -> Response:
"""Report all known jobs."""
with jobs_lock:
jobscopy = copy.copy(jobs)

Expand Down
1 change: 1 addition & 0 deletions cwltool_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@


def main(args: List[str] = sys.argv[1:]) -> int:
"""Streaming execution of cwltool."""
if len(args) == 0:
print("Workflow must be on command line")
return 1
Expand Down
30 changes: 19 additions & 11 deletions wes_client/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@


def py3_compatible(filePath: str) -> bool:
"""Determines if a python file is 3.x compatible by seeing if it compiles in a subprocess"""
"""
Check file for Python 3.x compatibity.
(By seeing if it compiles in a subprocess)
"""
try:
check_call(
[sys.executable, "-m", "py_compile", os.path.normpath(filePath)],
Expand All @@ -27,7 +31,7 @@ def py3_compatible(filePath: str) -> bool:


def get_version(extension: str, workflow_file: str) -> str:
"""Determines the version of a .py, .wdl, or .cwl file."""
"""Determine the version of a .py, .wdl, or .cwl file."""
if extension == "py" and py3_compatible(workflow_file):
return "3"
elif extension == "cwl":
Expand All @@ -47,14 +51,13 @@ def get_version(extension: str, workflow_file: str) -> str:

def wf_info(workflow_path: str) -> Tuple[str, str]:
"""
Returns the version of the file and the file extension.
Return the version of the file and the file extension.
Assumes that the file path is to the file directly ie, ends with a valid
file extension. Supports checking local files as well as files at http://
and https:// locations. Files at these remote locations are recreated locally to
enable our approach to version checking, then removed after version is extracted.
"""

supported_formats = ["py", "wdl", "cwl"]
file_type = workflow_path.lower().split(".")[-1] # Grab the file extension
workflow_path = workflow_path if ":" in workflow_path else "file://" + workflow_path
Expand Down Expand Up @@ -183,6 +186,7 @@ def build_wes_request(


def expand_globs(attachments: Optional[Union[List[str], str]]) -> Set[str]:
"""Expand any globs present in the attachment list."""
expanded_list = []
if attachments is None:
attachments = []
Expand All @@ -198,7 +202,8 @@ def expand_globs(attachments: Optional[Union[List[str], str]]) -> Set[str]:
return set(expanded_list)


def wes_reponse(postresult: requests.Response) -> Dict[str, Any]:
def wes_response(postresult: requests.Response) -> Dict[str, Any]:
"""Convert a Response object to JSON text."""
if postresult.status_code != 200:
error = str(json.loads(postresult.text))
logging.error(error)
Expand All @@ -208,7 +213,10 @@ def wes_reponse(postresult: requests.Response) -> Dict[str, Any]:


class WESClient:
"""WES client."""

def __init__(self, service: Dict[str, Any]):
"""Initialize the cliet with the provided credentials and endpoint."""
self.auth = service["auth"]
self.proto = service["proto"]
self.host = service["host"]
Expand All @@ -230,7 +238,7 @@ def get_service_info(self) -> Dict[str, Any]:
f"{self.proto}://{self.host}/ga4gh/wes/v1/service-info",
headers=self.auth,
)
return wes_reponse(postresult)
return wes_response(postresult)

def list_runs(self) -> Dict[str, Any]:
"""
Expand All @@ -247,7 +255,7 @@ def list_runs(self) -> Dict[str, Any]:
postresult = requests.get( # nosec B113
f"{self.proto}://{self.host}/ga4gh/wes/v1/runs", headers=self.auth
)
return wes_reponse(postresult)
return wes_response(postresult)

def run(
self, wf: str, jsonyaml: str, attachments: Optional[List[str]]
Expand All @@ -271,7 +279,7 @@ def run(
files=parts,
headers=self.auth,
)
return wes_reponse(postresult)
return wes_response(postresult)

def cancel(self, run_id: str) -> Dict[str, Any]:
"""
Expand All @@ -287,7 +295,7 @@ def cancel(self, run_id: str) -> Dict[str, Any]:
f"{self.proto}://{self.host}/ga4gh/wes/v1/runs/{run_id}/cancel",
headers=self.auth,
)
return wes_reponse(postresult)
return wes_response(postresult)

def get_run_log(self, run_id: str) -> Dict[str, Any]:
"""
Expand All @@ -303,7 +311,7 @@ def get_run_log(self, run_id: str) -> Dict[str, Any]:
f"{self.proto}://{self.host}/ga4gh/wes/v1/runs/{run_id}",
headers=self.auth,
)
return wes_reponse(postresult)
return wes_response(postresult)

def get_run_status(self, run_id: str) -> Dict[str, Any]:
"""
Expand All @@ -319,4 +327,4 @@ def get_run_status(self, run_id: str) -> Dict[str, Any]:
f"{self.proto}://{self.host}/ga4gh/wes/v1/runs/{run_id}/status",
headers=self.auth,
)
return wes_reponse(postresult)
return wes_response(postresult)
1 change: 1 addition & 0 deletions wes_client/wes_client_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@


def main(argv: List[str] = sys.argv[1:]) -> int:
"""Run the WES service."""
parser = argparse.ArgumentParser(description="Workflow Execution Service")
parser.add_argument(
"--host",
Expand Down
17 changes: 16 additions & 1 deletion wes_service/arvados_wes.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
"""Arvados backed for the WES service."""

import arvados # type: ignore[import-untyped]
import arvados.util # type: ignore[import-untyped]
import arvados.collection # type: ignore[import-untyped]
Expand All @@ -20,7 +22,8 @@ class MissingAuthorization(Exception):
pass


def get_api(authtoken: Optional[str] = None) -> Any:
def get_api(authtoken: Optional[str] = None) -> arvados.api.api:
"""Retrieve an Arvados API object."""
if authtoken is None:
if not connexion.request.headers.get("Authorization"):
raise MissingAuthorization()
Expand Down Expand Up @@ -81,7 +84,10 @@ def catch_exceptions_wrapper(self: Any, *args: str, **kwargs: str) -> Any:


class ArvadosBackend(WESBackend):
"""Arvados backend for the WES Service."""

def GetServiceInfo(self) -> Dict[str, Any]:
"""Report metadata about this WES endpoint."""
stdout, stderr = subprocess.Popen( # nosec B603
[shutil.which("arvados-cwl-runner") or "arvados-cwl-runner", "--version"],
stderr=subprocess.PIPE,
Expand All @@ -106,6 +112,7 @@ def ListRuns(
page_token: Optional[str] = None,
state_search: Any = None,
) -> Dict[str, Any]:
"""List the known workflow runs."""
api = get_api()

paging = []
Expand Down Expand Up @@ -150,6 +157,7 @@ def ListRuns(
def log_for_run(
self, run_id: Optional[str], message: str, authtoken: Optional[str] = None
) -> None:
"""Report the log for a given run."""
get_api(authtoken).logs().create(
body={
"log": {
Expand All @@ -169,6 +177,7 @@ def invoke_cwl_runner(
project_uuid: str,
tempdir: str,
) -> None:
"""Submit the workflow using `arvados-cwl-runner`."""
api = arvados.api_from_config(
version="v1",
apiconfig={
Expand Down Expand Up @@ -252,6 +261,7 @@ def invoke_cwl_runner(
def RunWorkflow(
self, **args: str
) -> Union[Tuple[Dict[str, Any], int], Dict[str, Any]]:
"""Submit the workflow run request."""
if not connexion.request.headers.get("Authorization"):
raise MissingAuthorization()

Expand Down Expand Up @@ -348,6 +358,7 @@ def RunWorkflow(

@catch_exceptions
def GetRunLog(self, run_id: str) -> Dict[str, str]:
"""Get the log for a particular workflow run."""
api = get_api()

request = api.container_requests().get(uuid=run_id).execute()
Expand Down Expand Up @@ -449,6 +460,7 @@ def log_object(cr: Dict[str, Any]) -> Dict[str, Any]:

@catch_exceptions
def CancelRun(self, run_id: str) -> Dict[str, Any]: # NOQA
"""Cancel a submitted run."""
api = get_api()
request = (
api.container_requests().update(uuid=run_id, body={"priority": 0}).execute()
Expand All @@ -457,6 +469,7 @@ def CancelRun(self, run_id: str) -> Dict[str, Any]: # NOQA

@catch_exceptions
def GetRunStatus(self, run_id: str) -> Dict[str, Any]:
"""Determine the status for a given run."""
api = get_api()
request = api.container_requests().get(uuid=run_id).execute()
if request["container_uuid"]:
Expand All @@ -471,6 +484,7 @@ def GetRunStatus(self, run_id: str) -> Dict[str, Any]:


def dynamic_logs(run_id: str, logstream: str) -> str:
"""Retrienve logs, chasing down the container logs as well."""
api = get_api()
cr = api.container_requests().get(uuid=run_id).execute()
l1 = [
Expand Down Expand Up @@ -503,6 +517,7 @@ def dynamic_logs(run_id: str, logstream: str) -> str:


def create_backend(app: Any, opts: List[str]) -> ArvadosBackend:
"""Instantiate an ArvadosBackend."""
ab = ArvadosBackend(opts)
app.app.route("/ga4gh/wes/v1/runs/<run_id>/x-dynamic-logs/<logstream>")(
dynamic_logs
Expand Down
Loading

0 comments on commit 0ff2d65

Please sign in to comment.