diff --git a/modules/core/src/main/java/org/apache/synapse/SynapseConstants.java b/modules/core/src/main/java/org/apache/synapse/SynapseConstants.java index cdd08312a2..16e00b53e1 100644 --- a/modules/core/src/main/java/org/apache/synapse/SynapseConstants.java +++ b/modules/core/src/main/java/org/apache/synapse/SynapseConstants.java @@ -740,4 +740,6 @@ public enum ENDPOINT_TIMEOUT_TYPE { ENDPOINT_TIMEOUT, GLOBAL_TIMEOUT, HTTP_CONNE public static final String CONNECTOR_ARTIFACT = "CONNECTOR_ARTIFACT"; public static final String APPEND_ARTIFACT_IDENTIFIER = "APPEND_ARTIFACT_IDENTIFIER"; public static final String EXPOSE_VERSIONED_SERVICES = "expose.versioned.services"; + + public static final String SKIP_MAIN_SEQUENCE = "SKIP_MAIN_SEQUENCE"; } diff --git a/modules/core/src/main/java/org/apache/synapse/aspects/AspectConfiguration.java b/modules/core/src/main/java/org/apache/synapse/aspects/AspectConfiguration.java index b4bdd18a43..3d05b97388 100644 --- a/modules/core/src/main/java/org/apache/synapse/aspects/AspectConfiguration.java +++ b/modules/core/src/main/java/org/apache/synapse/aspects/AspectConfiguration.java @@ -47,7 +47,9 @@ public class AspectConfiguration implements StatisticsConfigurable, Identifiable private Integer hashCode = null; - public AspectConfiguration(String id) { + private boolean traceFilterEnable = false; + + public AspectConfiguration(String id) { this.id = id; } @@ -120,4 +122,15 @@ public Integer getHashCode() { return hashCode; } + public void enableTraceFilter() { + this.traceFilterEnable = true; + } + + public void disableTraceFilter() { + this.traceFilterEnable = false; + } + + public boolean isTraceFilterEnable() { + return traceFilterEnable; + } } diff --git a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/collectors/OpenEventCollector.java b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/collectors/OpenEventCollector.java index 596d183b6a..aadebd3af2 100644 --- a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/collectors/OpenEventCollector.java +++ b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/collectors/OpenEventCollector.java @@ -102,7 +102,9 @@ public static Integer reportEntryEvent(MessageContext messageContext, String com StatisticsOpenEvent openEvent = new StatisticsOpenEvent(statisticDataUnit); addEventAndIncrementCount(messageContext, openEvent); - if (isOpenTelemetryEnabled()) { + boolean isFiltered = isSpanFilteredMediator(componentName) || + (aspectConfiguration != null && aspectConfiguration.isTraceFilterEnable()); + if (!isFiltered && isOpenTelemetryEnabled()) { OpenTelemetryManagerHolder.getOpenTelemetryManager().getHandler() .handleOpenEntryEvent(statisticDataUnit, messageContext); } @@ -134,7 +136,9 @@ public static Integer reportChildEntryEvent(MessageContext messageContext, Strin reportMediatorStatistics(messageContext, componentName, componentType, isContentAltering, statisticDataUnit, aspectConfiguration); - if (isOpenTelemetryEnabled()) { + boolean isFiltered = isSpanFilteredMediator(componentName) || + (aspectConfiguration != null && aspectConfiguration.isTraceFilterEnable()); + if (!isFiltered && isOpenTelemetryEnabled()) { OpenTelemetryManagerHolder.getOpenTelemetryManager().getHandler() .handleOpenChildEntryEvent(statisticDataUnit, messageContext); } @@ -168,7 +172,9 @@ public static Integer reportFlowContinuableEvent(MessageContext messageContext, reportMediatorStatistics(messageContext, componentName, componentType, isContentAltering, statisticDataUnit, aspectConfiguration); - if (isOpenTelemetryEnabled()) { + boolean isFiltered = isSpanFilteredMediator(componentName) || + (aspectConfiguration != null && aspectConfiguration.isTraceFilterEnable()); + if (!isFiltered && isOpenTelemetryEnabled()) { OpenTelemetryManagerHolder.getOpenTelemetryManager().getHandler() .handleOpenFlowContinuableEvent(statisticDataUnit, messageContext); } @@ -203,7 +209,9 @@ public static Integer reportFlowSplittingEvent(MessageContext messageContext, St reportMediatorStatistics(messageContext, componentName, componentType, isContentAltering, statisticDataUnit, aspectConfiguration); - if (isOpenTelemetryEnabled()) { + boolean isFiltered = isSpanFilteredMediator(componentName) || + (aspectConfiguration != null && aspectConfiguration.isTraceFilterEnable()); + if (!isFiltered && isOpenTelemetryEnabled()) { OpenTelemetryManagerHolder.getOpenTelemetryManager().getHandler() .handleOpenFlowSplittingEvent(statisticDataUnit, messageContext); } @@ -237,7 +245,9 @@ public static Integer reportFlowAggregateEvent(MessageContext messageContext, St reportMediatorStatistics(messageContext, componentName, componentType, isContentAltering, statisticDataUnit, aspectConfiguration); - if (isOpenTelemetryEnabled()) { + boolean isFiltered = isSpanFilteredMediator(componentName) || + (aspectConfiguration != null && aspectConfiguration.isTraceFilterEnable()); + if (!isFiltered && isOpenTelemetryEnabled()) { OpenTelemetryManagerHolder.getOpenTelemetryManager().getHandler() .handleOpenFlowAggregateEvent(statisticDataUnit, messageContext); } diff --git a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/collectors/RuntimeStatisticCollector.java b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/collectors/RuntimeStatisticCollector.java index d59a5c80d9..1876d89301 100644 --- a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/collectors/RuntimeStatisticCollector.java +++ b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/collectors/RuntimeStatisticCollector.java @@ -18,6 +18,9 @@ package org.apache.synapse.aspects.flow.statistics.collectors; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.synapse.MessageContext; @@ -71,6 +74,16 @@ public abstract class RuntimeStatisticCollector { public static long eventExpireTime; + /** + * Is mediator span filter enabled in otlp tracing. + */ + private static boolean isMediatorSpanFilterEnabled; + + /** + * Mediator list to filter spans in otlp tracing. + */ + private static Set mediatorSpanFilterSet; + /** * Initialize statistics collection when ESB starts. */ @@ -129,6 +142,21 @@ public static void init() { } OpenTelemetryManagerHolder.loadTracerConfigurations(); OpenTelemetryManagerHolder.setCollectingFlags(isCollectingPayloads, isCollectingProperties, isCollectingVariables); + String mediatorSpanFilterListStr = SynapsePropertiesLoader.getPropertyValue( + TelemetryConstants.OLTP_FILTERED_MEDIATOR_NAMES, null); + isMediatorSpanFilterEnabled = mediatorSpanFilterListStr != null && + !mediatorSpanFilterListStr.isEmpty(); + if (isMediatorSpanFilterEnabled) { + mediatorSpanFilterSet = Stream.of(mediatorSpanFilterListStr.split(",")) + .map(String::trim) // remove leading/trailing spaces + .map(String::toLowerCase) // convert to lowercase + .collect(Collectors.toSet()); + if (log.isDebugEnabled()) { + log.debug( + "Mediator span filter is enabled. Mediators with the following names " + + "will be not traced: " + mediatorSpanFilterListStr); + } + } } } else { if (log.isDebugEnabled()) { @@ -150,6 +178,20 @@ protected static void setStatisticsTraceId(MessageContext msgCtx) { } } + /** + * Set message Id of the message context as statistic trace Id at the beginning of the statistic flow. + * + * @param msgCtx Axis2 message context. + */ + protected static void setStatisticsTraceId(org.apache.axis2.context.MessageContext msgCtx) { + if (msgCtx.getProperty(StatisticsConstants.FLOW_STATISTICS_ID) == null && msgCtx.getMessageID() != null) { + msgCtx.setProperty(StatisticsConstants.FLOW_STATISTICS_ID, + msgCtx.getMessageID().replace(':', '_')); + } else if (msgCtx.getMessageID() == null) { + log.error("Message ID is null"); + } + } + /** * Returns true if statistics is collected in this message flow path. * @@ -241,6 +283,19 @@ public static void setCollectingAllStatistics(boolean state) { log.info("Collecting statistics for all artifacts state changed to: " + state); } + /** + * Check whether the given mediator is filtered out from tracing when using OpenTelemetry. + * + * @param componentName Name of the mediator + * @return true if the mediator is filtered out + */ + public static boolean isSpanFilteredMediator(String componentName) { + if (isMediatorSpanFilterEnabled && componentName != null) { + return mediatorSpanFilterSet.contains(componentName.split(":")[0].trim().toLowerCase()); + } + return false; + } + /** * Helper method to add event and increment stat count so that it denotes, open event is added. * diff --git a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/data/raw/BasicStatisticDataUnit.java b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/data/raw/BasicStatisticDataUnit.java index f217d4256d..b4bdc50f89 100644 --- a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/data/raw/BasicStatisticDataUnit.java +++ b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/data/raw/BasicStatisticDataUnit.java @@ -18,6 +18,7 @@ package org.apache.synapse.aspects.flow.statistics.data.raw; +import java.util.Map; import org.apache.synapse.MessageContext; import org.apache.synapse.aspects.flow.statistics.elasticsearch.ElasticMetadata; import org.apache.synapse.core.SynapseEnvironment; @@ -67,7 +68,12 @@ public class BasicStatisticDataUnit { */ private MessageContext messageContext; - public String getStatisticId() { + /** + * Custom properties to be added to the statistic data unit. + */ + private Map customProperties; + + public String getStatisticId() { return statisticId; } @@ -130,4 +136,12 @@ public void setMessageContext(MessageContext messageContext) { public MessageContext getMessageContext() { return messageContext; } + + public Map getCustomProperties() { + return customProperties; + } + + public void setCustomProperties(Map customProperties) { + this.customProperties = customProperties; + } } diff --git a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/data/raw/StatisticDataUnit.java b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/data/raw/StatisticDataUnit.java index 1c2163d70e..4724d3ee7d 100644 --- a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/data/raw/StatisticDataUnit.java +++ b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/data/raw/StatisticDataUnit.java @@ -79,7 +79,12 @@ public class StatisticDataUnit extends BasicStatisticDataUnit { */ private ComponentType componentType; - /** + /** + * Component Type String of the reporting component. + */ + private String componentTypeString; + + /** * Unique Id of the reporting component. */ private String componentId; @@ -126,7 +131,19 @@ public class StatisticDataUnit extends BasicStatisticDataUnit { */ private String statusCode; - /** + /** + * Error code if the statistic event is reported due to an error + */ + private String errorCode; + + /** + * Error message if the statistic event is reported due to an error + */ + private String errorMessage; + + private boolean isOuterLayerSpan = false; + + /** * Status description of the response. Optional */ private String statusDescription; @@ -299,4 +316,36 @@ public String getPropertyValue() { public void setPropertyValue(String propertyValue) { this.propertyValue = propertyValue; } + + public String getComponentTypeString() { + return componentTypeString; + } + + public void setComponentTypeString(String componentTypeString) { + this.componentTypeString = componentTypeString; + } + + public String getErrorCode() { + return errorCode; + } + + public void setErrorCode(String errorCode) { + this.errorCode = errorCode; + } + + public String getErrorMessage() { + return errorMessage; + } + + public void setErrorMessage(String errorMessage) { + this.errorMessage = errorMessage; + } + + public boolean isOuterLayerSpan() { + return isOuterLayerSpan; + } + + public void setOuterLayerSpan(boolean isOuterLayerSpan) { + this.isOuterLayerSpan = isOuterLayerSpan; + } } diff --git a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/data/raw/StatisticsLog.java b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/data/raw/StatisticsLog.java index ca45844b52..3678fb813d 100644 --- a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/data/raw/StatisticsLog.java +++ b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/data/raw/StatisticsLog.java @@ -170,7 +170,9 @@ public class StatisticsLog { */ private String statusDescription; - /** + private Map customProperties; + + /** * Elastic analytics metadata holder. */ private ElasticMetadata elasticMetadata; @@ -201,6 +203,7 @@ public StatisticsLog(StatisticDataUnit statisticDataUnit) { this.transportHeaderMap = statisticDataUnit.getTransportHeaderMap(); this.statusCode = statisticDataUnit.getStatusCode(); this.statusDescription = statisticDataUnit.getStatusDescription(); + this.customProperties = statisticDataUnit.getCustomProperties(); } public StatisticsLog(ComponentType componentType, String componentName, int parentIndex) { @@ -428,4 +431,8 @@ public ElasticMetadata getElasticMetadata() { public void setElasticMetadata(ElasticMetadata elasticMetadata) { this.elasticMetadata = elasticMetadata; } + + public Map getCustomProperties() { + return customProperties; + } } \ No newline at end of file diff --git a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/TelemetryConstants.java b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/TelemetryConstants.java index de8f09ea12..13b95306a4 100644 --- a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/TelemetryConstants.java +++ b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/TelemetryConstants.java @@ -82,6 +82,13 @@ public class TelemetryConstants { public static final String ENDPOINT_ATTRIBUTE_KEY = "Endpoint"; public static final String CORRELATION_ID_ATTRIBUTE_KEY = "CorrelationId"; + public static final String ERROR_CODE_ATTRIBUTE_KEY = "error.code"; + public static final String ERROR_MESSAGE_ATTRIBUTE_KEY = "error.message"; + public static final String OTEL_RESOURCE_ATTRIBUTE_KEY = "opentelemetry.properties.resource_attributes"; public static final String OTEL_RESOURCE_ATTRIBUTES_ENVIRONMENT_VARIABLE_NAME = "OTEL_RESOURCE_ATTRIBUTES"; + + public static final String OLTP_CUSTOM_SPAN_TAGS = "oltp.custom.span.header.tags"; + + public static final String OLTP_FILTERED_MEDIATOR_NAMES = "oltp.filtered.mediator.names"; } diff --git a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/event/CloseEventHandler.java b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/event/CloseEventHandler.java index 6d2dd7598c..756005a257 100644 --- a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/event/CloseEventHandler.java +++ b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/event/CloseEventHandler.java @@ -32,6 +32,15 @@ public interface CloseEventHandler { */ void handleCloseEntryEvent(BasicStatisticDataUnit basicStatisticDataUnit, MessageContext synCtx); + /** + * Handles a close entry event. + * + * @param basicStatisticDataUnit Basic statistic data unit object. + * @param msgCtx Axis2 Message context. + */ + void handleCloseEntryEvent(BasicStatisticDataUnit basicStatisticDataUnit, + org.apache.axis2.context.MessageContext msgCtx); + /** * Handles a forceful close event. * @param basicStatisticDataUnit Basic statistic data unit object. @@ -39,6 +48,15 @@ public interface CloseEventHandler { */ void handleCloseFlowForcefully(BasicStatisticDataUnit basicStatisticDataUnit, MessageContext synCtx); + /** + * Handles a forceful close event. + * + * @param basicStatisticDataUnit Basic statistic data unit object. + * @param msgCtx Axis2 Message context. + */ + void handleCloseFlowForcefully(BasicStatisticDataUnit basicStatisticDataUnit, + org.apache.axis2.context.MessageContext msgCtx); + /** * Handles a try end flow event. * @param basicStatisticDataUnit Basic statistic data unit object. @@ -59,4 +77,13 @@ public interface CloseEventHandler { * @param synCtx Message context. */ void handleCloseEntryWithErrorEvent(BasicStatisticDataUnit basicStatisticDataUnit, MessageContext synCtx); + + /** + * Handles a close entry with error event. + * + * @param basicStatisticDataUnit Basic statistic data unit object. + * @param msgCtx Message context. + */ + void handleCloseEntryWithErrorEvent(BasicStatisticDataUnit basicStatisticDataUnit, + org.apache.axis2.context.MessageContext msgCtx); } diff --git a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/event/OpenEventHandler.java b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/event/OpenEventHandler.java index 0176edc057..bc1898dce8 100644 --- a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/event/OpenEventHandler.java +++ b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/event/OpenEventHandler.java @@ -34,6 +34,14 @@ public interface OpenEventHandler { */ void handleOpenEntryEvent(StatisticDataUnit statisticDataUnit, MessageContext synCtx); + /** + * Handles an open entry event. + * + * @param statisticDataUnit Statistic data unit object. + * @param msgCtx Axis2 Message context. + */ + void handleOpenEntryEvent(StatisticDataUnit statisticDataUnit, org.apache.axis2.context.MessageContext msgCtx); + /** * Handles an open child entry event. * @param statisticDataUnit Statistic data unit object. diff --git a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/span/SpanHandler.java b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/span/SpanHandler.java index ddf070633c..4d7e899cbb 100644 --- a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/span/SpanHandler.java +++ b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/handling/span/SpanHandler.java @@ -25,6 +25,11 @@ import io.opentelemetry.context.Scope; import io.opentelemetry.context.propagation.TextMapGetter; import io.opentelemetry.context.propagation.TextMapSetter; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.synapse.ContinuationState; @@ -34,6 +39,7 @@ import org.apache.synapse.aspects.ComponentType; import org.apache.synapse.aspects.flow.statistics.data.raw.BasicStatisticDataUnit; import org.apache.synapse.aspects.flow.statistics.data.raw.StatisticDataUnit; +import org.apache.synapse.aspects.flow.statistics.tracing.opentelemetry.management.TelemetryConstants; import org.apache.synapse.aspects.flow.statistics.tracing.opentelemetry.management.TelemetryTracer; import org.apache.synapse.aspects.flow.statistics.tracing.opentelemetry.management.helpers.TracingUtils; import org.apache.synapse.aspects.flow.statistics.tracing.opentelemetry.management.parentresolving.ParentResolver; @@ -42,15 +48,10 @@ import org.apache.synapse.aspects.flow.statistics.tracing.opentelemetry.stores.SpanStore; import org.apache.synapse.aspects.flow.statistics.tracing.opentelemetry.models.SpanWrapper; import org.apache.synapse.aspects.flow.statistics.tracing.opentelemetry.models.ContinuationStateSequenceInfo; +import org.apache.synapse.config.SynapsePropertiesLoader; import org.apache.synapse.continuation.SeqContinuationState; import org.apache.synapse.core.axis2.Axis2MessageContext; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; - /** * Controls Jaeger spans, with respect to various events received during Synapse message flow. */ @@ -132,6 +133,15 @@ public void handleOpenEntryEvent(StatisticDataUnit statisticDataUnit, MessageCon startSpanOrBufferSequenceContinuationState(statisticDataUnit, synCtx); } + @Override + public void handleOpenEntryEvent(StatisticDataUnit statisticDataUnit, + org.apache.axis2.context.MessageContext msgCtx) { + TracingScope tracingScope = tracingScopeManager.getTracingScope(msgCtx); + synchronized (tracingScope.getSpanStore()) { + startSpan(statisticDataUnit, msgCtx, tracingScope.getSpanStore()); + } + } + @Override public void handleOpenChildEntryEvent(StatisticDataUnit statisticDataUnit, MessageContext synCtx) { startSpanOrBufferSequenceContinuationState(statisticDataUnit, synCtx); @@ -239,6 +249,34 @@ private void startSpan(StatisticDataUnit statisticDataUnit, MessageContext synCt if (headersMap != null) { headersMap.putAll(tracerSpecificCarrier); statisticDataUnit.setTransportHeaderMap(headersMap); + ((Axis2MessageContext) synCtx).getAxis2MessageContext() + .setProperty(org.apache.axis2.context.MessageContext.TRANSPORT_HEADERS, headersMap); + + // Set custom span header tags + Object prevCustomSpanTagsObj = synCtx.getProperty(TelemetryConstants.OLTP_CUSTOM_SPAN_TAGS); + String customTagsString = + SynapsePropertiesLoader.getPropertyValue(TelemetryConstants.OLTP_CUSTOM_SPAN_TAGS, null); + boolean isCustomSpanTagsEnabled = customTagsString != null && !customTagsString.isEmpty(); + if (isCustomSpanTagsEnabled) { + if (prevCustomSpanTagsObj != null) { + Map customTagsMap = (Map) prevCustomSpanTagsObj; + if (!customTagsMap.isEmpty()) { + statisticDataUnit.setCustomProperties(customTagsMap); + } + } else { + String[] customTags = customTagsString.split(","); + Map customTagsMap = new HashMap<>(); + for (String tag : customTags) { + if (headersMap.containsKey(tag.trim())) { + customTagsMap.put(tag, headersMap.get(tag)); + } + } + if (!customTagsMap.isEmpty()) { + synCtx.setProperty(TelemetryConstants.OLTP_CUSTOM_SPAN_TAGS, customTagsMap); + statisticDataUnit.setCustomProperties(customTagsMap); + } + } + } } if (statusCode != null) { @@ -260,6 +298,96 @@ private void startSpan(StatisticDataUnit statisticDataUnit, MessageContext synCt } } + /** + * Starts a span, and stores necessary information in the span store to retrieve them back when needed. + * + * @param statisticDataUnit Statistic data unit object, which was collected during a statistic event. + * @param msgCtx Axis2 Message context. + * @param spanStore Span store object. + */ + private void startSpan(StatisticDataUnit statisticDataUnit, org.apache.axis2.context.MessageContext msgCtx, + SpanStore spanStore) { + String parentId = String.valueOf(statisticDataUnit.getParentIndex()); + SpanWrapper parentSpanWrapper = spanStore.getSpanWrapper(parentId); + Span parentSpan = null; + Context context = null; + if (parentSpanWrapper != null) { + parentSpan = parentSpanWrapper.getSpan(); + } + Span span; + Map tracerSpecificCarrier = new HashMap<>(); + + Map headersMap; + Object headers = msgCtx.getProperty(org.apache.axis2.context.MessageContext.TRANSPORT_HEADERS); + if (headers instanceof Map) { + headersMap = new ConcurrentHashMap<>((Map) headers); + } else { + // We only need to extract span context from headers when there are trp headers available + headersMap = new ConcurrentHashMap(); + } + if (isOuterLevelSpan(statisticDataUnit, spanStore)) { + context = extract(headersMap); + } else if (parentSpan != null) { + context = Context.current().with(parentSpan); + } else { + context = Context.current(); + } + span = tracer.spanBuilder(statisticDataUnit.getComponentName()).setParent(context).startSpan(); + + // Set tracing headers + inject(span, tracerSpecificCarrier); + msgCtx.setProperty(SynapseConstants.JAEGER_TRACE_ID, span.getSpanContext().getTraceId()); + msgCtx.setProperty(SynapseConstants.JAEGER_SPAN_ID, span.getSpanContext().getSpanId()); + if (logger.isDebugEnabled()) { + logger.debug( + "Jaeger Trace ID: " + msgCtx.getProperty(SynapseConstants.JAEGER_TRACE_ID) + " Jaeger Span ID: " + + msgCtx.getProperty(SynapseConstants.JAEGER_SPAN_ID)); + } + + // Set text map key value pairs as HTTP headers + // Fix possible null pointer issue which can occur when following property is used + // + if (headersMap != null) { + headersMap.putAll(tracerSpecificCarrier); + statisticDataUnit.setTransportHeaderMap(headersMap); + msgCtx.setProperty(org.apache.axis2.context.MessageContext.TRANSPORT_HEADERS, headersMap); + + Object prevCustomSpanTagsObj = msgCtx.getProperty(TelemetryConstants.OLTP_CUSTOM_SPAN_TAGS); + String customTagsString = + SynapsePropertiesLoader.getPropertyValue(TelemetryConstants.OLTP_CUSTOM_SPAN_TAGS, null); + boolean isCustomSpanTagsEnabled = customTagsString != null && !customTagsString.isEmpty(); + if (isCustomSpanTagsEnabled) { + if (prevCustomSpanTagsObj != null) { + Map customTagsMap = (Map) prevCustomSpanTagsObj; + if (!customTagsMap.isEmpty()) { + statisticDataUnit.setCustomProperties(customTagsMap); + } + } else { + String[] customTags = customTagsString.split(","); + Map customTagsMap = new HashMap<>(); + for (String tag : customTags) { + if (headersMap.containsKey(tag.trim())) { + customTagsMap.put(tag, headersMap.get(tag)); + } + } + if (!customTagsMap.isEmpty()) { + msgCtx.setProperty(TelemetryConstants.OLTP_CUSTOM_SPAN_TAGS, customTagsMap); + statisticDataUnit.setCustomProperties(customTagsMap); + } + } + } + } + + statisticDataUnit.setTransportHeaderMap(headersMap); + + String spanId = TracingUtils.extractId(statisticDataUnit); + SpanWrapper spanWrapper = spanStore.addSpanWrapper(spanId, span, statisticDataUnit, parentSpanWrapper, msgCtx); + + if (isOuterLevelSpan(statisticDataUnit, spanStore)) { + spanStore.assignOuterLevelSpan(spanWrapper); + } + } + /** * Buffers the given statistic data unit which is reported by an open event, * until an appropriate continuation stack event is reported. @@ -285,11 +413,12 @@ private void bufferSequenceContinuationState(StatisticDataUnit statisticDataUnit private boolean isOuterLevelSpan(StatisticDataUnit statisticDataUnit, SpanStore spanStore) { return spanStore.getOuterLevelSpanWrapper() == null && (statisticDataUnit.getComponentType() == ComponentType.TASK || - statisticDataUnit.getComponentType() == ComponentType.PROXYSERVICE + statisticDataUnit.getComponentType() == ComponentType.PROXYSERVICE || statisticDataUnit.getComponentType() == ComponentType.API || statisticDataUnit.getComponentType() == ComponentType.INBOUNDENDPOINT - || (statisticDataUnit.getComponentType() == ComponentType.SEQUENCE - && SynapseConstants.MAIN_SEQUENCE_KEY.equals(statisticDataUnit.getComponentName()))); + || statisticDataUnit.isOuterLayerSpan() + || (statisticDataUnit.getComponentType() == ComponentType.SEQUENCE + && SynapseConstants.MAIN_SEQUENCE_KEY.equals(statisticDataUnit.getComponentName()))); } @Override @@ -307,11 +436,24 @@ public void handleCloseEntryEvent(BasicStatisticDataUnit basicStatisticDataUnit, handleCloseEvent(basicStatisticDataUnit, synCtx, false); } + @Override + public void handleCloseEntryEvent(BasicStatisticDataUnit basicStatisticDataUnit, + org.apache.axis2.context.MessageContext msgCtx) { + handleCloseEvent(basicStatisticDataUnit, msgCtx, false); + } + + @Override public void handleCloseEntryWithErrorEvent(BasicStatisticDataUnit basicStatisticDataUnit, MessageContext synCtx) { handleCloseEvent(basicStatisticDataUnit, synCtx, true); } + @Override + public void handleCloseEntryWithErrorEvent(BasicStatisticDataUnit basicStatisticDataUnit, + org.apache.axis2.context.MessageContext msgCtx) { + handleCloseEvent(basicStatisticDataUnit, msgCtx, true); + } + @Override public void handleCloseFlowForcefully(BasicStatisticDataUnit basicStatisticDataUnit, MessageContext synCtx) { TracingScope tracingScope = tracingScopeManager.getTracingScope(synCtx); @@ -329,6 +471,24 @@ public void handleCloseFlowForcefully(BasicStatisticDataUnit basicStatisticDataU } } + @Override + public void handleCloseFlowForcefully(BasicStatisticDataUnit basicStatisticDataUnit, + org.apache.axis2.context.MessageContext msgCtx) { + TracingScope tracingScope = tracingScopeManager.getTracingScope(msgCtx); + SpanStore spanStore = tracingScope.getSpanStore(); + String spanWrapperId = TracingUtils.extractId(basicStatisticDataUnit); + SpanWrapper spanWrapper = spanStore.getSpanWrapper(spanWrapperId); + + // finish the current span + handleCloseEvent(basicStatisticDataUnit, msgCtx, true); + + // finish outer level spans since the control is not returned to the original flow to close them gracefully. + while (spanWrapper.getParentSpanWrapper() != null) { + spanWrapper = spanWrapper.getParentSpanWrapper(); + spanStore.finishSpan(spanWrapper, msgCtx, true); + } + } + private void handleCloseEvent(BasicStatisticDataUnit basicStatisticDataUnit, MessageContext synCtx, boolean isError) { TracingScope tracingScope = tracingScopeManager.getTracingScope(synCtx); synchronized (tracingScope.getSpanStore()) { @@ -339,6 +499,16 @@ private void handleCloseEvent(BasicStatisticDataUnit basicStatisticDataUnit, Mes } } + private void handleCloseEvent(BasicStatisticDataUnit basicStatisticDataUnit, + org.apache.axis2.context.MessageContext msgCtx, boolean isError) { + TracingScope tracingScope = tracingScopeManager.getTracingScope(msgCtx); + synchronized (tracingScope.getSpanStore()) { + if (!isBufferedForContinuationState(basicStatisticDataUnit, tracingScope.getSpanStore())) { + finishSpan(basicStatisticDataUnit, msgCtx, tracingScope.getSpanStore(), tracingScope, isError); + } + } + } + /** * Returns whether the given basic statistic data unit has been buffered to consider the continuation state. * In such cases, This will be used to absorb and skip the close event. @@ -395,6 +565,25 @@ private void finishSpan(BasicStatisticDataUnit basicStatisticDataUnit, } } + private void finishSpan(BasicStatisticDataUnit basicStatisticDataUnit, + org.apache.axis2.context.MessageContext msgCtx, + SpanStore spanStore, + TracingScope tracingScope, boolean isError) { + String spanWrapperId = TracingUtils.extractId(basicStatisticDataUnit); + SpanWrapper spanWrapper = spanStore.getSpanWrapper(spanWrapperId); + //Set the statistic data unit of the close event into the span wrapper + if (spanWrapper != null && (basicStatisticDataUnit instanceof StatisticDataUnit)) { + spanWrapper.setCloseEventStatisticDataUnit((StatisticDataUnit) basicStatisticDataUnit); + } + if (!Objects.equals(spanWrapper, spanStore.getOuterLevelSpanWrapper())) { + spanStore.finishSpan(spanWrapper, msgCtx, isError); + } else { + spanStore.finishSpan(spanWrapper, msgCtx, isError); + tracingScopeManager.cleanupTracingScope(tracingScope.getTracingScopeId()); + + } + } + /** * Cleans up remaining unfinished continuation state sequences before ending the outer level span. * @param spanStore Span store object. diff --git a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/helpers/SpanTagger.java b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/helpers/SpanTagger.java index 64f60de801..9b9b0daa20 100644 --- a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/helpers/SpanTagger.java +++ b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/helpers/SpanTagger.java @@ -137,5 +137,106 @@ public static void setSpanTags(SpanWrapper spanWrapper, MessageContext synCtx) { span.setAttribute(TelemetryConstants.CORRELATION_ID_ATTRIBUTE_KEY, synCtx.getProperty(CorrelationConstants.CORRELATION_ID).toString()); } + + if (openStatisticsLog.getCustomProperties() != null) { + openStatisticsLog.getCustomProperties().forEach( + (key, value) -> span.setAttribute(key, String.valueOf(value)) + ); + } + if (spanWrapper.getCloseEventStatisticDataUnit() != null) { + if (spanWrapper.getCloseEventStatisticDataUnit().getCustomProperties() != null) { + spanWrapper.getCloseEventStatisticDataUnit().getCustomProperties().forEach( + (key, value) -> span.setAttribute(key, String.valueOf(value)) + ); + } + } + } + + /** + * Sets tags to the span which is contained in the provided span wrapper, from information acquired from the + * given basic statistic data unit. + * + * @param spanWrapper Span wrapper that contains the target span. + * @param msgCtx Axis2 message context + */ + public static void setSpanTags(SpanWrapper spanWrapper, org.apache.axis2.context.MessageContext msgCtx) { + StatisticDataUnit openEventStatisticDataUnit = spanWrapper.getStatisticDataUnit(); + Span span = spanWrapper.getSpan(); + StatisticDataUnit closeEventStatisticDataUnit = spanWrapper.getCloseEventStatisticDataUnit(); + if (OpenTelemetryManagerHolder.isCollectingPayloads()) { + if (openEventStatisticDataUnit.getPayload() != null) { + span.setAttribute(TelemetryConstants.BEFORE_PAYLOAD_ATTRIBUTE_KEY, + openEventStatisticDataUnit.getPayload()); + } + if (closeEventStatisticDataUnit != null) { + if (closeEventStatisticDataUnit.getPayload() != null) { + span.setAttribute(TelemetryConstants.AFTER_PAYLOAD_ATTRIBUTE_KEY, + closeEventStatisticDataUnit.getPayload()); + } + } + } + + if (openEventStatisticDataUnit.getComponentName() != null) { + span.setAttribute(TelemetryConstants.COMPONENT_NAME_ATTRIBUTE_KEY, + openEventStatisticDataUnit.getComponentName()); + } + + if (openEventStatisticDataUnit.getComponentType() != null) { + span.setAttribute(TelemetryConstants.COMPONENT_TYPE_ATTRIBUTE_KEY, + openEventStatisticDataUnit.getComponentType().toString()); + } else if (openEventStatisticDataUnit.getComponentTypeString() != null) { + span.setAttribute(TelemetryConstants.COMPONENT_TYPE_ATTRIBUTE_KEY, + openEventStatisticDataUnit.getComponentTypeString()); + } + + span.setAttribute(TelemetryConstants.THREAD_ID_ATTRIBUTE_KEY, Thread.currentThread().getId()); + if (openEventStatisticDataUnit.getComponentId() != null) { + span.setAttribute(TelemetryConstants.COMPONENT_ID_ATTRIBUTE_KEY, + openEventStatisticDataUnit.getComponentId()); + } + if (openEventStatisticDataUnit.getHashCode() != null) { + span.setAttribute(TelemetryConstants.HASHCODE_ATTRIBUTE_KEY, openEventStatisticDataUnit.getHashCode()); + } + if (openEventStatisticDataUnit.getTransportHeaderMap() != null) { + span.setAttribute(TelemetryConstants.TRANSPORT_HEADERS_ATTRIBUTE_KEY, + openEventStatisticDataUnit.getTransportHeaderMap().toString()); + } + + if (openEventStatisticDataUnit.getStatusCode() != null) { + span.setAttribute(TelemetryConstants.STATUS_CODE_ATTRIBUTE_KEY, openEventStatisticDataUnit.getStatusCode()); + } + if (openEventStatisticDataUnit.getStatusDescription() != null) { + span.setAttribute(TelemetryConstants.STATUS_DESCRIPTION_ATTRIBUTE_KEY, + openEventStatisticDataUnit.getStatusDescription()); + } + if (msgCtx.getProperty(CorrelationConstants.CORRELATION_ID) != null) { + span.setAttribute(TelemetryConstants.CORRELATION_ID_ATTRIBUTE_KEY, + msgCtx.getProperty(CorrelationConstants.CORRELATION_ID).toString()); + } + + if (openEventStatisticDataUnit.getCustomProperties() != null) { + openEventStatisticDataUnit.getCustomProperties().forEach( + (key, value) -> span.setAttribute(key, String.valueOf(value)) + ); + } + + if (closeEventStatisticDataUnit != null) { + if (closeEventStatisticDataUnit.getErrorCode() != null) { + span.setAttribute(TelemetryConstants.ERROR_CODE_ATTRIBUTE_KEY, + closeEventStatisticDataUnit.getErrorCode()); + } + + if (closeEventStatisticDataUnit.getErrorMessage() != null) { + span.setAttribute(TelemetryConstants.ERROR_MESSAGE_ATTRIBUTE_KEY, + closeEventStatisticDataUnit.getErrorMessage()); + } + + if (closeEventStatisticDataUnit.getCustomProperties() != null) { + closeEventStatisticDataUnit.getCustomProperties().forEach( + (key, value) -> span.setAttribute(key, String.valueOf(value)) + ); + } + } + } } diff --git a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/scoping/TracingScopeManager.java b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/scoping/TracingScopeManager.java index 51dd951b7c..2d5cbe665e 100644 --- a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/scoping/TracingScopeManager.java +++ b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/scoping/TracingScopeManager.java @@ -59,6 +59,28 @@ public TracingScope getTracingScope(MessageContext synCtx) { } } + /** + * Returns a tracing scope object for the provided message context. + * Returns the reference to the existing object when the tracing scope is known (has been already created). + * Otherwise creates a new scope, stores it, and returns its reference. + * + * @param msgCtx Axis2 Message context. + * @return Tracing scope object. + */ + public TracingScope getTracingScope(org.apache.axis2.context.MessageContext msgCtx) { + synchronized (tracingScopes) { + String tracingScopeId = extractTracingScopeId(msgCtx); + if (tracingScopes.containsKey(tracingScopeId)) { + // Already existing scope. Return its reference + return tracingScopes.get(tracingScopeId); + } else { + TracingScope tracingScope = new TracingScope(tracingScopeId); + tracingScopes.put(tracingScopeId, tracingScope); + return tracingScope; + } + } + } + /** * Gets the tracing scope id for the provided message context. * @@ -69,6 +91,16 @@ private String extractTracingScopeId(MessageContext synCtx) { return (String) synCtx.getProperty(StatisticsConstants.FLOW_STATISTICS_ID); } + /** + * Gets the tracing scope id for the provided message context. + * + * @param msgCtx Message context. + * @return Tracing scope id. + */ + private String extractTracingScopeId(org.apache.axis2.context.MessageContext msgCtx) { + return (String) msgCtx.getProperty(StatisticsConstants.FLOW_STATISTICS_ID); + } + /** * Gets the latest tracing scope object. * diff --git a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/stores/SpanStore.java b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/stores/SpanStore.java index 539690ae2d..99dc18b84c 100644 --- a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/stores/SpanStore.java +++ b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/stores/SpanStore.java @@ -113,6 +113,32 @@ public SpanWrapper addSpanWrapper(String spanId, return spanWrapper; } + /** + * Denotes the beginning of a span. Adds appropriate elements to necessary data structures. + * + * @param spanId Index of the span wrapper + * @param activeSpan Reference to the span object, that have been started + * @param statisticDataUnit The statistic data unit object + * @param parentSpanWrapper Parent span wrapper of the created span wrapper + * @param msgCtx Message Context that is reported during the open event + * @return Created span wrapper object + */ + public SpanWrapper addSpanWrapper(String spanId, + Span activeSpan, + StatisticDataUnit statisticDataUnit, + SpanWrapper parentSpanWrapper, + org.apache.axis2.context.MessageContext msgCtx) { + SpanWrapper spanWrapper = new SpanWrapper(spanId, activeSpan, statisticDataUnit, parentSpanWrapper); + spanWrappers.put(spanId, spanWrapper); + spanWrapper.addKnownSynCtxHashCodeToAllParents(TracingUtils.getSystemIdentityHashCode(msgCtx)); + if (parentSpanWrapper != null) { + parentSpanWrapper.addChildComponentUniqueId(statisticDataUnit.getComponentId()); + } + componentUniqueIdWiseSpanWrappers.put(statisticDataUnit.getComponentId(), spanWrapper); + activeSpanWrappers.add(spanWrapper); + return spanWrapper; + } + /** * Denotes the end of a span. * Adds tags to the span and removes reference to the appropriate span wrapper in activeSpanWrappers. @@ -123,6 +149,16 @@ public void finishSpan(SpanWrapper spanWrapper, MessageContext synCtx) { finishSpan(spanWrapper, synCtx, false); } + /** + * Denotes the end of a span. + * Adds tags to the span and removes reference to the appropriate span wrapper in activeSpanWrappers. + * @param spanWrapper Span wrapper object, which has been already created + * @param msgCtx Axis2 message context + */ + public void finishSpan(SpanWrapper spanWrapper, org.apache.axis2.context.MessageContext msgCtx) { + finishSpan(spanWrapper, msgCtx, false); + } + /** * Denotes the end of a span. * Adds tags to the span and removes reference to the appropriate span wrapper in activeSpanWrappers. @@ -144,6 +180,27 @@ public void finishSpan(SpanWrapper spanWrapper, MessageContext synCtx, boolean i } } + /** + * Denotes the end of a span. + * Adds tags to the span and removes reference to the appropriate span wrapper in activeSpanWrappers. + * + * @param spanWrapper Span wrapper object, which has been already created + * @param msgCtx Synapse message context + * @param isError finishing the span with an error + */ + public void finishSpan(SpanWrapper spanWrapper, org.apache.axis2.context.MessageContext msgCtx, boolean isError) { + if (spanWrapper != null && spanWrapper.getSpan() != null) { + if (spanWrapper.getStatisticDataUnit() != null) { + SpanTagger.setSpanTags(spanWrapper, msgCtx); + } + if (isError) { + spanWrapper.getSpan().setStatus(StatusCode.ERROR); + } + spanWrapper.getSpan().end(); + activeSpanWrappers.remove(spanWrapper); + } + } + public void assignOuterLevelSpan(SpanWrapper spanWrapper) { outerLevelSpan = spanWrapper; } diff --git a/modules/core/src/main/java/org/apache/synapse/config/xml/AbstractMediatorFactory.java b/modules/core/src/main/java/org/apache/synapse/config/xml/AbstractMediatorFactory.java index 6805f821c0..bc9e855ac7 100644 --- a/modules/core/src/main/java/org/apache/synapse/config/xml/AbstractMediatorFactory.java +++ b/modules/core/src/main/java/org/apache/synapse/config/xml/AbstractMediatorFactory.java @@ -183,6 +183,17 @@ protected void processAuditStatus(Mediator mediator, OMElement mediatorOmElement } } } + + OMAttribute traceFilter = mediatorOmElement.getAttribute( + new QName(XMLConfigConstants.NULL_NAMESPACE, XMLConfigConstants.TRACE_FILTER_ATTRIB_NAME)); + if (traceFilter != null) { + String traceFilterValue = traceFilter.getAttributeValue(); + if (traceFilterValue != null) { + if (traceFilterValue.equals(XMLConfigConstants.TRACE_FILTER_ENABLE)) { + configuration.enableTraceFilter(); + } + } + } } } diff --git a/modules/core/src/main/java/org/apache/synapse/config/xml/XMLConfigConstants.java b/modules/core/src/main/java/org/apache/synapse/config/xml/XMLConfigConstants.java index 8a5ea82507..300687c4a7 100644 --- a/modules/core/src/main/java/org/apache/synapse/config/xml/XMLConfigConstants.java +++ b/modules/core/src/main/java/org/apache/synapse/config/xml/XMLConfigConstants.java @@ -145,4 +145,7 @@ public static enum VARIABLE_DATA_TYPES { * The server push processing sequence name. */ public static final String SERVER_PUSH_SEQUENCE = "serverPushSequence"; + + public static final String TRACE_FILTER_ATTRIB_NAME = "traceFilter"; + public static final String TRACE_FILTER_ENABLE = "enable"; } diff --git a/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2SynapseEnvironment.java b/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2SynapseEnvironment.java index 1955628e5d..68801f5a75 100644 --- a/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2SynapseEnvironment.java +++ b/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2SynapseEnvironment.java @@ -358,7 +358,10 @@ public boolean injectMessage(final MessageContext synCtx) { log.debug("Using Main Sequence for injected message"); } - + Object skipMainSequenceObj = synCtx.getProperty(SynapseConstants.SKIP_MAIN_SEQUENCE); + if (skipMainSequenceObj != null && (Boolean) skipMainSequenceObj) { + return true; + } return synCtx.getMainSequence().mediate(synCtx); } }