From 85965bc16c127399aca640bcb15178beedb1f73f Mon Sep 17 00:00:00 2001 From: Sanoj Punchihewa Date: Mon, 4 Nov 2024 10:15:29 +0530 Subject: [PATCH] Add scatter gather mediator --- .../org/apache/synapse/SynapseConstants.java | 3 + .../collectors/CloseEventCollector.java | 13 + .../handling/event/CloseEventHandler.java | 7 + .../management/handling/span/SpanHandler.java | 10 + .../config/xml/MediatorFactoryFinder.java | 3 +- .../config/xml/MediatorSerializerFinder.java | 3 +- .../xml/ScatterGatherMediatorFactory.java | 136 ++++ .../xml/ScatterGatherMediatorSerializer.java | 84 +++ .../synapse/mediators/MediatorWorker.java | 45 +- .../mediators/base/SequenceMediator.java | 12 + .../mediators/eip/aggregator/Aggregate.java | 41 +- .../synapse/mediators/v2/ScatterGather.java | 710 ++++++++++++++++++ ...catterGatherMediatorSerializationTest.java | 51 ++ 13 files changed, 1109 insertions(+), 9 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorFactory.java create mode 100755 modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializer.java create mode 100644 modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java create mode 100644 modules/core/src/test/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializationTest.java diff --git a/modules/core/src/main/java/org/apache/synapse/SynapseConstants.java b/modules/core/src/main/java/org/apache/synapse/SynapseConstants.java index 47647ee066..edcdbeb1b0 100644 --- a/modules/core/src/main/java/org/apache/synapse/SynapseConstants.java +++ b/modules/core/src/main/java/org/apache/synapse/SynapseConstants.java @@ -625,4 +625,7 @@ public enum ENDPOINT_TIMEOUT_TYPE { ENDPOINT_TIMEOUT, GLOBAL_TIMEOUT, HTTP_CONNE public static final String JAEGER_SPAN_ID = "jaeger_span_id"; public static final String ANALYTICS_METADATA = "ANALYTICS_METADATA"; + + public static final String SCATTER_MESSAGES = "SCATTER_MESSAGES"; + public static final String CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER = "CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER"; } diff --git a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/collectors/CloseEventCollector.java b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/collectors/CloseEventCollector.java index 4c7dfc361e..d6b197bd45 100644 --- a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/collectors/CloseEventCollector.java +++ b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/collectors/CloseEventCollector.java @@ -148,4 +148,17 @@ public static void tryEndFlow(MessageContext messageContext, String componentNam // closeFlowForcefully(messageContext); } } + + /** + * This method will close the event collector and finish the flow when a Scatter Gather mediator is used. + * + * @param messageContext synapse message context. + */ + public static void closeEventsAfterScatterGather(MessageContext messageContext) { + + if (isOpenTelemetryEnabled()) { + OpenTelemetryManagerHolder.getOpenTelemetryManager().getHandler() + .handleScatterGatherFinishEvent(messageContext); + } + } } diff --git a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/event/CloseEventHandler.java b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/event/CloseEventHandler.java index 16522cbeb3..10b861417b 100644 --- a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/event/CloseEventHandler.java +++ b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/event/CloseEventHandler.java @@ -45,4 +45,11 @@ public interface CloseEventHandler { * @param synCtx Message context. */ void handleTryEndFlow(BasicStatisticDataUnit basicStatisticDataUnit, MessageContext synCtx); + + /** + * Handles a close flow event. + * + * @param synCtx Message context. + */ + void handleScatterGatherFinishEvent(MessageContext synCtx); } diff --git a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/span/SpanHandler.java b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/span/SpanHandler.java index cbf5bf1218..9a5c1f57b9 100644 --- a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/span/SpanHandler.java +++ b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/span/SpanHandler.java @@ -430,6 +430,16 @@ private void handleCallbackFinishEvent(MessageContext messageContext) { } } + public void handleScatterGatherFinishEvent(MessageContext messageContext) { + TracingScope tracingScope = tracingScopeManager.getTracingScope(messageContext); + synchronized (tracingScope.getSpanStore()) { + cleanupContinuationStateSequences(tracingScope.getSpanStore(), messageContext); + SpanWrapper outerLevelSpanWrapper = tracingScope.getSpanStore().getOuterLevelSpanWrapper(); + tracingScope.getSpanStore().finishSpan(outerLevelSpanWrapper, messageContext); + tracingScopeManager.cleanupTracingScope(tracingScope.getTracingScopeId()); + } + } + @Override public void handleStateStackInsertion(MessageContext synCtx, String seqName, SequenceType seqType) { TracingScope tracingScope = tracingScopeManager.getTracingScope(synCtx); diff --git a/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java b/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java index 49b3a3ff27..cda214f563 100644 --- a/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java +++ b/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java @@ -103,7 +103,8 @@ public class MediatorFactoryFinder implements XMLToObjectMapper { ForEachMediatorFactory.class, JSONTransformMediatorFactory.class, NTLMMediatorFactory.class, - VariableMediatorFactory.class + VariableMediatorFactory.class, + ScatterGatherMediatorFactory.class }; private final static MediatorFactoryFinder instance = new MediatorFactoryFinder(); diff --git a/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java b/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java index 174d90511c..eb12240762 100644 --- a/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java +++ b/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java @@ -77,7 +77,8 @@ public class MediatorSerializerFinder { ForEachMediatorSerializer.class, JSONTransformMediatorSerializer.class, NTLMMediatorSerializer.class, - VariableMediatorSerializer.class + VariableMediatorSerializer.class, + ScatterGatherMediatorSerializer.class }; private final static MediatorSerializerFinder instance = new MediatorSerializerFinder(); diff --git a/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorFactory.java b/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorFactory.java new file mode 100644 index 0000000000..b630259471 --- /dev/null +++ b/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorFactory.java @@ -0,0 +1,136 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.synapse.config.xml; + +import org.apache.axiom.om.OMAttribute; +import org.apache.axiom.om.OMElement; +import org.apache.synapse.Mediator; +import org.apache.synapse.mediators.eip.Target; +import org.apache.synapse.mediators.v2.ScatterGather; +import org.jaxen.JaxenException; + +import java.util.Iterator; +import java.util.Properties; +import javax.xml.namespace.QName; + +/** + * The <scatter-gather> mediator is used to copy messages in Synapse to similar messages but with + * different message contexts and aggregate the responses back. + * + *
+ * <scatter-gather parallel-execution=(true | false)>
+ *   <aggregation value-to-aggregate="expression" condition="expression" timeout="long"
+ *     min-messages="expression" max-messages="expression"/>
+ *   <target>
+ *     <sequence>
+ *       (mediator)+
+ *     </sequence>
+ *   </target>+
+ * </scatter-gather>
+ * 
+ */ +public class ScatterGatherMediatorFactory extends AbstractMediatorFactory { + + /** + * This will hold the QName of the clone mediator element in the xml configuration + */ + private static final QName SCATTER_GATHER_Q + = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "scatter-gather"); + private static final QName ELEMENT_AGGREGATE_Q + = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "aggregation"); + private static final QName ATT_VALUE_TO_AGGREGATE = new QName("value-to-aggregate"); + private static final QName ATT_CONDITION = new QName("condition"); + private static final QName ATT_TIMEOUT = new QName("timeout"); + private static final QName ATT_MIN_MESSAGES = new QName("min-messages"); + private static final QName ATT_MAX_MESSAGES = new QName("max-messages"); + private static final QName TARGET_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "target"); + private static final QName PARALLEL_EXEC_Q = new QName("parallel-execution"); + + public Mediator createSpecificMediator(OMElement elem, Properties properties) { + + boolean asynchronousExe = true; + + ScatterGather mediator = new ScatterGather(); + processAuditStatus(mediator, elem); + + OMAttribute parallelExecAttr = elem.getAttribute(PARALLEL_EXEC_Q); + if (parallelExecAttr != null && parallelExecAttr.getAttributeValue().equals("false")) { + asynchronousExe = false; + } + + mediator.setParallelExecution(asynchronousExe); + + Iterator targetElements = elem.getChildrenWithName(TARGET_Q); + while (targetElements.hasNext()) { + Target target = TargetFactory.createTarget((OMElement) targetElements.next(), properties); + target.setAsynchronous(asynchronousExe); + mediator.addTarget(target); + } + + OMElement aggregateElement = elem.getFirstChildWithName(ELEMENT_AGGREGATE_Q); + if (aggregateElement != null) { + OMAttribute aggregateExpr = aggregateElement.getAttribute(ATT_VALUE_TO_AGGREGATE); + if (aggregateExpr != null) { + try { + mediator.setAggregationExpression( + SynapsePathFactory.getSynapsePath(aggregateElement, ATT_VALUE_TO_AGGREGATE)); + } catch (JaxenException e) { + handleException("Unable to load the aggregating expression", e); + } + } + + OMAttribute conditionExpr = aggregateElement.getAttribute(ATT_CONDITION); + if (conditionExpr != null) { + try { + mediator.setCorrelateExpression( + SynapsePathFactory.getSynapsePath(aggregateElement, ATT_CONDITION)); + } catch (JaxenException e) { + handleException("Unable to load the condition expression", e); + } + } + + OMAttribute completeTimeout = aggregateElement.getAttribute(ATT_TIMEOUT); + if (completeTimeout != null) { + mediator.setCompletionTimeoutMillis(Long.parseLong(completeTimeout.getAttributeValue())); + } + + OMAttribute minMessages = aggregateElement.getAttribute(ATT_MIN_MESSAGES); + if (minMessages != null) { + mediator.setMinMessagesToComplete(new ValueFactory().createValue("min-messages", aggregateElement)); + } + + OMAttribute maxMessages = aggregateElement.getAttribute(ATT_MAX_MESSAGES); + if (maxMessages != null) { + mediator.setMaxMessagesToComplete(new ValueFactory().createValue("max-messages", aggregateElement)); + } + } + addAllCommentChildrenToList(elem, mediator.getCommentsList()); + return mediator; + } + + /** + * This method will implement the getTagQName method of the MediatorFactory interface + * + * @return QName of the clone element in xml configuration + */ + public QName getTagQName() { + + return SCATTER_GATHER_Q; + } +} diff --git a/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializer.java b/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializer.java new file mode 100755 index 0000000000..b585078f9a --- /dev/null +++ b/modules/core/src/main/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializer.java @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.synapse.config.xml; + +import org.apache.axiom.om.OMElement; +import org.apache.synapse.Mediator; +import org.apache.synapse.mediators.eip.Target; +import org.apache.synapse.mediators.v2.ScatterGather; + +/** + * Serializer for {@link ScatterGather} instances. + */ +public class ScatterGatherMediatorSerializer extends AbstractMediatorSerializer { + + public OMElement serializeSpecificMediator(Mediator m) { + + ScatterGather scatterGatherMediator = null; + if (!(m instanceof ScatterGather)) { + handleException("Unsupported mediator passed in for serialization : " + m.getType()); + } else { + scatterGatherMediator = (ScatterGather) m; + } + + assert scatterGatherMediator != null; + OMElement scatterGatherElement = fac.createOMElement("scatter-gather", synNS); + saveTracingState(scatterGatherElement, scatterGatherMediator); + + scatterGatherElement.addAttribute(fac.createOMAttribute( + "parallel-execution", nullNS, Boolean.toString(scatterGatherMediator.getParallelExecution()))); + OMElement aggregationElement = fac.createOMElement("aggregation", synNS); + + SynapsePathSerializer.serializePath( + scatterGatherMediator.getAggregationExpression(), aggregationElement, "value-to-aggregate"); + + if (scatterGatherMediator.getCorrelateExpression() != null) { + SynapsePathSerializer.serializePath( + scatterGatherMediator.getAggregationExpression(), aggregationElement, "condition"); + } + + if (scatterGatherMediator.getCompletionTimeoutMillis() != 0) { + aggregationElement.addAttribute(fac.createOMAttribute( + "timeout", nullNS, Long.toString(scatterGatherMediator.getCompletionTimeoutMillis()))); + } + if (scatterGatherMediator.getMinMessagesToComplete() != null) { + new ValueSerializer().serializeValue( + scatterGatherMediator.getMinMessagesToComplete(), "min-messages", aggregationElement); + } + if (scatterGatherMediator.getMaxMessagesToComplete() != null) { + new ValueSerializer().serializeValue( + scatterGatherMediator.getMaxMessagesToComplete(), "max-messages", aggregationElement); + } + scatterGatherElement.addChild(aggregationElement); + + for (Target target : scatterGatherMediator.getTargets()) { + if (target != null) { + scatterGatherElement.addChild(TargetSerializer.serializeTarget(target)); + } + } + serializeComments(scatterGatherElement, scatterGatherMediator.getCommentsList()); + + return scatterGatherElement; + } + + public String getMediatorClassName() { + + return ScatterGather.class.getName(); + } +} diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java b/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java index 1afe6f142c..ae7fffa435 100644 --- a/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java +++ b/modules/core/src/main/java/org/apache/synapse/mediators/MediatorWorker.java @@ -19,13 +19,24 @@ package org.apache.synapse.mediators; -import org.apache.synapse.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.synapse.FaultHandler; +import org.apache.synapse.Mediator; +import org.apache.synapse.MessageContext; +import org.apache.synapse.SynapseConstants; +import org.apache.synapse.SynapseException; +import org.apache.synapse.aspects.ComponentType; import org.apache.synapse.aspects.flow.statistics.StatisticsCloseEventListener; +import org.apache.synapse.aspects.flow.statistics.collectors.CloseEventCollector; +import org.apache.synapse.aspects.flow.statistics.collectors.OpenEventCollector; import org.apache.synapse.aspects.flow.statistics.collectors.RuntimeStatisticCollector; +import org.apache.synapse.aspects.flow.statistics.util.StatisticsConstants; import org.apache.synapse.carbonext.TenantInfoConfigurator; +import org.apache.synapse.continuation.ContinuationStackManager; +import org.apache.synapse.continuation.SeqContinuationState; import org.apache.synapse.debug.SynapseDebugManager; +import org.apache.synapse.mediators.base.SequenceMediator; import org.apache.synapse.util.logging.LoggingUtils; /** @@ -86,7 +97,25 @@ public void run() { debugManager.advertiseMediationFlowStartPoint(synCtx); } - seq.mediate(synCtx); + // If this is a scatter message, then we need to use the continuation state and continue the mediation + if (isScatterMessage(synCtx)) { + boolean result = seq.mediate(synCtx); + if (result) { + SeqContinuationState seqContinuationState = (SeqContinuationState) ContinuationStackManager.peakContinuationStateStack(synCtx); + if (seqContinuationState == null) { + return; + } + SequenceMediator sequenceMediator = ContinuationStackManager.retrieveSequence(synCtx, seqContinuationState); + + FlowContinuableMediator mediator = + (FlowContinuableMediator) sequenceMediator.getChild(seqContinuationState.getPosition()); + + synCtx.setProperty(SynapseConstants.CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER, true); + mediator.mediate(synCtx, seqContinuationState); + } + } else { + seq.mediate(synCtx); + } //((Axis2MessageContext)synCtx).getAxis2MessageContext().getEnvelope().discard(); } catch (SynapseException syne) { @@ -150,4 +179,16 @@ private void warn(boolean traceOn, String msg, MessageContext msgContext) { public void setStatisticsCloseEventListener(StatisticsCloseEventListener statisticsCloseEventListener) { this.statisticsCloseEventListener = statisticsCloseEventListener; } + + /** + * Check whether the message is a scatter message or not + * + * @param synCtx MessageContext + * @return true if the message is a scatter message + */ + private static boolean isScatterMessage(MessageContext synCtx) { + + Boolean isSkipContinuationState = (Boolean) synCtx.getProperty(SynapseConstants.SCATTER_MESSAGES); + return isSkipContinuationState != null && isSkipContinuationState; + } } diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/base/SequenceMediator.java b/modules/core/src/main/java/org/apache/synapse/mediators/base/SequenceMediator.java index ccf33c72ec..2c28222924 100644 --- a/modules/core/src/main/java/org/apache/synapse/mediators/base/SequenceMediator.java +++ b/modules/core/src/main/java/org/apache/synapse/mediators/base/SequenceMediator.java @@ -521,4 +521,16 @@ public void setComponentStatisticsId(ArtifactHolder holder) { StatisticIdentityGenerator.reportingFlowContinuableEndEvent(sequenceId, ComponentType.SEQUENCE, holder); } } + + /** + * Check whether the message is a scatter message or not + * + * @param synCtx MessageContext + * @return true if the message is a scatter message + */ + private static boolean isScatterMessage(MessageContext synCtx) { + + Boolean isSkipContinuationState = (Boolean) synCtx.getProperty(SynapseConstants.SCATTER_MESSAGES); + return isSkipContinuationState != null && isSkipContinuationState; + } } diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java b/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java index 3a2f621511..b3f813521c 100755 --- a/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java +++ b/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java @@ -26,6 +26,7 @@ import org.apache.synapse.SynapseLog; import org.apache.synapse.core.SynapseEnvironment; import org.apache.synapse.mediators.eip.EIPConstants; +import org.apache.synapse.mediators.v2.ScatterGather; import java.util.ArrayList; import java.util.List; @@ -50,6 +51,7 @@ public class Aggregate extends TimerTask { private String correlation = null; /** The AggregateMediator that should be invoked on completion of the aggregation */ private AggregateMediator aggregateMediator = null; + private ScatterGather scatterGatherMediator = null; private List messages = new ArrayList(); private boolean locked = false; private boolean completed = false; @@ -87,6 +89,24 @@ public Aggregate(SynapseEnvironment synEnv, String corelation, long timeoutMilli this.aggregateMediator = mediator; } + public Aggregate(SynapseEnvironment synEnv, String corelation, long timeoutMillis, int min, + int max, ScatterGather scatterGatherMediator, FaultHandler faultHandler) { + + this.synEnv = synEnv; + this.correlation = corelation; + if (timeoutMillis > 0) { + expiryTimeMillis = System.currentTimeMillis() + timeoutMillis; + } + if (min > 0) { + minCount = min; + } + if (max > 0) { + maxCount = max; + } + this.faultHandler = faultHandler; + this.scatterGatherMediator = scatterGatherMediator; + } + /** * Add a message to the interlan message list * @@ -118,9 +138,15 @@ public synchronized boolean isComplete(SynapseLog synLog) { // get total messages for this group, from the first message we have collected MessageContext mc = messages.get(0); - Object prop = mc.getProperty(EIPConstants.MESSAGE_SEQUENCE + - (aggregateMediator.getId() != null ? "." + aggregateMediator.getId() : "")); - + Object prop; + if (aggregateMediator != null) { + prop = mc.getProperty(EIPConstants.MESSAGE_SEQUENCE + + (aggregateMediator.getId() != null ? "." + aggregateMediator.getId() : "")); + } else { + prop = mc.getProperty(EIPConstants.MESSAGE_SEQUENCE + + (scatterGatherMediator.getId() != null ? "." + scatterGatherMediator.getId() : "")); + } + if (prop != null && prop instanceof String) { String[] msgSequence = prop.toString().split( EIPConstants.MESSAGE_SEQUENCE_DELEMITER); @@ -264,8 +290,13 @@ private class AggregateTimeout implements Runnable { public void run() { MessageContext messageContext = aggregate.getLastMessage(); try { - log.warn("Aggregate mediator timeout occurred."); - aggregateMediator.completeAggregate(aggregate); + if (aggregateMediator != null) { + log.warn("Aggregate mediator timeout occurred."); + aggregateMediator.completeAggregate(aggregate); + } else { + log.warn("Scatter Gather mediator timeout occurred."); + scatterGatherMediator.completeAggregate(aggregate); + } } catch (Exception ex) { if (faultHandler != null && messageContext != null) { faultHandler.handleFault(messageContext, ex); diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java b/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java new file mode 100644 index 0000000000..589ab2b7d5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/synapse/mediators/v2/ScatterGather.java @@ -0,0 +1,710 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.synapse.mediators.v2; + +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonSyntaxException; +import org.apache.axis2.AxisFault; +import org.apache.axis2.Constants; +import org.apache.axis2.context.OperationContext; +import org.apache.synapse.ContinuationState; +import org.apache.synapse.ManagedLifecycle; +import org.apache.synapse.Mediator; +import org.apache.synapse.MessageContext; +import org.apache.synapse.SynapseConstants; +import org.apache.synapse.SynapseException; +import org.apache.synapse.SynapseLog; +import org.apache.synapse.aspects.AspectConfiguration; +import org.apache.synapse.aspects.ComponentType; +import org.apache.synapse.aspects.flow.statistics.StatisticIdentityGenerator; +import org.apache.synapse.aspects.flow.statistics.collectors.CloseEventCollector; +import org.apache.synapse.aspects.flow.statistics.collectors.OpenEventCollector; +import org.apache.synapse.aspects.flow.statistics.collectors.RuntimeStatisticCollector; +import org.apache.synapse.aspects.flow.statistics.data.artifact.ArtifactHolder; +import org.apache.synapse.aspects.flow.statistics.util.StatisticDataCollectionHelper; +import org.apache.synapse.commons.json.JsonUtil; +import org.apache.synapse.config.xml.SynapsePath; +import org.apache.synapse.continuation.ContinuationStackManager; +import org.apache.synapse.continuation.ReliantContinuationState; +import org.apache.synapse.continuation.SeqContinuationState; +import org.apache.synapse.core.SynapseEnvironment; +import org.apache.synapse.core.axis2.Axis2MessageContext; +import org.apache.synapse.mediators.AbstractMediator; +import org.apache.synapse.mediators.FlowContinuableMediator; +import org.apache.synapse.mediators.Value; +import org.apache.synapse.mediators.base.SequenceMediator; +import org.apache.synapse.mediators.eip.EIPConstants; +import org.apache.synapse.mediators.eip.EIPUtils; +import org.apache.synapse.mediators.eip.SharedDataHolder; +import org.apache.synapse.mediators.eip.Target; +import org.apache.synapse.mediators.eip.aggregator.Aggregate; +import org.apache.synapse.transport.passthru.util.RelayUtils; +import org.apache.synapse.util.MessageHelper; +import org.apache.synapse.util.xpath.SynapseJsonPath; +import org.apache.synapse.util.xpath.SynapseXPath; +import org.jaxen.JaxenException; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Timer; +import javax.xml.stream.XMLStreamException; + +public class ScatterGather extends AbstractMediator implements ManagedLifecycle, FlowContinuableMediator { + + private final Object lock = new Object(); + private final Map activeAggregates = Collections.synchronizedMap(new HashMap<>()); + private String id; + private List targets = new ArrayList<>(); + private long completionTimeoutMillis = 0; + private Value maxMessagesToComplete; + private Value minMessagesToComplete; + private SynapsePath correlateExpression = null; + private SynapsePath aggregationExpression = null; + private boolean parallelExecution = true; + private Integer statisticReportingIndex; + + public ScatterGather() { + + id = String.valueOf(new Random().nextLong()); + } + + public void setParallelExecution(boolean parallelExecution) { + + this.parallelExecution = parallelExecution; + } + + public boolean getParallelExecution() { + + return this.parallelExecution; + } + + public String getId() { + + return id; + } + + @Override + public boolean mediate(MessageContext synCtx) { + + boolean aggregationResult = false; + + SynapseLog synLog = getLog(synCtx); + + if (synLog.isTraceOrDebugEnabled()) { + synLog.traceOrDebug("Start : Scatter Gather mediator"); + + if (synLog.isTraceTraceEnabled()) { + synLog.traceTrace("Message : " + synCtx.getEnvelope()); + } + } + + synCtx.setProperty(id != null ? EIPConstants.EIP_SHARED_DATA_HOLDER + "." + id : + EIPConstants.EIP_SHARED_DATA_HOLDER, new SharedDataHolder()); + Iterator iter = targets.iterator(); + int i = 0; + while (iter.hasNext()) { + if (synLog.isTraceOrDebugEnabled()) { + synLog.traceOrDebug("Submitting " + (i + 1) + " of " + targets.size() + + " messages for " + (parallelExecution ? "parallel processing" : "sequential processing")); + } + + MessageContext clonedMsgCtx = getClonedMessageContext(synCtx, i++, targets.size()); + ContinuationStackManager.addReliantContinuationState(clonedMsgCtx, i - 1, getMediatorPosition()); + boolean result = iter.next().mediate(clonedMsgCtx); + if (!parallelExecution && result) { + aggregationResult = aggregateMessages(clonedMsgCtx, synLog); + } + } + OperationContext opCtx + = ((Axis2MessageContext) synCtx).getAxis2MessageContext().getOperationContext(); + if (opCtx != null) { + opCtx.setProperty(Constants.RESPONSE_WRITTEN, "SKIP"); + } + return aggregationResult; + } + + public void init(SynapseEnvironment se) { + + for (Target target : targets) { + ManagedLifecycle seq = target.getSequence(); + if (seq != null) { + seq.init(se); + } + } + } + + public void destroy() { + + for (Target target : targets) { + ManagedLifecycle seq = target.getSequence(); + if (seq != null) { + seq.destroy(); + } + } + } + + /** + * Clone the provided message context as a new message, and set the aggregation ID and the message sequence count + * + * @param synCtx - MessageContext which is subjected to the cloning + * @param messageSequence - the position of this message of the cloned set + * @param messageCount - total of cloned copies + * @return MessageContext the cloned message context + */ + private MessageContext getClonedMessageContext(MessageContext synCtx, int messageSequence, int messageCount) { + + MessageContext newCtx = null; + try { + newCtx = MessageHelper.cloneMessageContext(synCtx); + // Set isServerSide property in the cloned message context + ((Axis2MessageContext) newCtx).getAxis2MessageContext().setServerSide( + ((Axis2MessageContext) synCtx).getAxis2MessageContext().isServerSide()); + // Set the SCATTER_MESSAGES property to the cloned message context which will be used by the MediatorWorker + // to continue the mediation from the continuation state + newCtx.setProperty(SynapseConstants.SCATTER_MESSAGES, true); + if (id != null) { + newCtx.setProperty(EIPConstants.AGGREGATE_CORRELATION + "." + id, synCtx.getMessageID()); + newCtx.setProperty(EIPConstants.MESSAGE_SEQUENCE + "." + id, messageSequence + + EIPConstants.MESSAGE_SEQUENCE_DELEMITER + messageCount); + } else { + newCtx.setProperty(EIPConstants.MESSAGE_SEQUENCE, messageSequence + + EIPConstants.MESSAGE_SEQUENCE_DELEMITER + messageCount); + } + } catch (AxisFault axisFault) { + handleException("Error cloning the message context", axisFault, synCtx); + } + return newCtx; + } + + public List getTargets() { + + return targets; + } + + public void setTargets(List targets) { + + this.targets = targets; + } + + public void addTarget(Target target) { + + this.targets.add(target); + } + + public SynapsePath getAggregationExpression() { + + return aggregationExpression; + } + + public void setAggregationExpression(SynapsePath aggregationExpression) { + + this.aggregationExpression = aggregationExpression; + } + + @Override + public boolean mediate(MessageContext synCtx, ContinuationState continuationState) { + + SynapseLog synLog = getLog(synCtx); + + if (synLog.isTraceOrDebugEnabled()) { + synLog.traceOrDebug("Scatter Gather mediator : Mediating from ContinuationState"); + } + + boolean result; + // If the continuation is triggered from a mediator worker and has children, then mediate through the sub branch + // otherwise start aggregation + if (isContinuationTriggeredFromMediatorWorker(synCtx)) { + if (continuationState.hasChild()) { + int subBranch = ((ReliantContinuationState) continuationState.getChildContState()).getSubBranch(); + SequenceMediator branchSequence = targets.get(subBranch).getSequence(); + boolean isStatisticsEnabled = RuntimeStatisticCollector.isStatisticsEnabled(); + FlowContinuableMediator mediator = + (FlowContinuableMediator) branchSequence.getChild(continuationState.getChildContState().getPosition()); + + result = mediator.mediate(synCtx, continuationState.getChildContState()); + if (isStatisticsEnabled) { + ((Mediator) mediator).reportCloseStatistics(synCtx, null); + } + } else { + result = true; + } + } else { + // If the continuation is triggered from a callback, continue the mediation from the continuation state + int subBranch = ((ReliantContinuationState) continuationState).getSubBranch(); + + SequenceMediator branchSequence = targets.get(subBranch).getSequence(); + boolean isStatisticsEnabled = RuntimeStatisticCollector.isStatisticsEnabled(); + if (!continuationState.hasChild()) { + result = branchSequence.mediate(synCtx, continuationState.getPosition() + 1); + } else { + FlowContinuableMediator mediator = + (FlowContinuableMediator) branchSequence.getChild(continuationState.getPosition()); + + result = mediator.mediate(synCtx, continuationState.getChildContState()); + if (isStatisticsEnabled) { + ((Mediator) mediator).reportCloseStatistics(synCtx, null); + } + } + // If the mediation is completed, remove the child continuation state from the stack, so the aggregation + // will continue the mediation from the parent continuation state + ContinuationStackManager.removeReliantContinuationState(synCtx); + } + if (result) { + return aggregateMessages(synCtx, synLog); + } + return false; + } + + private boolean aggregateMessages(MessageContext synCtx, SynapseLog synLog) { + + Aggregate aggregate = null; + String correlationIdName = (id != null ? EIPConstants.AGGREGATE_CORRELATION + "." + id : + EIPConstants.AGGREGATE_CORRELATION); + + Object correlationID = synCtx.getProperty(correlationIdName); + String correlation; + + Object result = null; + if (correlateExpression != null) { + try { + result = correlateExpression instanceof SynapseXPath ? correlateExpression.evaluate(synCtx) : + ((SynapseJsonPath) correlateExpression).evaluate(synCtx); + } catch (JaxenException e) { + handleException("Unable to execute the XPATH over the message", e, synCtx); + } + if (result instanceof List) { + if (((List) result).isEmpty()) { + handleException("Failed to evaluate correlate expression: " + correlateExpression.toString(), synCtx); + } + } + if (result instanceof Boolean) { + if (!(Boolean) result) { + return true; + } + } + } + if (result != null) { + while (aggregate == null) { + synchronized (lock) { + if (activeAggregates.containsKey(correlateExpression.toString())) { + aggregate = activeAggregates.get(correlateExpression.toString()); + if (aggregate != null) { + if (!aggregate.getLock()) { + aggregate = null; + } + } + } else { + if (synLog.isTraceOrDebugEnabled()) { + synLog.traceOrDebug("Creating new Aggregator - " + + (completionTimeoutMillis > 0 ? "expires in : " + + (completionTimeoutMillis / 1000) + "secs" : + "without expiry time")); + } + if (isAggregationCompleted(synCtx)) { + return false; + } + + Double minMsg = -1.0; + if (minMessagesToComplete != null) { + minMsg = Double.parseDouble(minMessagesToComplete.evaluateValue(synCtx)); + } + Double maxMsg = -1.0; + if (maxMessagesToComplete != null) { + maxMsg = Double.parseDouble(maxMessagesToComplete.evaluateValue(synCtx)); + } + + aggregate = new Aggregate( + synCtx.getEnvironment(), + correlateExpression.toString(), + completionTimeoutMillis, + minMsg.intValue(), + maxMsg.intValue(), this, synCtx.getFaultStack().peek()); + + if (completionTimeoutMillis > 0) { + synCtx.getConfiguration().getSynapseTimer(). + schedule(aggregate, completionTimeoutMillis); + } + aggregate.getLock(); + activeAggregates.put(correlateExpression.toString(), aggregate); + } + } + } + } else if (correlationID instanceof String) { + correlation = (String) correlationID; + while (aggregate == null) { + synchronized (lock) { + if (activeAggregates.containsKey(correlation)) { + aggregate = activeAggregates.get(correlation); + if (aggregate != null) { + if (!aggregate.getLock()) { + aggregate = null; + } + } else { + break; + } + } else { + if (synLog.isTraceOrDebugEnabled()) { + synLog.traceOrDebug("Creating new Aggregator - " + + (completionTimeoutMillis > 0 ? "expires in : " + + (completionTimeoutMillis / 1000) + "secs" : + "without expiry time")); + } + if (isAggregationCompleted(synCtx)) { + return false; + } + + Double minMsg = -1.0; + if (minMessagesToComplete != null) { + minMsg = Double.parseDouble(minMessagesToComplete.evaluateValue(synCtx)); + } + Double maxMsg = -1.0; + if (maxMessagesToComplete != null) { + maxMsg = Double.parseDouble(maxMessagesToComplete.evaluateValue(synCtx)); + } + aggregate = new Aggregate( + synCtx.getEnvironment(), + correlation, + completionTimeoutMillis, + minMsg.intValue(), + maxMsg.intValue(), this, synCtx.getFaultStack().peek()); + + if (completionTimeoutMillis > 0) { + synchronized (aggregate) { + if (!aggregate.isCompleted()) { + try { + synCtx.getConfiguration().getSynapseTimer(). + schedule(aggregate, completionTimeoutMillis); + } catch (IllegalStateException e) { + log.warn("Synapse timer already cancelled. Resetting Synapse timer"); + synCtx.getConfiguration().setSynapseTimer(new Timer(true)); + synCtx.getConfiguration().getSynapseTimer(). + schedule(aggregate, completionTimeoutMillis); + } + } + } + } + aggregate.getLock(); + activeAggregates.put(correlation, aggregate); + } + } + } + } else { + synLog.traceOrDebug("Unable to find aggregation correlation property"); + return true; + } + // if there is an aggregate continue on aggregation + if (aggregate != null) { + boolean collected = aggregate.addMessage(synCtx); + if (synLog.isTraceOrDebugEnabled()) { + if (collected) { + synLog.traceOrDebug("Collected a message during aggregation"); + if (synLog.isTraceTraceEnabled()) { + synLog.traceTrace("Collected message : " + synCtx); + } + } + } + if (aggregate.isComplete(synLog)) { + synLog.traceOrDebug("Aggregation completed"); + boolean onCompleteSeqResult = completeAggregate(aggregate); + synLog.traceOrDebug("End : Scatter Gather mediator"); + return onCompleteSeqResult; + } else { + aggregate.releaseLock(); + } + } else { + synLog.traceOrDebug("Unable to find an aggregate for this message - skip"); + return true; + } + return false; + } + + private boolean isAggregationCompleted(MessageContext synCtx) { + + Object aggregateTimeoutHolderObj = + synCtx.getProperty(id != null ? EIPConstants.EIP_SHARED_DATA_HOLDER + "." + id : + EIPConstants.EIP_SHARED_DATA_HOLDER); + + if (aggregateTimeoutHolderObj != null) { + SharedDataHolder sharedDataHolder = (SharedDataHolder) aggregateTimeoutHolderObj; + if (sharedDataHolder.isAggregationCompleted()) { + if (log.isDebugEnabled()) { + log.debug("Received a response for already completed Aggregate"); + } + return true; + } + } + return false; + } + + public boolean completeAggregate(Aggregate aggregate) { + + boolean markedCompletedNow = false; + boolean wasComplete = aggregate.isCompleted(); + if (wasComplete) { + return false; + } + + if (log.isDebugEnabled()) { + log.debug("Aggregation completed or timed out"); + } + + // cancel the timer + synchronized (this) { + if (!aggregate.isCompleted()) { + aggregate.cancel(); + aggregate.setCompleted(true); + + MessageContext lastMessage = aggregate.getLastMessage(); + if (lastMessage != null) { + Object aggregateTimeoutHolderObj = + lastMessage.getProperty(id != null ? EIPConstants.EIP_SHARED_DATA_HOLDER + "." + id : + EIPConstants.EIP_SHARED_DATA_HOLDER); + + if (aggregateTimeoutHolderObj != null) { + SharedDataHolder sharedDataHolder = (SharedDataHolder) aggregateTimeoutHolderObj; + sharedDataHolder.markAggregationCompletion(); + } + } + markedCompletedNow = true; + } + } + + if (!markedCompletedNow) { + return false; + } + + MessageContext newSynCtx = getAggregatedMessage(aggregate); + + if (newSynCtx == null) { + log.warn("An aggregation of messages timed out with no aggregated messages", null); + return false; + } + aggregate.clear(); + activeAggregates.remove(aggregate.getCorrelation()); + newSynCtx.setProperty(SynapseConstants.CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER, false); + SeqContinuationState seqContinuationState = (SeqContinuationState) ContinuationStackManager.peakContinuationStateStack(newSynCtx); + boolean result = false; + + if (RuntimeStatisticCollector.isStatisticsEnabled()) { + CloseEventCollector.closeEntryEvent(newSynCtx, getMediatorName(), ComponentType.MEDIATOR, + statisticReportingIndex, isContentAltering()); + } + + if (seqContinuationState != null) { + SequenceMediator sequenceMediator = ContinuationStackManager.retrieveSequence(newSynCtx, seqContinuationState); + result = sequenceMediator.mediate(newSynCtx, seqContinuationState); + if (RuntimeStatisticCollector.isStatisticsEnabled()) { + sequenceMediator.reportCloseStatistics(newSynCtx, null); + } + } + CloseEventCollector.closeEventsAfterScatterGather(newSynCtx); + return result; + } + + private MessageContext getAggregatedMessage(Aggregate aggregate) { + + MessageContext newCtx = null; + JsonArray jsonArray = new JsonArray(); + JsonElement result; + boolean isJSONAggregation = aggregationExpression instanceof SynapseJsonPath; + + for (MessageContext synCtx : aggregate.getMessages()) { + if (newCtx == null) { + try { + newCtx = MessageHelper.cloneMessageContext(synCtx, true, false, true); + } catch (AxisFault axisFault) { + handleException(aggregate, "Error creating a copy of the message", axisFault, synCtx); + } + + if (log.isDebugEnabled()) { + log.debug("Generating Aggregated message from : " + newCtx.getEnvelope()); + } + if (isJSONAggregation) { + jsonArray.add(EIPUtils.getJSONElement(synCtx, (SynapseJsonPath) aggregationExpression)); + } else { + try { + EIPUtils.enrichEnvelope(newCtx.getEnvelope(), synCtx, (SynapseXPath) aggregationExpression); + } catch (JaxenException e) { + handleException(aggregate, "Error merging aggregation results using XPath : " + + aggregationExpression.toString(), e, synCtx); + } + } + } else { + try { + if (log.isDebugEnabled()) { + log.debug("Merging message : " + synCtx.getEnvelope() + " using XPath : " + + aggregationExpression); + } + // When the target sequences are not content aware, the message builder wont get triggered. + // Therefore, we need to build the message to do the aggregation. + RelayUtils.buildMessage(((Axis2MessageContext) synCtx).getAxis2MessageContext()); + if (isJSONAggregation) { + jsonArray.add(EIPUtils.getJSONElement(synCtx, (SynapseJsonPath) aggregationExpression)); + } else { + EIPUtils.enrichEnvelope(newCtx.getEnvelope(), synCtx.getEnvelope(), synCtx, (SynapseXPath) + aggregationExpression); + } + + if (log.isDebugEnabled()) { + log.debug("Merged result : " + newCtx.getEnvelope()); + } + } catch (JaxenException e) { + handleException(aggregate, "Error merging aggregation results using XPath : " + + aggregationExpression.toString(), e, synCtx); + } catch (SynapseException e) { + handleException(aggregate, "Error evaluating expression: " + aggregationExpression.toString(), e, synCtx); + } catch (JsonSyntaxException e) { + handleException(aggregate, "Error reading JSON element: " + aggregationExpression.toString(), e, synCtx); + } catch (IOException e) { + handleException(aggregate, "IO Error occurred while building the message", e, synCtx); + } catch (XMLStreamException e) { + handleException(aggregate, "XML Error occurred while building the message", e, synCtx); + } + } + } + + result = jsonArray; + + StatisticDataCollectionHelper.collectAggregatedParents(aggregate.getMessages(), newCtx); + if (isJSONAggregation) { + // setting the new JSON payload to the messageContext + try { + JsonUtil.getNewJsonPayload(((Axis2MessageContext) newCtx).getAxis2MessageContext(), new + ByteArrayInputStream(result.toString().getBytes()), true, true); + } catch (AxisFault axisFault) { + log.error("Error occurred while setting the new JSON payload to the msg context", axisFault); + } + } else { + // Removing the JSON stream after aggregated using XML path. + // This will fix inconsistent behaviour in logging the payload. + ((Axis2MessageContext) newCtx).getAxis2MessageContext() + .removeProperty(org.apache.synapse.commons.json.Constants.ORG_APACHE_SYNAPSE_COMMONS_JSON_JSON_INPUT_STREAM); + } + return newCtx; + } + + public SynapsePath getCorrelateExpression() { + + return correlateExpression; + } + + public void setCorrelateExpression(SynapsePath correlateExpression) { + + this.correlateExpression = correlateExpression; + this.id = null; + } + + public long getCompletionTimeoutMillis() { + + return completionTimeoutMillis; + } + + public void setCompletionTimeoutMillis(long completionTimeoutMillis) { + + this.completionTimeoutMillis = completionTimeoutMillis; + } + + public Value getMinMessagesToComplete() { + + return minMessagesToComplete; + } + + public void setMinMessagesToComplete(Value minMessagesToComplete) { + + this.minMessagesToComplete = minMessagesToComplete; + } + + public Value getMaxMessagesToComplete() { + + return maxMessagesToComplete; + } + + public void setMaxMessagesToComplete(Value maxMessagesToComplete) { + + this.maxMessagesToComplete = maxMessagesToComplete; + } + + /** + * Check whether the message is a scatter message or not + * + * @param synCtx MessageContext + * @return true if the message is a scatter message + */ + private static boolean isContinuationTriggeredFromMediatorWorker(MessageContext synCtx) { + + Boolean isContinuationTriggeredMediatorWorker = + (Boolean) synCtx.getProperty(SynapseConstants.CONTINUE_FLOW_TRIGGERED_FROM_MEDIATOR_WORKER); + return isContinuationTriggeredMediatorWorker != null && isContinuationTriggeredMediatorWorker; + } + + @Override + public Integer reportOpenStatistics(MessageContext messageContext, boolean isContentAltering) { + + statisticReportingIndex = OpenEventCollector.reportFlowContinuableEvent(messageContext, getMediatorName(), + ComponentType.MEDIATOR, getAspectConfiguration(), isContentAltering() || isContentAltering); + return statisticReportingIndex; + } + + @Override + public void reportCloseStatistics(MessageContext messageContext, Integer currentIndex) { + + // Do nothing here as the close event is reported in the completeAggregate method + } + + @Override + public void setComponentStatisticsId(ArtifactHolder holder) { + + if (getAspectConfiguration() == null) { + configure(new AspectConfiguration(getMediatorName())); + } + String sequenceId = + StatisticIdentityGenerator.getIdForFlowContinuableMediator(getMediatorName(), ComponentType.MEDIATOR, holder); + getAspectConfiguration().setUniqueId(sequenceId); + for (Target target : targets) { + target.setStatisticIdForMediators(holder); + } + + StatisticIdentityGenerator.reportingFlowContinuableEndEvent(sequenceId, ComponentType.MEDIATOR, holder); + } + + @Override + public boolean isContentAltering() { + + return true; + } + + private void handleException(Aggregate aggregate, String msg, Exception exception, MessageContext msgContext) { + + aggregate.clear(); + activeAggregates.clear(); + if (exception != null) { + super.handleException(msg, exception, msgContext); + } else { + super.handleException(msg, msgContext); + } + } +} diff --git a/modules/core/src/test/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializationTest.java b/modules/core/src/test/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializationTest.java new file mode 100644 index 0000000000..bcef1aec66 --- /dev/null +++ b/modules/core/src/test/java/org/apache/synapse/config/xml/ScatterGatherMediatorSerializationTest.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.synapse.config.xml; + +/** + * Factory and Serializer tests for the ScatterGatherMediator + */ + +public class ScatterGatherMediatorSerializationTest extends AbstractTestCase { + + private ScatterGatherMediatorFactory scatterGatherMediatorFactory; + private ScatterGatherMediatorSerializer scatterGatherMediatorSerializer; + + public ScatterGatherMediatorSerializationTest() { + + super(ScatterGatherMediatorSerializer.class.getName()); + scatterGatherMediatorFactory = new ScatterGatherMediatorFactory(); + scatterGatherMediatorSerializer = new ScatterGatherMediatorSerializer(); + } + + public void testScatterGatherSerialization() { + + String inputXML = "" + + "" + + "{ \"pet\": { " + + "\"name\": \"pet1\", \"type\": \"dog\" }, " + + "\"status\": \"success\" }" + + "" + + "" + + "" + + ""; + + assertTrue(serialization(inputXML, scatterGatherMediatorFactory, scatterGatherMediatorSerializer)); + } +}