Skip to content

Commit

Permalink
Merge pull request #401 from sartography/feature/receive-message-corr…
Browse files Browse the repository at this point in the history
…elations

Feature/receive message correlations
  • Loading branch information
essweine authored Apr 25, 2024
2 parents 76eb9e6 + 2a2cbb8 commit 2a6a705
Show file tree
Hide file tree
Showing 14 changed files with 246 additions and 78 deletions.
7 changes: 7 additions & 0 deletions SpiffWorkflow/bpmn/parser/BpmnParser.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ def __init__(self, xsd_path=XSD_PATH, imports=None):
self.validator = etree.XMLSchema(schema)

def validate(self, bpmn, filename=None):
self.preprocess(bpmn)
try:
self.validator.assertValid(bpmn)
except ValidationException as ve:
Expand All @@ -102,6 +103,12 @@ def validate(self, bpmn, filename=None):
raise ValidationException(last_error.message, file_name=filename,
line_number=last_error.line)

def preprocess(self, bpmn):
# BPMN js creates invalid XML for message correlation properties and it is apparently too difficult to change
# therefore, I'll just preprocess the XML and replace the tag in order to continue validating the XML
for expr in bpmn.xpath('.//bpmn:correlationPropertyRetrievalExpression/bpmn:formalExpression', namespaces=DEFAULT_NSMAP):
expr.tag = '{' + DEFAULT_NSMAP['bpmn'] + '}messagePath'

