Skip to content

Commit

Permalink
Expression cleanup and optimization. (#174)
Browse files Browse the repository at this point in the history
* Expression cleanup and optimization.
* Remove obsolete draft-2 pluggable expression engine.
* Unify string interpolation for javascript / non-javascript
* Check for simple parameter references and avoid calling out to full javascript
* Remember successful execution of "docker pull node:slim"
tetron authored Aug 26, 2016
1 parent 692fcf9 commit 8a685e6
Showing 3 changed files with 179 additions and 237 deletions.
218 changes: 129 additions & 89 deletions cwltool/expression.py
Original file line number Diff line number Diff line change
@@ -1,93 +1,109 @@
from . import docker
import subprocess
import json
from .utils import aslist, get_feature
import logging
import os
from .errors import WorkflowException
import re

from typing import Any, AnyStr, Union, Text, Dict, List
import schema_salad.validate as validate
import schema_salad.ref_resolver

from .utils import aslist, get_feature
from .errors import WorkflowException
from . import sandboxjs
import re
from typing import Any, AnyStr, Union, Text
from . import docker

_logger = logging.getLogger("cwltool")

def jshead(engineConfig, rootvars):
    # type: (List[Text], Dict[Text, Any]) -> Text
    """Build the JavaScript prelude placed in front of an expression.

    The result is the ``engineConfig`` lines followed by one
    ``var <name> = <json>;`` declaration per entry of ``rootvars``,
    all joined with newlines.
    """
    declarations = []
    for name, value in rootvars.items():
        encoded = json.dumps(value, indent=4)
        declarations.append(u"var %s = %s;" % (name, encoded))
    return u"\n".join(engineConfig + declarations)

def exeval(ex, jobinput, requirements, outdir, tmpdir, context, pull_image):
    # type: (Dict[Text, Any], Dict[Text, Union[Dict, List, Text]], List[Dict[Text, Any]], Text, Text, Any, bool) -> sandboxjs.JSON
    """Evaluate an expression object of the form ``{"engine": ..., "script": ...}``.

    The built-in JavascriptEngine is run through ``sandboxjs.execjs``.  Any
    other engine must be declared by an ``ExpressionEngineRequirement`` whose
    ``id`` equals ``ex["engine"]``; its ``engineCommand`` is then spawned
    (inside ``docker run`` when the requirement yields a Docker image), fed a
    JSON document on stdin, and its stdout is parsed as JSON.

    Raises WorkflowException when no matching engine is declared or when the
    engine process exits with a non-zero status.
    """
    if ex["engine"] == "https://w3id.org/cwl/cwl#JavascriptEngine":
        engineConfig = []  # type: List[Text]
        # The last matching requirement in the list takes precedence.
        for req in reversed(requirements):
            if req["class"] != "ExpressionEngineRequirement":
                continue
            if req["id"] == "https://w3id.org/cwl/cwl#JavascriptEngine":
                engineConfig = req.get("engineConfig", [])
                break
        rootvars = {
            u"inputs": jobinput,
            u"self": context,
            u"runtime": {
                u"tmpdir": tmpdir,
                u"outdir": outdir},
        }
        return sandboxjs.execjs(ex["script"], jshead(engineConfig, rootvars))

    # Locate the requirement that declares the requested external engine.
    engine_req = None
    for req in reversed(requirements):
        if req["class"] == "ExpressionEngineRequirement" and req["id"] == ex["engine"]:
            engine_req = req
            break
    if engine_req is None:
        raise WorkflowException(u"Unknown expression engine '%s'" % ex["engine"])

    class DR(object):
        # Minimal holder exposing .requirements / .hints for get_feature().
        def __init__(self):  # type: () -> None
            self.requirements = None  # type: List[None]
            self.hints = None  # type: List[None]

    dr = DR()
    dr.requirements = engine_req.get("requirements", [])
    dr.hints = engine_req.get("hints", [])

    docker_req, docker_is_req = get_feature(dr, "DockerRequirement")
    img_id = None
    if docker_req:
        img_id = docker.get_from_requirements(docker_req, docker_is_req, pull_image)

    runtime = []  # type: List[str]
    if img_id:
        runtime = ["docker", "run", "-i", "--rm", str(img_id)]

    inp = {
        "script": ex["script"],
        "engineConfig": engine_req.get("engineConfig", []),
        "job": jobinput,
        "context": context,
        "outdir": outdir,
        "tmpdir": tmpdir,
    }

    command = runtime + aslist(engine_req["engineCommand"])
    _logger.debug(u"Invoking expression engine %s with %s",
                  command,
                  json.dumps(inp, indent=4))

    sp = subprocess.Popen(command,
                          shell=False,
                          close_fds=True,
                          stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE)

    # stderr is not piped, so only stdoutdata carries engine output.
    (stdoutdata, stderrdata) = sp.communicate(json.dumps(inp) + "\n\n")
    if sp.returncode != 0:
        raise WorkflowException(u"Expression engine returned non-zero exit code on evaluation of\n%s" % json.dumps(inp, indent=4))

    return json.loads(stdoutdata)

# Regular-expression building blocks for parameter references such as
# inputs.file['path'][0].
# A bare identifier: one or more word characters.
seg_symbol = r"""\w+"""
# A single-quoted subscript, e.g. ['name'].
seg_single = r"""\['([^']|\\')+'\]"""
# A double-quoted subscript, e.g. ["name"].
seg_double = r"""\["([^"]|\\")+"\]"""
# A numeric subscript, e.g. [3].
seg_index = r"""\[[0-9]+\]"""
# One path segment: ".symbol" or any of the three subscript forms above.
segments = r"(\.%s|%s|%s|%s)" % (seg_symbol, seg_single, seg_double, seg_index)
segment_re = re.compile(segments, flags=re.UNICODE)
# NOTE(review): param_re is assigned twice in a row, so only the second
# pattern is ever in effect.  The first expects a leading "$(" anywhere in
# the text; the second expects "(" ... ")" with the ")" at the end of the
# text and no leading "$".  Presumably these are the old and new sides of
# the diff -- confirm and keep only one.
param_re = re.compile(r"\$\((%s)%s*\)" % (seg_symbol, segments), flags=re.UNICODE)
param_re = re.compile(r"\((%s)%s*\)$" % (seg_symbol, segments), flags=re.UNICODE)

# Type alias for JSON-like values.
# NOTE(review): `long` exists only on Python 2.
JSON = Union[Dict[Any,Any], List[Any], Text, int, long, float, bool, None]

class SubstitutionError(Exception):
    """Raised by scanner() when a ``$(`` / ``${`` block is never closed."""

def scanner(scan):  # type: (Text) -> List[int]
    """Find the next substitution or escape sequence in ``scan``.

    Returns ``[start, end]`` (end exclusive) for the first of:

    * a ``$(...)`` or ``${...}`` block, with nested brackets balanced and
      closers inside single/double-quoted strings ignored;
    * a backslash followed by one character, outside any block.

    Returns ``None`` when neither occurs.  A ``$`` that is not followed by
    ``(`` or ``{`` is ordinary text, as is a ``$`` or backslash that is the
    final character of the string.

    Raises SubstitutionError when a block is opened but never closed.
    """
    DEFAULT = 0
    DOLLAR = 1
    PAREN = 2
    BRACE = 3
    SINGLE_QUOTE = 4
    DOUBLE_QUOTE = 5
    BACKSLASH = 6

    i = 0
    stack = [DEFAULT]
    start = 0
    while i < len(scan):
        state = stack[-1]
        c = scan[i]

        if state == DEFAULT:
            if c == '$':
                stack.append(DOLLAR)
            elif c == '\\':
                stack.append(BACKSLASH)
        elif state == BACKSLASH:
            stack.pop()
            if stack[-1] == DEFAULT:
                return [i - 1, i + 1]
        elif state == DOLLAR:
            if c == '(':
                start = i - 1
                stack.append(PAREN)
            elif c == '{':
                start = i - 1
                stack.append(BRACE)
            else:
                # A bare '$' is plain text.  Re-examine this same character
                # in the DEFAULT state (no index increment) so that "$$(x)"
                # and "$\$" are still recognised.
                stack.pop()
                continue
        elif state == PAREN:
            if c == '(':
                stack.append(PAREN)
            elif c == ')':
                stack.pop()
                if stack[-1] == DOLLAR:
                    return [start, i + 1]
            elif c == "'":
                stack.append(SINGLE_QUOTE)
            elif c == '"':
                stack.append(DOUBLE_QUOTE)
        elif state == BRACE:
            if c == '{':
                stack.append(BRACE)
            elif c == '}':
                stack.pop()
                if stack[-1] == DOLLAR:
                    return [start, i + 1]
            elif c == "'":
                stack.append(SINGLE_QUOTE)
            elif c == '"':
                stack.append(DOUBLE_QUOTE)
        elif state == SINGLE_QUOTE:
            if c == "'":
                stack.pop()
            elif c == '\\':
                stack.append(BACKSLASH)
        elif state == DOUBLE_QUOTE:
            if c == '"':
                stack.pop()
            elif c == '\\':
                stack.append(BACKSLASH)
        i += 1

    # A lone trailing '$' or '\' leaves exactly one extra state on the stack;
    # that is ordinary text, not an unfinished block.
    dangling = len(stack) == 2 and stack[1] in (DOLLAR, BACKSLASH)
    if len(stack) > 1 and not dangling:
        raise SubstitutionError("Substitution error, unfinished block starting at position {}: {}".format(start, scan[start:]))
    return None

def next_seg(remain, obj): # type: (Text, Any)->Text
if remain:
@@ -103,24 +119,42 @@ def next_seg(remain, obj): # type: (Text, Any)->Text
else:
return obj


def param_interpolate(ex, obj, strip=True):
# type: (Text, Dict[Any, Any], bool) -> Union[Text, Text]
m = param_re.search(ex)
def evaluator(ex, jslib, obj, fullJS=False, timeout=None):
    # type: (Text, Text, Dict[Text, Any], bool, int) -> JSON
    """Evaluate one expression body (the text following the ``$``).

    If ``ex`` matches ``param_re`` it is resolved directly against ``obj``
    with ``next_seg`` and no JavaScript is run.  Otherwise, when ``fullJS``
    is true, ``ex`` is passed to ``sandboxjs.execjs`` with ``jslib`` as the
    prelude.

    Raises sandboxjs.JavascriptException when ``ex`` is not a plain
    parameter reference and ``fullJS`` is false.
    """
    m = param_re.match(ex)
    if m:
        # Everything after the root symbol, minus the closing ")", is the
        # path walked by next_seg starting from obj[<root symbol>].
        return next_seg(m.group(0)[m.end(1) - m.start(0):-1], obj[m.group(1)])
    elif fullJS:
        return sandboxjs.execjs(ex, jslib, timeout=timeout)
    else:
        # Format the message with %; passing ex as a second constructor
        # argument would leave the literal '%s' in the text.
        raise sandboxjs.JavascriptException(
            "Syntax error in parameter reference '%s' or used Javascript code without specifying InlineJavascriptRequirement." % ex)

def interpolate(scan, rootvars,
                timeout=None, fullJS=None, jslib=""):
    # type: (Text, Dict[Text, Any], int, bool, Union[str, Text]) -> JSON
    """Expand every ``$(...)`` / ``${...}`` block and backslash escape in ``scan``.

    ``scan`` is stripped of surrounding whitespace first.  When the whole
    stripped string is a single block, the evaluated value is returned
    as-is (it may be any JSON type).  Otherwise each block is replaced by
    its JSON encoding (strings lose their surrounding quotes), each
    backslash escape is replaced by the escaped character, and the joined
    text is returned.

    Errors raised by ``scanner`` and ``evaluator`` propagate to the caller.
    """
    scan = scan.strip()
    parts = []
    w = scanner(scan)
    while w:
        parts.append(scan[0:w[0]])

        if scan[w[0]] == '$':
            e = evaluator(scan[w[0]+1:w[1]], jslib, rootvars, fullJS=fullJS,
                          timeout=timeout)
            # `scan` shrinks on every pass, so the span test alone would also
            # fire for the last block of "$(a)$(b)".  len(parts) <= 1 limits
            # the raw-value shortcut to the first pass.
            if w[0] == 0 and w[1] == len(scan) and len(parts) <= 1:
                return e
            leaf = json.dumps(e, sort_keys=True)
            if leaf[0] == '"':
                leaf = leaf[1:-1]
            parts.append(leaf)
        elif scan[w[0]] == '\\':
            # Keep only the escaped character, dropping the backslash.
            e = scan[w[1]-1]
            parts.append(e)

        scan = scan[w[1]:]
        w = scanner(scan)
    parts.append(scan)
    return ''.join(parts)

def do_eval(ex, jobinput, requirements, outdir, tmpdir, resources,
context=None, pull_image=True, timeout=None):
@@ -135,13 +169,19 @@ def do_eval(ex, jobinput, requirements, outdir, tmpdir, resources,
u"self": context,
u"runtime": runtime }

if isinstance(ex, dict) and "engine" in ex and "script" in ex:
return exeval(ex, jobinput, requirements, outdir, tmpdir, context, pull_image)
if isinstance(ex, (str, Text)):
fullJS = False
jslib = u""
for r in reversed(requirements):
if r["class"] == "InlineJavascriptRequirement":
return sandboxjs.interpolate(Text(ex), jshead(
r.get("expressionLib", []), rootvars), timeout=timeout)
return param_interpolate(Text(ex), rootvars)
fullJS = True
jslib = jshead(r.get("expressionLib", []), rootvars)
break

return interpolate(ex,
rootvars,
timeout=timeout,
fullJS=fullJS,
jslib=jslib)
else:
return ex
110 changes: 6 additions & 104 deletions cwltool/sandboxjs.py
Original file line number Diff line number Diff line change
@@ -13,7 +13,9 @@ class JavascriptException(Exception):

JSON = Union[Dict[Any,Any], List[Any], Text, int, long, float, bool, None]

def execjs(js, jslib, timeout=None): # type: (Union[Mapping,Text], Any, int) -> JSON
have_node_slim = False

def execjs(js, jslib, timeout=None): # type: (Union[Mapping, Text], Any, int) -> JSON
nodejs = None
trynodes = ("nodejs", "node")
for n in trynodes:
@@ -29,10 +31,11 @@ def execjs(js, jslib, timeout=None): # type: (Union[Mapping,Text], Any, int) ->
if nodejs is None:
try:
nodeimg = "node:slim"
dlist = subprocess.check_output(["docker", "images", nodeimg])
if "node" not in dlist:
global have_node_slim
if not have_node_slim:
nodejsimg = subprocess.check_output(["docker", "pull", nodeimg])
_logger.info("Pulled Docker image %s %s", nodeimg, nodejsimg)
have_node_slim = True
nodejs = subprocess.Popen(["docker", "run",
"--attach=STDIN", "--attach=STDOUT", "--attach=STDERR",
"--sig-proxy=true", "--interactive",
@@ -86,104 +89,3 @@ def fn_linenum(): # type: () -> Text
return json.loads(stdoutdata)
except ValueError as e:
raise JavascriptException(u"%s\nscript was:\n%s\nstdout was: '%s'\nstderr was: '%s'\n" % (e, fn_linenum(), stdoutdata, stderrdata))

class SubstitutionError(Exception):
    """Raised by scanner() when a ``$(`` / ``${`` block is never closed."""


def scanner(scan):  # type: (Text) -> List[int]
    """Find the next substitution or escape sequence in ``scan``.

    Returns ``[start, end]`` (end exclusive) for the first ``$(...)`` or
    ``${...}`` block (nested brackets balanced, closers inside quoted
    strings ignored) or the first backslash-plus-character pair outside a
    block.  Returns ``None`` when there is none.  A ``$`` not followed by
    ``(`` or ``{`` is ordinary text, as is a trailing ``$`` or backslash.

    Raises SubstitutionError when a block is opened but never closed.
    """
    DEFAULT = 0
    DOLLAR = 1
    PAREN = 2
    BRACE = 3
    SINGLE_QUOTE = 4
    DOUBLE_QUOTE = 5
    BACKSLASH = 6

    i = 0
    stack = [DEFAULT]
    start = 0
    while i < len(scan):
        state = stack[-1]
        c = scan[i]

        if state == DEFAULT:
            if c == '$':
                stack.append(DOLLAR)
            elif c == '\\':
                stack.append(BACKSLASH)
        elif state == BACKSLASH:
            stack.pop()
            if stack[-1] == DEFAULT:
                return [i - 1, i + 1]
        elif state == DOLLAR:
            if c == '(':
                start = i - 1
                stack.append(PAREN)
            elif c == '{':
                start = i - 1
                stack.append(BRACE)
            else:
                # Without this pop the DOLLAR state would stick, so that
                # "costs $5" raised and "$x(y)" was treated as a block.
                # Re-examine the same character in the DEFAULT state.
                stack.pop()
                continue
        elif state == PAREN:
            if c == '(':
                stack.append(PAREN)
            elif c == ')':
                stack.pop()
                if stack[-1] == DOLLAR:
                    return [start, i + 1]
            elif c == "'":
                stack.append(SINGLE_QUOTE)
            elif c == '"':
                stack.append(DOUBLE_QUOTE)
        elif state == BRACE:
            if c == '{':
                stack.append(BRACE)
            elif c == '}':
                stack.pop()
                if stack[-1] == DOLLAR:
                    return [start, i + 1]
            elif c == "'":
                stack.append(SINGLE_QUOTE)
            elif c == '"':
                stack.append(DOUBLE_QUOTE)
        elif state == SINGLE_QUOTE:
            if c == "'":
                stack.pop()
            elif c == '\\':
                stack.append(BACKSLASH)
        elif state == DOUBLE_QUOTE:
            if c == '"':
                stack.pop()
            elif c == '\\':
                stack.append(BACKSLASH)
        i += 1

    # A lone trailing '$' or '\' is ordinary text, not an unfinished block.
    dangling = len(stack) == 2 and stack[1] in (DOLLAR, BACKSLASH)
    if len(stack) > 1 and not dangling:
        raise SubstitutionError("Substitution error, unfinished block starting at position {}: {}".format(start, scan[start:]))
    return None


def interpolate(scan, jslib, timeout=None):
    # type: (Text, Union[Text, Text], int) -> JSON
    """Expand every ``$(...)`` / ``${...}`` block and backslash escape in ``scan``.

    Each block is evaluated with ``execjs`` using ``jslib`` as the prelude.
    When the whole stripped string is a single block, the evaluated value
    is returned as-is.  Otherwise blocks are replaced by their JSON
    encoding (strings lose their surrounding quotes), backslash escapes by
    the escaped character, and the joined text is returned.
    """
    scan = scan.strip()
    parts = []
    w = scanner(scan)
    while w:
        parts.append(scan[0:w[0]])

        if scan[w[0]] == '$':
            e = execjs(scan[w[0]+1:w[1]], jslib, timeout=timeout)
            # `scan` shrinks on every pass, so the span test alone would also
            # fire for the last block of "$(a)$(b)".  len(parts) <= 1 limits
            # the raw-value shortcut to the first pass.
            if w[0] == 0 and w[1] == len(scan) and len(parts) <= 1:
                return e
            leaf = json.dumps(e, sort_keys=True)
            if leaf[0] == '"':
                leaf = leaf[1:-1]
            parts.append(leaf)
        elif scan[w[0]] == '\\':
            # Keep only the escaped character, dropping the backslash.
            e = scan[w[1]-1]
            parts.append(e)

        scan = scan[w[1]:]
        w = scanner(scan)
    parts.append(scan)
    return ''.join(parts)
Loading

0 comments on commit 8a685e6

Please sign in to comment.