diff --git a/SpiffWorkflow/bpmn/parser/BpmnParser.py b/SpiffWorkflow/bpmn/parser/BpmnParser.py
index 056f3bd9..2f4911ed 100644
--- a/SpiffWorkflow/bpmn/parser/BpmnParser.py
+++ b/SpiffWorkflow/bpmn/parser/BpmnParser.py
@@ -92,6 +92,7 @@ def __init__(self, xsd_path=XSD_PATH, imports=None):
self.validator = etree.XMLSchema(schema)
def validate(self, bpmn, filename=None):
+ self.preprocess(bpmn)
try:
self.validator.assertValid(bpmn)
except ValidationException as ve:
@@ -102,6 +103,12 @@ def validate(self, bpmn, filename=None):
raise ValidationException(last_error.message, file_name=filename,
line_number=last_error.line)
+ def preprocess(self, bpmn):
+ # BPMN js creates invalid XML for message correlation properties and it is apparently too difficult to change
+ # therefore, I'll just preprocess the XML and replace the tag in order to continue validating the XML
+ for expr in bpmn.xpath('.//bpmn:correlationPropertyRetrievalExpression/bpmn:formalExpression', namespaces=DEFAULT_NSMAP):
+ expr.tag = '{' + DEFAULT_NSMAP['bpmn'] + '}messagePath'
+
class BpmnParser(object):
"""
The BpmnParser class is a pluggable base class that manages the parsing of
diff --git a/SpiffWorkflow/bpmn/specs/event_definitions/base.py b/SpiffWorkflow/bpmn/specs/event_definitions/base.py
index a2410964..2e4fc912 100644
--- a/SpiffWorkflow/bpmn/specs/event_definitions/base.py
+++ b/SpiffWorkflow/bpmn/specs/event_definitions/base.py
@@ -29,8 +29,12 @@ def throw(self, my_task):
event = BpmnEvent(self)
my_task.workflow.top_workflow.catch(event)
+ def update_task(self, my_task):
+ """This method allows events to implement update behavior for the task"""
+ pass
+
def update_task_data(self, my_task):
- """This method allows events with payloads mrege them into the task"""
+ """This method allows events with payloads to merge them into the task"""
pass
def reset(self, my_task):
diff --git a/SpiffWorkflow/bpmn/specs/event_definitions/message.py b/SpiffWorkflow/bpmn/specs/event_definitions/message.py
index e9f27a65..0d06534e 100644
--- a/SpiffWorkflow/bpmn/specs/event_definitions/message.py
+++ b/SpiffWorkflow/bpmn/specs/event_definitions/message.py
@@ -52,17 +52,29 @@ def update_task_data(self, my_task):
if payload is not None:
my_task.set_data(**payload)
- def get_correlations(self, task, payload):
+ def get_correlations(self, my_task, payload):
+ return self.calculate_correlations(
+ my_task.workflow.script_engine,
+ self.correlation_properties,
+ payload
+ )
+
+ def calculate_correlations(self, script_engine, cp, ctx):
correlations = {}
- for property in self.correlation_properties:
- for key in property.correlation_keys:
+ for prop in cp:
+ value = script_engine.environment.evaluate(prop.retrieval_expression, ctx)
+ for key in prop.correlation_keys:
if key not in correlations:
correlations[key] = {}
try:
- correlations[key][property.name] = task.workflow.script_engine.environment.evaluate(property.retrieval_expression, payload)
+ correlations[key][prop.name] = value
except WorkflowException:
# Just ignore missing keys. The dictionaries have to match exactly
pass
+ if len(prop.correlation_keys) == 0:
+ if self.name not in correlations:
+ correlations[self.name] = {}
+ correlations[self.name][prop.name] = value
return correlations
def details(self, my_task):
diff --git a/SpiffWorkflow/bpmn/specs/event_definitions/timer.py b/SpiffWorkflow/bpmn/specs/event_definitions/timer.py
index 03a5773e..904bc5c4 100644
--- a/SpiffWorkflow/bpmn/specs/event_definitions/timer.py
+++ b/SpiffWorkflow/bpmn/specs/event_definitions/timer.py
@@ -3,6 +3,7 @@
from calendar import monthrange
from time import timezone as tzoffset, altzone as dstoffset, struct_time, localtime
+from SpiffWorkflow.util.task import TaskState
from SpiffWorkflow.bpmn.util import PendingBpmnEvent
from .base import EventDefinition
@@ -200,6 +201,13 @@ def cycle_complete(self, my_task):
my_task._set_internal_data(event_value=event_value)
return ready
+ def update_task(self, my_task):
+ if self.cycle_complete(my_task):
+ for output in my_task.task_spec.outputs:
+ child = my_task._add_child(output, TaskState.FUTURE)
+ child.task_spec._predict(child, mask=TaskState.NOT_FINISHED_MASK)
+ child.task_spec._update(child)
+
def details(self, my_task):
event_value = my_task._get_internal_data('event_value')
if event_value is not None and event_value['cycles'] != 0:
diff --git a/SpiffWorkflow/bpmn/specs/mixins/events/event_types.py b/SpiffWorkflow/bpmn/specs/mixins/events/event_types.py
index bda35cbd..4c6137b3 100644
--- a/SpiffWorkflow/bpmn/specs/mixins/events/event_types.py
+++ b/SpiffWorkflow/bpmn/specs/mixins/events/event_types.py
@@ -60,13 +60,7 @@ def _update_hook(self, my_task):
elif my_task.state != TaskState.WAITING:
my_task._set_state(TaskState.WAITING)
- if isinstance(self.event_definition, CycleTimerEventDefinition):
- if self.event_definition.cycle_complete(my_task):
- for output in self.outputs:
- child = my_task._add_child(output, TaskState.FUTURE)
- child.task_spec._predict(child, mask=TaskState.NOT_FINISHED_MASK)
- child.task_spec._update(child)
-
+ self.event_definition.update_task(my_task)
def _run_hook(self, my_task):
diff --git a/SpiffWorkflow/spiff/parser/event_parsers.py b/SpiffWorkflow/spiff/parser/event_parsers.py
index 32ae6a9f..fda2c889 100644
--- a/SpiffWorkflow/spiff/parser/event_parsers.py
+++ b/SpiffWorkflow/spiff/parser/event_parsers.py
@@ -17,6 +17,8 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301 USA
+import warnings
+
from SpiffWorkflow.bpmn.parser.event_parsers import EventDefinitionParser, ReceiveTaskParser
from SpiffWorkflow.bpmn.parser.event_parsers import (
StartEventParser,
@@ -32,12 +34,30 @@
ErrorEventDefinition,
EscalationEventDefinition,
)
-from SpiffWorkflow.bpmn.parser.util import one
-from SpiffWorkflow.spiff.parser.task_spec import SpiffTaskParser
+from SpiffWorkflow.bpmn.parser.util import one, first
+from SpiffWorkflow.bpmn.specs.event_definitions.message import CorrelationProperty
+from SpiffWorkflow.spiff.parser.task_spec import SpiffTaskParser, SPIFFWORKFLOW_NSMAP
class SpiffEventDefinitionParser(SpiffTaskParser, EventDefinitionParser):
+ def parse_message_extensions(self, node):
+ expression = first(node.xpath('.//spiffworkflow:messagePayload', namespaces=SPIFFWORKFLOW_NSMAP))
+ variable = first(node.xpath('.//spiffworkflow:messageVariable', namespaces=SPIFFWORKFLOW_NSMAP))
+ if expression is not None:
+ expression = expression.text
+ if variable is not None:
+ variable = variable.text
+ return expression, variable
+
+ def parse_process_correlations(self, node):
+ correlations = []
+ for prop in node.xpath('.//spiffworkflow:processVariableCorrelation', namespaces=SPIFFWORKFLOW_NSMAP):
+ key = one(prop.xpath('./spiffworkflow:propertyId', namespaces=SPIFFWORKFLOW_NSMAP))
+ expression = one(prop.xpath('./spiffworkflow:expression', namespaces=SPIFFWORKFLOW_NSMAP))
+ correlations.append(CorrelationProperty(key.text, expression.text, []))
+ return correlations
+
def parse_message_event(self, message_event):
"""Parse a Spiff message event."""
@@ -48,17 +68,23 @@ def parse_message_event(self, message_event):
except Exception:
self.raise_validation_error('Expected a Message node', node=message_event)
name = message.get('name')
- extensions = self.parse_extensions(message)
+ expression, variable = self.parse_message_extensions(message)
+ if expression is not None or variable is not None:
+ warnings.warn(
+ 'spiffworkflow:messagePayload and spiffworkflow:messageVariable have been moved to the bpmn:messageDefinition element',
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ else:
+ expression, variable = self.parse_message_extensions(message_event)
correlations = self.get_message_correlations(message_ref)
+ process_correlations = self.parse_process_correlations(message_event)
+ event_def = MessageEventDefinition(name, correlations, expression, variable, process_correlations)
else:
name = message_event.getparent().get('name')
- extensions = {}
- correlations = []
+ event_def = MessageEventDefinition(name)
- return MessageEventDefinition(name, correlations,
- expression=extensions.get('messagePayload'),
- message_var=extensions.get('messageVariable')
- )
+ return event_def
def parse_signal_event(self, signal_event):
"""Parse a Spiff signal event"""
diff --git a/SpiffWorkflow/spiff/parser/task_spec.py b/SpiffWorkflow/spiff/parser/task_spec.py
index 98f215e8..5e2a9346 100644
--- a/SpiffWorkflow/spiff/parser/task_spec.py
+++ b/SpiffWorkflow/spiff/parser/task_spec.py
@@ -30,8 +30,7 @@
BusinessRuleTask
)
-SPIFFWORKFLOW_MODEL_NS = 'http://spiffworkflow.org/bpmn/schema/1.0/core'
-SPIFFWORKFLOW_MODEL_PREFIX = 'spiffworkflow'
+SPIFFWORKFLOW_NSMAP = {'spiffworkflow': 'http://spiffworkflow.org/bpmn/schema/1.0/core'}
class SpiffTaskParser(TaskParser):
@@ -50,9 +49,8 @@ def _parse_extensions(node):
# Too bad doing this works in such a stupid way.
# We should set a namespace and automatically do this.
extensions = {}
- extra_ns = {SPIFFWORKFLOW_MODEL_PREFIX: SPIFFWORKFLOW_MODEL_NS}
- xpath = xpath_eval(node, extra_ns)
- extension_nodes = xpath(f'./bpmn:extensionElements/{SPIFFWORKFLOW_MODEL_PREFIX}:*')
+ xpath = xpath_eval(node, SPIFFWORKFLOW_NSMAP)
+ extension_nodes = xpath(f'./bpmn:extensionElements/spiffworkflow:*')
for node in extension_nodes:
name = etree.QName(node).localname
if name == 'properties':
@@ -68,7 +66,7 @@ def _parse_extensions(node):
@classmethod
def _node_children_by_tag_name(cls, node, tag_name):
xpath = cls._spiffworkflow_ready_xpath_for_node(node)
- return xpath(f'.//{SPIFFWORKFLOW_MODEL_PREFIX}:{tag_name}')
+ return xpath(f'.//spiffworkflow:{tag_name}')
@classmethod
def _parse_properties(cls, node):
@@ -80,8 +78,7 @@ def _parse_properties(cls, node):
@staticmethod
def _spiffworkflow_ready_xpath_for_node(node):
- extra_ns = {SPIFFWORKFLOW_MODEL_PREFIX: SPIFFWORKFLOW_MODEL_NS}
- return xpath_eval(node, extra_ns)
+ return xpath_eval(node, SPIFFWORKFLOW_NSMAP)
@classmethod
def _parse_script_unit_tests(cls, node):
@@ -113,7 +110,7 @@ def _parse_servicetask_operator(cls, node):
def _copy_task_attrs(self, original, loop_characteristics):
# I am so disappointed I have to do this.
super()._copy_task_attrs(original)
- if loop_characteristics.attrib.get('{' + SPIFFWORKFLOW_MODEL_NS + '}' + 'scriptsOnInstances') != 'true':
+ if loop_characteristics.xpath('@spiffworkflow:scriptsOnInstances', namespaces=SPIFFWORKFLOW_NSMAP) != ['true']:
self.task.prescript = original.prescript
self.task.postscript = original.postscript
original.prescript = None
diff --git a/SpiffWorkflow/spiff/serializer/event_definition.py b/SpiffWorkflow/spiff/serializer/event_definition.py
index 0e53bc7e..f05ddb42 100644
--- a/SpiffWorkflow/spiff/serializer/event_definition.py
+++ b/SpiffWorkflow/spiff/serializer/event_definition.py
@@ -26,10 +26,12 @@ def to_dict(self, event_definition):
dct['correlation_properties'] = self.correlation_properties_to_dict(event_definition.correlation_properties)
dct['expression'] = event_definition.expression
dct['message_var'] = event_definition.message_var
+ dct['process_correlations'] = self.correlation_properties_to_dict(event_definition.process_correlations)
return dct
def from_dict(self, dct):
dct['correlation_properties'] = self.correlation_properties_from_dict(dct['correlation_properties'])
+ dct['process_correlations'] = self.correlation_properties_from_dict(dct.get('process_correlations', []))
event_definition = super().from_dict(dct)
return event_definition
diff --git a/SpiffWorkflow/spiff/specs/event_definitions.py b/SpiffWorkflow/spiff/specs/event_definitions.py
index 3eb09284..66bebe7d 100644
--- a/SpiffWorkflow/spiff/specs/event_definitions.py
+++ b/SpiffWorkflow/spiff/specs/event_definitions.py
@@ -28,10 +28,11 @@
class MessageEventDefinition(MessageEventDefinition):
- def __init__(self, name, correlation_properties=None, expression=None, message_var=None, **kwargs):
+ def __init__(self, name, correlation_properties=None, expression=None, message_var=None, process_correlations=None, **kwargs):
super(MessageEventDefinition, self).__init__(name, correlation_properties, **kwargs)
self.expression = expression
self.message_var = message_var
+ self.process_correlations = process_correlations or []
def throw(self, my_task):
payload = my_task.workflow.script_engine.evaluate(my_task, self.expression)
@@ -40,6 +41,14 @@ def throw(self, my_task):
my_task.workflow.correlations.update(correlations)
my_task.workflow.top_workflow.catch(event)
+ def update_task(self, my_task):
+ correlations = self.calculate_correlations(
+ my_task.workflow.script_engine,
+ self.process_correlations,
+ my_task.data
+ )
+ my_task.workflow.correlations.update(correlations)
+
def update_task_data(self, my_task):
if self.message_var is not None:
my_task.data[self.message_var] = my_task.internal_data.pop(self.name)
diff --git a/tests/SpiffWorkflow/spiff/BaseTestCase.py b/tests/SpiffWorkflow/spiff/BaseTestCase.py
index 81a66112..b7ff689a 100644
--- a/tests/SpiffWorkflow/spiff/BaseTestCase.py
+++ b/tests/SpiffWorkflow/spiff/BaseTestCase.py
@@ -26,12 +26,12 @@ def load_workflow_spec(self, filename, process_name, dmn_filename=None, validate
def load_collaboration(self, filename, collaboration_name):
f = os.path.join(os.path.dirname(__file__), 'data', filename)
- parser = SpiffBpmnParser()
+ parser = SpiffBpmnParser(validator=VALIDATOR)
parser.add_bpmn_files_by_glob(f)
return parser.get_collaboration(collaboration_name)
def get_all_specs(self, filename):
f = os.path.join(os.path.dirname(__file__), 'data', filename)
- parser = SpiffBpmnParser()
+ parser = SpiffBpmnParser(validator=VALIDATOR)
parser.add_bpmn_files_by_glob(f)
return parser.find_all_specs()
diff --git a/tests/SpiffWorkflow/spiff/CorrelationTest.py b/tests/SpiffWorkflow/spiff/CorrelationTest.py
index 07b753b7..906613b7 100644
--- a/tests/SpiffWorkflow/spiff/CorrelationTest.py
+++ b/tests/SpiffWorkflow/spiff/CorrelationTest.py
@@ -1,4 +1,5 @@
-from SpiffWorkflow.bpmn.workflow import BpmnWorkflow
+from SpiffWorkflow.bpmn import BpmnWorkflow, BpmnEvent
+from SpiffWorkflow import TaskState
from .BaseTestCase import BaseTestCase
@@ -53,3 +54,36 @@ def testTwoCorrelatonKeys(self):
self.assertNotIn('message_correlation_key_one', messages[1].correlations)
self.assertIn('message_correlation_key_two', messages[1].correlations)
self.assertNotIn('message_correlation_key_two', messages[0].correlations)
+
+
+class ReceiveCorrelationTest(BaseTestCase):
+
+ def testReceiveCorrelations(self):
+ self.actual_test()
+
+ def testReceiveCorrelationsSaveRestore(self):
+ self.actual_test(True)
+
+ def actual_test(self, save_restore=False):
+ spec, subprocesses = self.load_workflow_spec('receive_correlations.bpmn', 'correlation-test')
+ self.workflow = BpmnWorkflow(spec, subprocesses)
+ if save_restore:
+ self.save_restore()
+ self.workflow.do_engine_steps()
+ task = self.workflow.get_next_task(state=TaskState.READY)
+ task.data.update(value_1='a', value_2='b')
+ task.run()
+ self.workflow.do_engine_steps()
+ self.assertEqual(self.workflow.correlations, {'message': {'prop_1': 'a', 'prop_2': 'b'}})
+ waiting_task = self.workflow.get_next_task(state=TaskState.WAITING)
+ event_def = waiting_task.task_spec.event_definition
+ payload = {'msg_value_1': 'a', 'msg_value_2': 'b'}
+ correlations = event_def.calculate_correlations(
+ waiting_task.workflow.script_engine,
+ event_def.correlation_properties,
+ payload
+ )
+ event = BpmnEvent(event_def, payload, correlations)
+ self.workflow.catch(event)
+ self.workflow.do_engine_steps()
+ self.assertTrue(self.workflow.is_completed)
diff --git a/tests/SpiffWorkflow/spiff/data/correlation.bpmn b/tests/SpiffWorkflow/spiff/data/correlation.bpmn
index e364fb1d..eef05017 100644
--- a/tests/SpiffWorkflow/spiff/data/correlation.bpmn
+++ b/tests/SpiffWorkflow/spiff/data/correlation.bpmn
@@ -15,18 +15,8 @@
init_id
-
-
- {'num': task_num, 'name': task_name}
- source_task
-
-
-
-
- {'init_id': source_task['num'], 'response': response}
- response
-
-
+
+
@@ -53,11 +43,17 @@
+
+ {'num': task_num, 'name': task_name}
+
Flow_02xt17l
Flow_0ts36fv
+
+ response
+
Flow_0ts36fv
Flow_17cd3h6
@@ -69,7 +65,11 @@
Flow_0qafvbe
-
+
+
+ source_task
+
+
@@ -81,7 +81,11 @@
Flow_12j0ayf
Flow_0k7rc31
-
+
+
+ {'init_id': source_task['num'], 'response': response}
+
+
Flow_0k7rc31
diff --git a/tests/SpiffWorkflow/spiff/data/correlation_two_conversations.bpmn b/tests/SpiffWorkflow/spiff/data/correlation_two_conversations.bpmn
index b42f5047..6242d103 100644
--- a/tests/SpiffWorkflow/spiff/data/correlation_two_conversations.bpmn
+++ b/tests/SpiffWorkflow/spiff/data/correlation_two_conversations.bpmn
@@ -46,9 +46,16 @@
Flow_01u8qkn
Flow_0sxqx67
-
+
+
+ payload_var_one
+
+
+
+ payload_var_two
+
Flow_1yt3owq
Flow_01u8qkn
@@ -67,6 +74,11 @@ del time
the_topic = "first_conversation"
+ {
+"topica_one": topic_one_a,
+"topicb_one": topic_one_b,
+"initial_var_one": 3
+}
Flow_1ihr88m
Flow_0n4m9ti
@@ -75,7 +87,15 @@ del time
Flow_0q3clix
Flow_1yt3owq
-
+
+
+ {
+"topica_two": topic_two_a,
+"topicb_two": topic_two_b,
+"initial_var_two": 5
+}
+
+
Flow_0n4m9ti
@@ -87,34 +107,10 @@ topic_two_b = f"topic_two_b_conversation_{timestamp}"
del time
-
-
- {
-"topica_one": topic_one_a,
-"topicb_one": topic_one_b,
-"initial_var_one": 3
-}
-
-
-
-
- payload_var_one
-
-
-
-
- payload_var_two
-
-
-
-
- {
-"topica_two": topic_two_a,
-"topicb_two": topic_two_b,
-"initial_var_two": 5
-}
-
-
+
+
+
+
topica_two
diff --git a/tests/SpiffWorkflow/spiff/data/receive_correlations.bpmn b/tests/SpiffWorkflow/spiff/data/receive_correlations.bpmn
new file mode 100644
index 00000000..165b51d6
--- /dev/null
+++ b/tests/SpiffWorkflow/spiff/data/receive_correlations.bpmn
@@ -0,0 +1,75 @@
+
+
+
+
+ Flow_032rj36
+
+
+ Flow_032rj36
+ Flow_0v2d6wi
+
+
+
+ Flow_1b6bjtj
+
+
+
+
+ Flow_0v2d6wi
+ Flow_1b6bjtj
+
+
+ response
+
+ prop_1
+ value_1
+
+
+ prop_2
+ value_2
+
+
+
+
+
+
+
+
+ msg_value_1
+
+
+
+
+ msg_value_2
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+