class BpmnParser(object):
"""
The BpmnParser class is a pluggable base class that manages the parsing of
Expand Down
6 changes: 5 additions & 1 deletion SpiffWorkflow/bpmn/specs/event_definitions/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@ def throw(self, my_task):
event = BpmnEvent(self)
my_task.workflow.top_workflow.catch(event)

def update_task(self, my_task):
"""This method allows events to implement update behavior for the task"""
pass

def update_task_data(self, my_task):
"""This method allows events with payloads mrege them into the task"""
"""This method allows events with payloads to merge them into the task"""
pass

def reset(self, my_task):
Expand Down
20 changes: 16 additions & 4 deletions SpiffWorkflow/bpmn/specs/event_definitions/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,29 @@ def update_task_data(self, my_task):
if payload is not None:
my_task.set_data(**payload)

def get_correlations(self, task, payload):
def get_correlations(self, my_task, payload):
return self.calculate_correlations(
my_task.workflow.script_engine,
self.correlation_properties,
payload
)

def calculate_correlations(self, script_engine, cp, ctx):
correlations = {}
for property in self.correlation_properties:
for key in property.correlation_keys:
for prop in cp:
value = script_engine.environment.evaluate(prop.retrieval_expression, ctx)
for key in prop.correlation_keys:
if key not in correlations:
correlations[key] = {}
try:
correlations[key][property.name] = task.workflow.script_engine.environment.evaluate(property.retrieval_expression, payload)
correlations[key][prop.name] = value
except WorkflowException:
# Just ignore missing keys. The dictionaries have to match exactly
pass
if len(prop.correlation_keys) == 0:
if self.name not in correlations:
correlations[self.name] = {}
correlations[self.name][prop.name] = value
return correlations

def details(self, my_task):
Expand Down
8 changes: 8 additions & 0 deletions SpiffWorkflow/bpmn/specs/event_definitions/timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from calendar import monthrange
from time import timezone as tzoffset, altzone as dstoffset, struct_time, localtime

from SpiffWorkflow.util.task import TaskState
from SpiffWorkflow.bpmn.util import PendingBpmnEvent
from .base import EventDefinition

Expand Down Expand Up @@ -200,6 +201,13 @@ def cycle_complete(self, my_task):
my_task._set_internal_data(event_value=event_value)
return ready

def update_task(self, my_task):
if self.cycle_complete(my_task):
for output in my_task.task_spec.outputs:
child = my_task._add_child(output, TaskState.FUTURE)
child.task_spec._predict(child, mask=TaskState.NOT_FINISHED_MASK)
child.task_spec._update(child)

def details(self, my_task):
event_value = my_task._get_internal_data('event_value')
if event_value is not None and event_value['cycles'] != 0:
Expand Down
8 changes: 1 addition & 7 deletions SpiffWorkflow/bpmn/specs/mixins/events/event_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,7 @@ def _update_hook(self, my_task):
elif my_task.state != TaskState.WAITING:
my_task._set_state(TaskState.WAITING)

if isinstance(self.event_definition, CycleTimerEventDefinition):
if self.event_definition.cycle_complete(my_task):
for output in self.outputs:
child = my_task._add_child(output, TaskState.FUTURE)
child.task_spec._predict(child, mask=TaskState.NOT_FINISHED_MASK)
child.task_spec._update(child)

self.event_definition.update_task(my_task)

def _run_hook(self, my_task):

Expand Down
44 changes: 35 additions & 9 deletions SpiffWorkflow/spiff/parser/event_parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301 USA

import warnings

from SpiffWorkflow.bpmn.parser.event_parsers import EventDefinitionParser, ReceiveTaskParser
from SpiffWorkflow.bpmn.parser.event_parsers import (
StartEventParser,
Expand All @@ -32,12 +34,30 @@
ErrorEventDefinition,
EscalationEventDefinition,
)
from SpiffWorkflow.bpmn.parser.util import one
from SpiffWorkflow.spiff.parser.task_spec import SpiffTaskParser
from SpiffWorkflow.bpmn.parser.util import one, first
from SpiffWorkflow.bpmn.specs.event_definitions.message import CorrelationProperty
from SpiffWorkflow.spiff.parser.task_spec import SpiffTaskParser, SPIFFWORKFLOW_NSMAP


class SpiffEventDefinitionParser(SpiffTaskParser, EventDefinitionParser):

def parse_message_extensions(self, node):
expression = first(node.xpath('.//spiffworkflow:messagePayload', namespaces=SPIFFWORKFLOW_NSMAP))
variable = first(node.xpath('.//spiffworkflow:messageVariable', namespaces=SPIFFWORKFLOW_NSMAP))
if expression is not None:
expression = expression.text
if variable is not None:
variable = variable.text
return expression, variable

def parse_process_correlations(self, node):
correlations = []
for prop in node.xpath('.//spiffworkflow:processVariableCorrelation', namespaces=SPIFFWORKFLOW_NSMAP):
key = one(prop.xpath('./spiffworkflow:propertyId', namespaces=SPIFFWORKFLOW_NSMAP))
expression = one(prop.xpath('./spiffworkflow:expression', namespaces=SPIFFWORKFLOW_NSMAP))
correlations.append(CorrelationProperty(key.text, expression.text, []))
return correlations

def parse_message_event(self, message_event):
"""Parse a Spiff message event."""

Expand All @@ -48,17 +68,23 @@ def parse_message_event(self, message_event):
except Exception:
self.raise_validation_error('Expected a Message node', node=message_event)
name = message.get('name')
extensions = self.parse_extensions(message)
expression, variable = self.parse_message_extensions(message)
if expression is not None or variable is not None:
warnings.warn(
'spiffworkflow:messagePayload and spiffworkflow:messageVariable have been moved to the bpmn:messageDefinition element',
DeprecationWarning,
stacklevel=2,
)
else:
expression, variable = self.parse_message_extensions(message_event)
correlations = self.get_message_correlations(message_ref)
process_correlations = self.parse_process_correlations(message_event)
event_def = MessageEventDefinition(name, correlations, expression, variable, process_correlations)
else:
name = message_event.getparent().get('name')
extensions = {}
correlations = []
event_def = MessageEventDefinition(name)

return MessageEventDefinition(name, correlations,
expression=extensions.get('messagePayload'),
message_var=extensions.get('messageVariable')
)
return event_def

def parse_signal_event(self, signal_event):
"""Parse a Spiff signal event"""
Expand Down
15 changes: 6 additions & 9 deletions SpiffWorkflow/spiff/parser/task_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@
BusinessRuleTask
)

SPIFFWORKFLOW_MODEL_NS = 'http://spiffworkflow.org/bpmn/schema/1.0/core'
SPIFFWORKFLOW_MODEL_PREFIX = 'spiffworkflow'
SPIFFWORKFLOW_NSMAP = {'spiffworkflow': 'http://spiffworkflow.org/bpmn/schema/1.0/core'}


class SpiffTaskParser(TaskParser):
Expand All @@ -50,9 +49,8 @@ def _parse_extensions(node):
# Too bad doing this works in such a stupid way.
# We should set a namespace and automatically do this.
extensions = {}
extra_ns = {SPIFFWORKFLOW_MODEL_PREFIX: SPIFFWORKFLOW_MODEL_NS}
xpath = xpath_eval(node, extra_ns)
extension_nodes = xpath(f'./bpmn:extensionElements/{SPIFFWORKFLOW_MODEL_PREFIX}:*')
xpath = xpath_eval(node, SPIFFWORKFLOW_NSMAP)
extension_nodes = xpath(f'./bpmn:extensionElements/spiffworkflow:*')
for node in extension_nodes:
name = etree.QName(node).localname
if name == 'properties':
Expand All @@ -68,7 +66,7 @@ def _parse_extensions(node):
@classmethod
def _node_children_by_tag_name(cls, node, tag_name):
xpath = cls._spiffworkflow_ready_xpath_for_node(node)
return xpath(f'.//{SPIFFWORKFLOW_MODEL_PREFIX}:{tag_name}')
return xpath(f'.//spiffworkflow:{tag_name}')

@classmethod
def _parse_properties(cls, node):
Expand All @@ -80,8 +78,7 @@ def _parse_properties(cls, node):

@staticmethod
def _spiffworkflow_ready_xpath_for_node(node):
extra_ns = {SPIFFWORKFLOW_MODEL_PREFIX: SPIFFWORKFLOW_MODEL_NS}
return xpath_eval(node, extra_ns)
return xpath_eval(node, SPIFFWORKFLOW_NSMAP)

@classmethod
def _parse_script_unit_tests(cls, node):
Expand Down Expand Up @@ -113,7 +110,7 @@ def _parse_servicetask_operator(cls, node):
def _copy_task_attrs(self, original, loop_characteristics):
# I am so disappointed I have to do this.
super()._copy_task_attrs(original)
if loop_characteristics.attrib.get('{' + SPIFFWORKFLOW_MODEL_NS + '}' + 'scriptsOnInstances') != 'true':
if loop_characteristics.xpath('@spiffworkflow:scriptsOnInstances', namespaces=SPIFFWORKFLOW_NSMAP) != ['true']:
self.task.prescript = original.prescript
self.task.postscript = original.postscript
original.prescript = None
Expand Down
2 changes: 2 additions & 0 deletions SpiffWorkflow/spiff/serializer/event_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ def to_dict(self, event_definition):
dct['correlation_properties'] = self.correlation_properties_to_dict(event_definition.correlation_properties)
dct['expression'] = event_definition.expression
dct['message_var'] = event_definition.message_var
dct['process_correlations'] = self.correlation_properties_to_dict(event_definition.process_correlations)
return dct

def from_dict(self, dct):
dct['correlation_properties'] = self.correlation_properties_from_dict(dct['correlation_properties'])
dct['process_correlations'] = self.correlation_properties_from_dict(dct.get('process_correlations', []))
event_definition = super().from_dict(dct)
return event_definition

Expand Down
11 changes: 10 additions & 1 deletion SpiffWorkflow/spiff/specs/event_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@

class MessageEventDefinition(MessageEventDefinition):

def __init__(self, name, correlation_properties=None, expression=None, message_var=None, **kwargs):
def __init__(self, name, correlation_properties=None, expression=None, message_var=None, process_correlations=None, **kwargs):
super(MessageEventDefinition, self).__init__(name, correlation_properties, **kwargs)
self.expression = expression
self.message_var = message_var
self.process_correlations = process_correlations or []

def throw(self, my_task):
payload = my_task.workflow.script_engine.evaluate(my_task, self.expression)
Expand All @@ -40,6 +41,14 @@ def throw(self, my_task):
my_task.workflow.correlations.update(correlations)
my_task.workflow.top_workflow.catch(event)

def update_task(self, my_task):
correlations = self.calculate_correlations(
my_task.workflow.script_engine,
self.process_correlations,
my_task.data
)
my_task.workflow.correlations.update(correlations)

def update_task_data(self, my_task):
if self.message_var is not None:
my_task.data[self.message_var] = my_task.internal_data.pop(self.name)
Expand Down
4 changes: 2 additions & 2 deletions tests/SpiffWorkflow/spiff/BaseTestCase.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ def load_workflow_spec(self, filename, process_name, dmn_filename=None, validate

def load_collaboration(self, filename, collaboration_name):
f = os.path.join(os.path.dirname(__file__), 'data', filename)
parser = SpiffBpmnParser()
parser = SpiffBpmnParser(validator=VALIDATOR)
parser.add_bpmn_files_by_glob(f)
return parser.get_collaboration(collaboration_name)

def get_all_specs(self, filename):
f = os.path.join(os.path.dirname(__file__), 'data', filename)
parser = SpiffBpmnParser()
parser = SpiffBpmnParser(validator=VALIDATOR)
parser.add_bpmn_files_by_glob(f)
return parser.find_all_specs()
36 changes: 35 additions & 1 deletion tests/SpiffWorkflow/spiff/CorrelationTest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from SpiffWorkflow.bpmn.workflow import BpmnWorkflow
from SpiffWorkflow.bpmn import BpmnWorkflow, BpmnEvent
from SpiffWorkflow import TaskState

from .BaseTestCase import BaseTestCase

Expand Down Expand Up @@ -53,3 +54,36 @@ def testTwoCorrelatonKeys(self):
self.assertNotIn('message_correlation_key_one', messages[1].correlations)
self.assertIn('message_correlation_key_two', messages[1].correlations)
self.assertNotIn('message_correlation_key_two', messages[0].correlations)


class ReceiveCorrelationTest(BaseTestCase):

def testReceiveCorrelations(self):
self.actual_test()

def testReceiveCorrelationsSaveRestore(self):
self.actual_test(True)

def actual_test(self, save_restore=False):
spec, subprocesses = self.load_workflow_spec('receive_correlations.bpmn', 'correlation-test')
self.workflow = BpmnWorkflow(spec, subprocesses)
if save_restore:
self.save_restore()
self.workflow.do_engine_steps()
task = self.workflow.get_next_task(state=TaskState.READY)
task.data.update(value_1='a', value_2='b')
task.run()
self.workflow.do_engine_steps()
self.assertEqual(self.workflow.correlations, {'message': {'prop_1': 'a', 'prop_2': 'b'}})
waiting_task = self.workflow.get_next_task(state=TaskState.WAITING)
event_def = waiting_task.task_spec.event_definition
payload = {'msg_value_1': 'a', 'msg_value_2': 'b'}
correlations = event_def.calculate_correlations(
waiting_task.workflow.script_engine,
event_def.correlation_properties,
payload
)
event = BpmnEvent(event_def, payload, correlations)
self.workflow.catch(event)
self.workflow.do_engine_steps()
self.assertTrue(self.workflow.is_completed)
32 changes: 18 additions & 14 deletions tests/SpiffWorkflow/spiff/data/correlation.bpmn
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,8 @@
<bpmn:messagePath>init_id</bpmn:messagePath>
</bpmn:correlationPropertyRetrievalExpression>
</bpmn:correlationProperty>
<bpmn:message id="Message_19nm5f5" name="init_proc_2">
<bpmn:extensionElements>
<spiffworkflow:messagePayload>{'num': task_num, 'name': task_name}</spiffworkflow:messagePayload>
<spiffworkflow:messageVariable>source_task</spiffworkflow:messageVariable>
</bpmn:extensionElements>
</bpmn:message>
<bpmn:message id="Message_0fc1gu7" name="proc_2_response">
<bpmn:extensionElements>
<spiffworkflow:messagePayload>{'init_id': source_task['num'], 'response': response}</spiffworkflow:messagePayload>
<spiffworkflow:messageVariable>response</spiffworkflow:messageVariable>
</bpmn:extensionElements>
</bpmn:message>
<bpmn:message id="Message_19nm5f5" name="init_proc_2" />
<bpmn:message id="Message_0fc1gu7" name="proc_2_response" />
<bpmn:process id="proc_1" name="Process 1" isExecutable="true">
<bpmn:sequenceFlow id="Flow_0lrjj2a" sourceRef="StartEvent_1" targetRef="subprocess" />
<bpmn:sequenceFlow id="Flow_0gp7t8p" sourceRef="subprocess" targetRef="Event_0qga5tr" />
Expand All @@ -53,11 +43,17 @@
<bpmn:sequenceFlow id="Flow_02xt17l" sourceRef="configure" targetRef="start_proc_2" />
<bpmn:sequenceFlow id="Flow_0ts36fv" sourceRef="start_proc_2" targetRef="get_response" />
<bpmn:sendTask id="start_proc_2" name="Start Process 2" messageRef="Message_19nm5f5">
<bpmn:extensionElements>
<spiffworkflow:messagePayload>{'num': task_num, 'name': task_name}</spiffworkflow:messagePayload>
</bpmn:extensionElements>
<bpmn:incoming>Flow_02xt17l</bpmn:incoming>
<bpmn:outgoing>Flow_0ts36fv</bpmn:outgoing>
</bpmn:sendTask>
<bpmn:sequenceFlow id="Flow_17cd3h6" sourceRef="get_response" targetRef="subprocess_end" />
<bpmn:receiveTask id="get_response" name="Await Response" messageRef="Message_0fc1gu7">
<bpmn:extensionElements>
<spiffworkflow:messageVariable>response</spiffworkflow:messageVariable>
</bpmn:extensionElements>
<bpmn:incoming>Flow_0ts36fv</bpmn:incoming>
<bpmn:outgoing>Flow_17cd3h6</bpmn:outgoing>
</bpmn:receiveTask>
Expand All @@ -69,7 +65,11 @@
<bpmn:process id="proc_2" name="Process 2" isExecutable="true">
<bpmn:startEvent id="message_start" name="Message Start">
<bpmn:outgoing>Flow_0qafvbe</bpmn:outgoing>
<bpmn:messageEventDefinition id="MessageEventDefinition_12ck2a4" messageRef="Message_19nm5f5" />
<bpmn:messageEventDefinition id="MessageEventDefinition_12ck2a4" messageRef="Message_19nm5f5">
<bpmn:extensionElements>
<spiffworkflow:messageVariable>source_task</spiffworkflow:messageVariable>
</bpmn:extensionElements>
</bpmn:messageEventDefinition>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_0k7rc31" sourceRef="respond" targetRef="Event_01itene" />
<bpmn:sequenceFlow id="Flow_0qafvbe" sourceRef="message_start" targetRef="prepare_response" />
Expand All @@ -81,7 +81,11 @@
<bpmn:intermediateThrowEvent id="respond" name="Respond">
<bpmn:incoming>Flow_12j0ayf</bpmn:incoming>
<bpmn:outgoing>Flow_0k7rc31</bpmn:outgoing>
<bpmn:messageEventDefinition id="MessageEventDefinition_0z73w20" messageRef="Message_0fc1gu7" />
<bpmn:messageEventDefinition id="MessageEventDefinition_0z73w20" messageRef="Message_0fc1gu7">
<bpmn:extensionElements>
<spiffworkflow:messagePayload>{'init_id': source_task['num'], 'response': response}</spiffworkflow:messagePayload>
</bpmn:extensionElements>
</bpmn:messageEventDefinition>
</bpmn:intermediateThrowEvent>
<bpmn:endEvent id="Event_01itene">
<bpmn:incoming>Flow_0k7rc31</bpmn:incoming>
Expand Down
Loading

0 comments on commit 2a6a705

Please sign in to comment.