diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 195ca295eed..c1d73c72851 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -926,7 +926,10 @@ public class ConfigurationKeys {
// Opentelemetry based metrics reporting
public static final String METRICS_REPORTING_OPENTELEMETRY_PREFIX = "metrics.reporting.opentelemetry.";
+ public static final String METRICS_REPORTING_OPENTELEMETRY_CLASSNAME = METRICS_REPORTING_OPENTELEMETRY_PREFIX + "className";
+ public static final String DEFAULT_METRICS_REPORTING_OPENTELEMETRY_CLASSNAME = "org.apache.gobblin.metrics.OpenTelemetryMetrics";
public static final String METRICS_REPORTING_OPENTELEMETRY_ENABLED = METRICS_REPORTING_OPENTELEMETRY_PREFIX + "enabled";
+ public static final Boolean DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED = false;
public static final String METRICS_REPORTING_OPENTELEMETRY_LOGEXPORTER_ENABLED = METRICS_REPORTING_OPENTELEMETRY_PREFIX + "logexporter.enabled";
@@ -935,14 +938,19 @@ public class ConfigurationKeys {
public static final String METRICS_REPORTING_OPENTELEMETRY_LOGEXPORTER_CLASSNAME = METRICS_REPORTING_OPENTELEMETRY_PREFIX + "logexporter.className";
public static final String METRICS_REPORTING_OPENTELEMETRY_CONFIGS_PREFIX = METRICS_REPORTING_OPENTELEMETRY_PREFIX + "configs.";
- public static final Boolean DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED = false;
public static final String METRICS_REPORTING_OPENTELEMETRY_ENDPOINT = METRICS_REPORTING_OPENTELEMETRY_PREFIX + "endpoint";
// Headers to add to the OpenTelemetry HTTP Exporter, formatted as a JSON String with string keys and values
public static final String METRICS_REPORTING_OPENTELEMETRY_HEADERS = METRICS_REPORTING_OPENTELEMETRY_PREFIX + "headers";
- public static final String METRICS_REPORTING_OPENTELEMETRY_INTERVAL_MILLIS = METRICS_CONFIGURATIONS_PREFIX + "interval.millis";
+ public static final String METRICS_REPORTING_OPENTELEMETRY_INTERVAL_MILLIS = METRICS_REPORTING_OPENTELEMETRY_PREFIX + "interval.millis";
+ public static final String METRICS_REPORTING_OPENTELEMETRY_HISTOGRAM_MAX_BUCKETS = METRICS_REPORTING_OPENTELEMETRY_PREFIX + "histogram.max.buckets";
+ public static final String METRICS_REPORTING_OPENTELEMETRY_HISTOGRAM_MAX_SCALE = METRICS_REPORTING_OPENTELEMETRY_PREFIX + "histogram.max.scale";
+ // A comma-separated list of dimensions to add to the OpenTelemetry metrics
+ public static final String METRICS_REPORTING_OPENTELEMETRY_DIMENSIONS = METRICS_REPORTING_OPENTELEMETRY_PREFIX + "dimensions";
+ public static final String METRICS_REPORTING_OPENTELEMETRY_GROUP_NAME = METRICS_REPORTING_OPENTELEMETRY_PREFIX + "group.name";
+ public static final String DEFAULT_METRICS_REPORTING_OPENTELEMETRY_GROUP_NAME = "org.apache.gobblin.metrics";
/**
* Rest server configuration properties.
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/OpenTelemetryMetrics.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/OpenTelemetryMetrics.java
index 72241b5dcc3..c0efb59ff79 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/OpenTelemetryMetrics.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/OpenTelemetryMetrics.java
@@ -32,6 +32,11 @@
import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter;
import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporterBuilder;
import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.Aggregation;
+import io.opentelemetry.sdk.metrics.InstrumentSelector;
+import io.opentelemetry.sdk.metrics.InstrumentType;
+import io.opentelemetry.sdk.metrics.View;
+import io.opentelemetry.sdk.metrics.internal.view.Base2ExponentialHistogramAggregation;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
@@ -52,8 +57,10 @@
@Slf4j
public class OpenTelemetryMetrics extends OpenTelemetryMetricsBase {
- private static OpenTelemetryMetrics GLOBAL_INSTANCE;
+ private static volatile OpenTelemetryMetrics GLOBAL_INSTANCE;
private static final Long DEFAULT_OPENTELEMETRY_REPORTING_INTERVAL_MILLIS = 10000L;
+ private static final int DEFAULT_OPENTELEMETRY_HISTOGRAM_MAX_BUCKETS = 256;
+ private static final int DEFAULT_OPENTELEMETRY_HISTOGRAM_MAX_SCALE = 3;
private OpenTelemetryMetrics(State state) {
super(state);
@@ -94,7 +101,12 @@ protected MetricExporter initializeMetricExporter(State state) {
public static OpenTelemetryMetrics getInstance(State state) {
if (state.getPropAsBoolean(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
ConfigurationKeys.DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED) && GLOBAL_INSTANCE == null) {
- GLOBAL_INSTANCE = new OpenTelemetryMetrics(state);
+ synchronized (OpenTelemetryMetrics.class) {
+ if (GLOBAL_INSTANCE == null) {
+ log.info("Creating OpenTelemetryMetrics instance");
+ GLOBAL_INSTANCE = new OpenTelemetryMetrics(state);
+ }
+ }
}
return GLOBAL_INSTANCE;
}
@@ -115,6 +127,13 @@ protected void initialize(State state) {
}
metricsResource = Resource.getDefault().merge(Resource.create(attributesBuilder.build()));
}
+
+ Aggregation histogramAggregation = Base2ExponentialHistogramAggregation.create(
+ state.getPropAsInt(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_HISTOGRAM_MAX_BUCKETS,
+ DEFAULT_OPENTELEMETRY_HISTOGRAM_MAX_BUCKETS),
+ state.getPropAsInt(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_HISTOGRAM_MAX_SCALE,
+ DEFAULT_OPENTELEMETRY_HISTOGRAM_MAX_SCALE));
+
SdkMeterProvider meterProvider = SdkMeterProvider.builder()
.setResource(metricsResource)
.registerMetricReader(
@@ -123,6 +142,9 @@ protected void initialize(State state) {
state.getPropAsLong(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_INTERVAL_MILLIS,
DEFAULT_OPENTELEMETRY_REPORTING_INTERVAL_MILLIS)))
.build())
+ .registerView(
+ InstrumentSelector.builder().setType(InstrumentType.HISTOGRAM).build(),
+ View.builder().setAggregation(histogramAggregation).build())
.build();
this.openTelemetry = OpenTelemetrySdk.builder().setMeterProvider(meterProvider).build();
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GobblinOpenTelemetryMetrics.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GobblinOpenTelemetryMetrics.java
new file mode 100644
index 00000000000..6c731b4e5c1
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GobblinOpenTelemetryMetrics.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+
+@Getter
+@AllArgsConstructor
+public enum GobblinOpenTelemetryMetrics {
+ /**
+ * Metric to track the count of Gobblin Jobs for each of its state (GenerateWorkUnit, ProcessWorkUnit, CommitStep).
+ * Metric Unit: 1 represents each increment will add one data point to the counter.
+ * */
+ GOBBLIN_JOB_STATE("gobblin_job_state", "Gobblin job state counter", "1", OpenTelemetryMetricType.LONG_COUNTER),
+
+ /**
+ * Metric to track the latency of each Gobblin Job state (GenerateWorkUnit, ProcessWorkUnit, CommitStep).
+ * Metric Unit: seconds (s) represents the time taken for each state.
+ * */
+ GOBBLIN_JOB_STATE_LATENCY("gobblin_job_state_latency", "Gobblin job state latency", "s", OpenTelemetryMetricType.DOUBLE_HISTOGRAM);
+
+ private final String metricName;
+ private final String metricDescription;
+ private final String metricUnit;
+ private final OpenTelemetryMetricType metricType;
+
+ @Override
+ public String toString() {
+ return String.format("Metric{name='%s', description='%s', unit='%s', type=%s}", metricName, metricDescription, metricUnit, metricType);
+ }
+
+}
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GobblinOpenTelemetryMetricsConstants.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GobblinOpenTelemetryMetricsConstants.java
new file mode 100644
index 00000000000..d998380a54a
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GobblinOpenTelemetryMetricsConstants.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+public class GobblinOpenTelemetryMetricsConstants {
+
+ public static class DimensionKeys {
+ public static final String STATE = "state";
+ public static final String CURR_STATE = "currState";
+ }
+
+ public static class DimensionValues {
+ public static final String GENERATE_WU = "generateWU";
+ public static final String PROCESS_WU = "processWU";
+ public static final String COMMIT_STEP = "commitStep";
+ public static final String JOB_START = "jobStart";
+ public static final String JOB_COMPLETE = "jobComplete";
+ public static final String GENERATE_WU_START = "generateWUStart";
+ public static final String GENERATE_WU_COMPLETE = "generateWUComplete";
+ public static final String PROCESS_WU_START = "processWUStart";
+ public static final String PROCESS_WU_COMPLETE = "processWUComplete";
+ public static final String COMMIT_STEP_START = "commitStepStart";
+ public static final String COMMIT_STEP_COMPLETE = "commitStepComplete";
+ }
+}
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryDoubleHistogram.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryDoubleHistogram.java
new file mode 100644
index 00000000000..e5fb3b13ba1
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryDoubleHistogram.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.DoubleHistogram;
+
+
+/**
+ * Implementation of {@link OpenTelemetryMetric} that wraps an OpenTelemetry {@link DoubleHistogram}.
+ *
+ *
This class provides a histogram for recording the distribution of double values.
+ * It supports recording values with optional additional attributes that can be merged with base attributes.
+ *
+ */
+@Slf4j
+@AllArgsConstructor
+public class OpenTelemetryDoubleHistogram implements OpenTelemetryMetric {
+ private String name;
+ private Attributes baseAttributes;
+ private DoubleHistogram doubleHistogram;
+
+ /**
+ * Records the specified value in the histogram with the base attributes.
+ *
+ * @param value the double value to record in the histogram
+ */
+ public void record(double value) {
+ log.debug("Emitting double histogram metric: {}, value: {}, attributes: {}", this.name, value, this.baseAttributes);
+ this.doubleHistogram.record(value, this.baseAttributes);
+ }
+
+ /**
+ * Records the specified value in the histogram with a combination of base attributes and additional attributes.
+ *
+ * @param value the double value to record in the histogram
+ * @param additionalAttributes the additional attributes to be merged with base attributes
+ */
+ public void record(double value, Attributes additionalAttributes) {
+ log.debug("Emitting double histogram metric: {}, value: {}, base attributes: {}, additional attributes: {}",
+ this.name, value, this.baseAttributes, additionalAttributes);
+ this.doubleHistogram.record(value, OpenTelemetryHelper.mergeAttributes(this.baseAttributes, additionalAttributes));
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String getMetricName() {
+ return this.name;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public OpenTelemetryMetricType getMetricType() {
+ return OpenTelemetryMetricType.DOUBLE_HISTOGRAM;
+ }
+
+ /**
+ * Returns a string representation of this histogram with its name.
+ */
+ @Override
+ public String toString() {
+ return "OpenTelemetryDoubleHistogram{name='" + name + "'}";
+ }
+
+}
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryHelper.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryHelper.java
new file mode 100644
index 00000000000..b3e3d30b855
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryHelper.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+import java.util.Map;
+
+import lombok.experimental.UtilityClass;
+import org.apache.commons.lang3.StringUtils;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+
+/**
+ * Utility class for OpenTelemetry related operations.
+ *
+ * Provides methods to handle OpenTelemetry attributes, including merging multiple
+ * {@link Attributes} instances and converting maps to {@link Attributes}.
+ */
+@UtilityClass
+public class OpenTelemetryHelper {
+
+ private static final String DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE = "UNKNOWN";
+
+ /**
+ * Returns the provided attribute value when it is non-null and non-empty;
+ * otherwise returns the default OpenTelemetry attribute placeholder.
+ *
+ * @param value candidate attribute value to check
+ * @return the original value if not empty, or DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE otherwise
+ */
+ public static String getOrDefaultOpenTelemetryAttrValue(String value) {
+ return StringUtils.defaultIfBlank(value, DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE);
+ }
+
+ /**
+ * Merges multiple {@link Attributes} instances into a single {@link Attributes}.
+ *
+ *
Any {@code null} or empty ({@link Attributes#isEmpty()}) instances are ignored.
+ * The resulting {@link Attributes} contains all key-value pairs from the
+ * provided non-null, non-empty inputs in the order they are given.
+ * For duplicate keys, the last occurrence in the array will take precedence.
+ *
+ * @param attributesArray array of {@link Attributes} to merge; may contain {@code null} or empty entries
+ * @return a new {@link Attributes} instance containing all entries from the non-null,
+ * non-empty inputs; never {@code null}
+ */
+ public static Attributes mergeAttributes(Attributes... attributesArray) {
+ AttributesBuilder builder = Attributes.builder();
+ for (Attributes attrs : attributesArray) {
+ if (attrs != null && !attrs.isEmpty()) {
+ builder.putAll(attrs);
+ }
+ }
+ return builder.build();
+ }
+
+ /**
+ * Converts a map of string attributes to an OpenTelemetry {@link Attributes} instance.
+ *
+ *
Each entry in the map is converted to an OpenTelemetry attribute, using
+ * {@link #getOrDefaultOpenTelemetryAttrValue(String)} to handle empty values.
+ *
+ * @param attributes map of string attributes to convert; may be {@code null}
+ * @return a new {@link Attributes} instance containing the converted attributes;
+ * never {@code null}
+ */
+ public static Attributes toOpenTelemetryAttributes(Map attributes) {
+ AttributesBuilder builder = Attributes.builder();
+ if (attributes != null) {
+ for (Map.Entry entry : attributes.entrySet()) {
+ String key = entry.getKey();
+ String value = getOrDefaultOpenTelemetryAttrValue(entry.getValue());
+ builder.put(key, value);
+ }
+ }
+ return builder.build();
+ }
+
+}
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryInstrumentation.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryInstrumentation.java
new file mode 100644
index 00000000000..5561978fc90
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryInstrumentation.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+import java.lang.reflect.Method;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import com.google.common.base.Splitter;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.Meter;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.InMemoryOpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+
+/**
+ * Provides OpenTelemetry instrumentation for metrics.
+ *
+ * Maintains a singleton instance that holds common attributes {@link Attributes} and a Meter {@link Meter}.
+ * Exposes methods to retrieve or create metric instruments defined in {@link GobblinOpenTelemetryMetrics}.
+ */
+@Slf4j
+@Getter
+public class OpenTelemetryInstrumentation {
+
+ // Adding the gobblin-service.main (BaseFlowGraphHelper.FLOW_EDGE_LABEL_JOINER_CHAR) dependency is creating circular dependency issues
+ private static final String FLOW_EDGE_LABEL_JOINER_CHAR = "_";
+ private static final Splitter COMMA_SPLITTER = Splitter.on(',').omitEmptyStrings().trimResults();
+ private static volatile OpenTelemetryInstrumentation GLOBAL_INSTANCE;
+
+ private final Attributes commonAttributes;
+ private final Meter meter;
+ private final ConcurrentHashMap metrics;
+
+ private OpenTelemetryInstrumentation(final State state) {
+ this.commonAttributes = buildCommonAttributes(state);
+ this.meter = getOpenTelemetryMetrics(state).getMeter(state.getProp(
+ ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_GROUP_NAME,
+ ConfigurationKeys.DEFAULT_METRICS_REPORTING_OPENTELEMETRY_GROUP_NAME));
+ this.metrics = new ConcurrentHashMap<>();
+ }
+
+ private OpenTelemetryMetricsBase getOpenTelemetryMetrics(State state) {
+ try {
+ String openTelemetryClassName = state.getProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_CLASSNAME,
+ ConfigurationKeys.DEFAULT_METRICS_REPORTING_OPENTELEMETRY_CLASSNAME);
+ Class> metricsClass = Class.forName(openTelemetryClassName);
+ Method getInstanceMethod = metricsClass.getMethod("getInstance", State.class);
+ return (OpenTelemetryMetricsBase) getInstanceMethod.invoke(null, state);
+ } catch (Exception e) {
+ log.error("Failed to initialize OpenTelemetryMetrics through reflection, defaulting to direct instantiation of InMemoryOpenTelemetryMetrics", e);
+ }
+ return InMemoryOpenTelemetryMetrics.getInstance(state);
+ }
+
+ /**
+ * Returns the singleton instance for the given configuration state.
+ *
+ * @param state the configuration containing metric reporting and dimension configs
+ * @return the global {@link OpenTelemetryInstrumentation} instance
+ */
+ public static OpenTelemetryInstrumentation getInstance(final State state) {
+ if (GLOBAL_INSTANCE == null) {
+ synchronized (OpenTelemetryInstrumentation.class) {
+ if (GLOBAL_INSTANCE == null) {
+ log.info("Creating OpenTelemetryInstrumentation instance");
+ GLOBAL_INSTANCE = new OpenTelemetryInstrumentation(state);
+ }
+ }
+ }
+ return GLOBAL_INSTANCE;
+ }
+
+ public static OpenTelemetryInstrumentation getInstance(final Properties props) {
+ return getInstance(new State(props));
+ }
+
+ /**
+ * Retrieves an existing metric by its enum definition or creates it if absent.
+ *
+ * @param metric the {@link GobblinOpenTelemetryMetrics} enum defining name, description, unit, and type {@link OpenTelemetryMetricType}
+ * @return an {@link OpenTelemetryMetric} instance corresponding to the provided enum
+ */
+ public OpenTelemetryMetric getOrCreate(GobblinOpenTelemetryMetrics metric) {
+ return this.metrics.computeIfAbsent(metric.getMetricName(), name -> createMetric(metric));
+ }
+
+ private OpenTelemetryMetric createMetric(GobblinOpenTelemetryMetrics metric) {
+ String name = metric.getMetricName();
+ String description = metric.getMetricDescription();
+ String unit = metric.getMetricUnit();
+ Attributes attrs = this.commonAttributes;
+
+ switch (metric.getMetricType()) {
+ case LONG_COUNTER:
+ return new OpenTelemetryLongCounter(
+ name,
+ attrs,
+ this.meter.counterBuilder(name)
+ .setDescription(description)
+ .setUnit(unit)
+ .build()
+ );
+ case DOUBLE_HISTOGRAM:
+ return new OpenTelemetryDoubleHistogram(
+ name,
+ attrs,
+ this.meter.histogramBuilder(name)
+ .setDescription(metric.getMetricDescription())
+ .setUnit(metric.getMetricUnit())
+ .build()
+ );
+ default:
+ throw new IllegalArgumentException("Unsupported metric type: " + metric.getMetricType());
+ }
+ }
+
+ private Attributes buildCommonAttributes(final State state) {
+ AttributesBuilder attributesBuilder = Attributes.builder();
+ String commonDimensions = state.getProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_DIMENSIONS, "");
+ if (StringUtils.isNotEmpty(commonDimensions)) {
+ for (String dimension : COMMA_SPLITTER.split(commonDimensions)) {
+ String dimensionKey = dimension.trim();
+ String dimensionValue = state.getProp(dimensionKey, "");
+ if (ConfigurationKeys.FLOW_EDGE_ID_KEY.equals(dimensionKey)) {
+ dimensionValue = getFlowEdgeId(state, dimensionValue);
+ }
+ if (StringUtils.isNotEmpty(dimensionValue)) {
+ attributesBuilder.put(dimensionKey, OpenTelemetryHelper.getOrDefaultOpenTelemetryAttrValue(dimensionValue));
+ }
+ }
+ }
+ return attributesBuilder.build();
+ }
+
+ private static String getFlowEdgeId(final State state, String fullFlowEdgeId) {
+ // Parse the flowEdgeId from fullFlowEdgeId that is stored in format sourceNode_destinationNode_flowEdgeId
+ return StringUtils.substringAfter(
+ StringUtils.substringAfter(fullFlowEdgeId, state.getProp(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, "")),
+ FLOW_EDGE_LABEL_JOINER_CHAR);
+ }
+
+}
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryLongCounter.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryLongCounter.java
new file mode 100644
index 00000000000..4d06a6af901
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryLongCounter.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.LongCounter;
+
+
+/**
+ * Implementation of {@link OpenTelemetryMetric} that wraps an OpenTelemetry {@link LongCounter}.
+ *
+ * This class provides a counter for recording values.
+ * It supports adding values with optional additional attributes that can be merged with base attributes.
+ *
+ */
+@Slf4j
+@AllArgsConstructor
+public class OpenTelemetryLongCounter implements OpenTelemetryMetric {
+ private String name;
+ private Attributes baseAttributes;
+ private LongCounter longCounter;
+
+ /**
+ * Adds the specified value to the counter with the base attributes.
+ *
+ * @param value the value to add to the counter
+ */
+ public void add(long value) {
+ log.debug("Emitting long counter metric: {}, value: {}, attributes: {}", this.name, value, this.baseAttributes);
+ this.longCounter.add(value, this.baseAttributes);
+ }
+
+ /**
+ * Adds the specified value to the counter with a combination of base attributes and additional attributes.
+ *
+ * @param value the value to add to the counter
+ * @param additionalAttributes the additional attributes to be merged with base attributes
+ */
+ public void add(long value, Attributes additionalAttributes) {
+ log.debug("Emitting long counter metric: {}, value: {}, base attributes: {}, additional attributes: {}",
+ this.name, value, this.baseAttributes, additionalAttributes);
+ this.longCounter.add(value, OpenTelemetryHelper.mergeAttributes(this.baseAttributes, additionalAttributes));
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String getMetricName() {
+ return this.name;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public OpenTelemetryMetricType getMetricType() {
+ return OpenTelemetryMetricType.LONG_COUNTER;
+ }
+
+ /**
+ * Returns a string representation of this counter with its name.
+ */
+ @Override
+ public String toString() {
+ return "OpenTelemetryLongCounter{name='" + name + "'}";
+ }
+
+}
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryMetric.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryMetric.java
new file mode 100644
index 00000000000..48f27f17a89
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryMetric.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+/**
+ * Interface representing a metric in the OpenTelemetry system.
+ * It provides methods to retrieve the metric name and type.
+ */
+public interface OpenTelemetryMetric {
+
+ /**
+ * Returns the name of the metric.
+ *
+ * @return the metric name as a {@link String}
+ */
+ String getMetricName();
+
+ /**
+ * Returns the {@link OpenTelemetryMetricType} indicating the kind of metric instrument.
+ *
+ * @return the metric type {@link OpenTelemetryMetricType}
+ */
+ OpenTelemetryMetricType getMetricType();
+
+}
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryMetricType.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryMetricType.java
new file mode 100644
index 00000000000..3b2f9ba65c7
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryMetricType.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+
+/**
+ * Enum representing the types of OpenTelemetry metrics supported.
+ */
+public enum OpenTelemetryMetricType {
+ /** Represents a metric of type LongCounter. */
+ LONG_COUNTER,
+ /** Represents a metric of type DoubleHistogram. */
+ DOUBLE_HISTOGRAM;
+
+}
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryHelperTest.java b/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryHelperTest.java
new file mode 100644
index 00000000000..b1fef640194
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryHelperTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+
+
+/**
+ * Tests for {@link OpenTelemetryHelper} class.
+ */
+public class OpenTelemetryHelperTest {
+
+ @Test
+ public void testMergeAttributes() {
+ Attributes emptyAttributes = Attributes.empty();
+ Attributes attributes1 = Attributes.builder().put("key1", "value1").build();
+ Attributes attributes2 = Attributes.builder().put("key2", "value2").build();
+
+ Attributes attributes = OpenTelemetryHelper.mergeAttributes(null, emptyAttributes);
+ Assert.assertEquals(attributes.size(), 0);
+ Assert.assertEquals(attributes, emptyAttributes);
+
+ attributes = OpenTelemetryHelper.mergeAttributes(attributes1, attributes2);
+ Assert.assertEquals(attributes.size(), 2);
+ Assert.assertEquals(attributes.get(AttributeKey.stringKey("key1")), "value1");
+ Assert.assertEquals(attributes.get(AttributeKey.stringKey("key2")), "value2");
+
+ attributes = OpenTelemetryHelper.mergeAttributes(attributes1, emptyAttributes);
+ Assert.assertEquals(attributes.size(), 1);
+ Assert.assertEquals(attributes.get(AttributeKey.stringKey("key1")), "value1");
+ Assert.assertNull(attributes.get(AttributeKey.stringKey("key2")));
+
+ }
+
+ @Test
+ public void testToOpenTelemetryAttributes() {
+ Attributes attributes = OpenTelemetryHelper.toOpenTelemetryAttributes(null);
+ Assert.assertEquals(attributes.size(), 0);
+
+ Map map = new HashMap<>();
+ map.put("key1", "value1");
+ map.put("key2", "value2");
+
+ attributes = OpenTelemetryHelper.toOpenTelemetryAttributes(map);
+ Assert.assertEquals(attributes.size(), 2);
+ Assert.assertEquals(attributes.get(AttributeKey.stringKey("key1")), "value1");
+ Assert.assertEquals(attributes.get(AttributeKey.stringKey("key2")), "value2");
+ }
+
+}
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryInstrumentationTest.java b/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryInstrumentationTest.java
new file mode 100644
index 00000000000..74c7f57fbfa
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryInstrumentationTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+import java.lang.reflect.Field;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+
+/**
+ * Unit tests for {@link OpenTelemetryInstrumentation}.
+ * These tests ensure that the singleton instance is created correctly,
+ * common attributes are built from dimensions, and metrics are created and cached properly.
+ */
+public class OpenTelemetryInstrumentationTest {
+
+ private OpenTelemetryInstrumentation instrumentation;
+ private State state;
+
+ @BeforeMethod
+ public void setUp() throws NoSuchFieldException, IllegalAccessException {
+ // Reset singleton instance before each test
+ Field instanceField = OpenTelemetryInstrumentation.class.getDeclaredField("GLOBAL_INSTANCE");
+ instanceField.setAccessible(true);
+ instanceField.set(null, null);
+
+ state = new State();
+ state.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_CLASSNAME, "org.apache.gobblin.metrics.InMemoryOpenTelemetryMetrics");
+ }
+
+ @Test
+ public void singletonInstanceCreatedOnce() {
+ OpenTelemetryInstrumentation instance1 = OpenTelemetryInstrumentation.getInstance(state);
+ OpenTelemetryInstrumentation instance2 = OpenTelemetryInstrumentation.getInstance(state);
+
+ Assert.assertSame(instance1, instance2, "getInstance should return the same instance");
+ }
+
+ @Test
+ public void emptyDimensionsCreateEmptyAttributes() {
+ state.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_DIMENSIONS, "");
+
+ instrumentation = OpenTelemetryInstrumentation.getInstance(state);
+ Attributes attributes = instrumentation.getCommonAttributes();
+
+ Assert.assertEquals(attributes.size(), 0);
+ }
+
+ @Test
+ public void commonAttributesBuiltFromDimensions() {
+ state.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_DIMENSIONS, "dim1,dim2");
+ state.setProp("dim1", "value1");
+ state.setProp("dim2", "value2");
+
+ instrumentation = OpenTelemetryInstrumentation.getInstance(state);
+ Attributes attributes = instrumentation.getCommonAttributes();
+
+ Assert.assertEquals(attributes.get(AttributeKey.stringKey("dim1")), "value1");
+ Assert.assertEquals(attributes.get(AttributeKey.stringKey("dim2")), "value2");
+ }
+
+ @Test
+ public void flowEdgeIdParsedCorrectly() {
+ state.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_DIMENSIONS,
+ ConfigurationKeys.FLOW_EDGE_ID_KEY);
+ state.setProp(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, "destNode");
+ state.setProp(ConfigurationKeys.FLOW_EDGE_ID_KEY, "sourceNode_destNode_edgeId");
+
+ instrumentation = OpenTelemetryInstrumentation.getInstance(state);
+ Attributes attributes = instrumentation.getCommonAttributes();
+
+ Assert.assertEquals(attributes.get(AttributeKey.stringKey(ConfigurationKeys.FLOW_EDGE_ID_KEY)),
+ "edgeId");
+ }
+
+ @Test
+ public void metricsAreCreatedAndCached() {
+ instrumentation = OpenTelemetryInstrumentation.getInstance(state);
+ Assert.assertEquals(instrumentation.getMetrics().size(), 0, "Metrics map should be empty initially");
+ instrumentation.getOrCreate(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE);
+ Assert.assertEquals(instrumentation.getMetrics().size(), 1, "Metrics map should contain one metric after creation");
+ instrumentation.getOrCreate(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE);
+ Assert.assertEquals(instrumentation.getMetrics().size(), 1, "Metrics map should still contain one metric after duplicate creation");
+ instrumentation.getOrCreate(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE_LATENCY);
+ Assert.assertEquals(instrumentation.getMetrics().size(), 2, "Metrics map should contain two metrics after creating another");
+ }
+
+}
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryMetricTest.java b/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryMetricTest.java
new file mode 100644
index 00000000000..5930c3438ed
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/opentelemetry/OpenTelemetryMetricTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.opentelemetry;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.metrics.data.HistogramPointData;
+import io.opentelemetry.sdk.metrics.data.LongPointData;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.InMemoryOpenTelemetryMetrics;
+
+/** Tests for OpenTelemetry metrics implementation, specifically for
+ * {@link OpenTelemetryLongCounter} and {@link OpenTelemetryDoubleHistogram}.
+ * These tests validate the correct recording of metrics with and without additional attributes.
+ */
+public class OpenTelemetryMetricTest {
+
+ private InMemoryOpenTelemetryMetrics inMemoryOpenTelemetryMetrics;
+ private final String testMeterGroupName = "testMeterGroup";
+ private final Attributes baseAttributes = Attributes.builder()
+ .put("dim1", "val1")
+ .put("dim2", "val2")
+ .build();
+
+ @BeforeMethod
+ public void setUp() {
+ inMemoryOpenTelemetryMetrics = InMemoryOpenTelemetryMetrics.getInstance(new State());
+ }
+
+ @Test
+ public void testOpenTelemetryLongCounter() {
+ String metricName = "testLongCounter";
+ OpenTelemetryLongCounter longCounter = new OpenTelemetryLongCounter(metricName, baseAttributes,
+ inMemoryOpenTelemetryMetrics.getMeter(testMeterGroupName).counterBuilder(metricName).build());
+ longCounter.add(20, Attributes.builder().put("dim3", "val3").build());
+ Collection metrics = inMemoryOpenTelemetryMetrics.metricReader.collectAllMetrics();
+ Assert.assertEquals(metrics.size(), 1);
+ Map metricsByName = metrics.stream().collect(Collectors.toMap(MetricData::getName, metricData -> metricData));
+ MetricData metricData = metricsByName.get(metricName);
+ List dataPoints = new ArrayList<>(metricData.getLongSumData().getPoints());
+ Assert.assertEquals(dataPoints.size(), 1);
+ Assert.assertEquals(dataPoints.get(0).getValue(), 20);
+ Assert.assertEquals(dataPoints.get(0).getAttributes().get(AttributeKey.stringKey("dim1")), "val1");
+ Assert.assertEquals(dataPoints.get(0).getAttributes().get(AttributeKey.stringKey("dim2")), "val2");
+ Assert.assertEquals(dataPoints.get(0).getAttributes().get(AttributeKey.stringKey("dim3")), "val3");
+ }
+
+ @Test
+ public void testOpenTelemetryLongCounterWithoutAdditionalAttributes() {
+ String metricName = "testLongCounterWithoutAdditionalAttributes";
+ OpenTelemetryLongCounter longCounter = new OpenTelemetryLongCounter(metricName, baseAttributes,
+ inMemoryOpenTelemetryMetrics.getMeter(testMeterGroupName).counterBuilder(metricName).build());
+ longCounter.add(10);
+ Collection metrics = inMemoryOpenTelemetryMetrics.metricReader.collectAllMetrics();
+ Assert.assertEquals(metrics.size(), 1);
+ Map metricsByName = metrics.stream().collect(Collectors.toMap(MetricData::getName, metricData -> metricData));
+ MetricData metricData = metricsByName.get(metricName);
+ List dataPoints = new ArrayList<>(metricData.getLongSumData().getPoints());
+ Assert.assertEquals(dataPoints.size(), 1);
+ Assert.assertEquals(dataPoints.get(0).getValue(), 10);
+ Assert.assertEquals(dataPoints.get(0).getAttributes().get(AttributeKey.stringKey("dim1")), "val1");
+ Assert.assertEquals(dataPoints.get(0).getAttributes().get(AttributeKey.stringKey("dim2")), "val2");
+ Assert.assertNull(dataPoints.get(0).getAttributes().get(AttributeKey.stringKey("dim3")),
+ "Additional attribute dim3 should not be present when not provided");
+ }
+
+ @Test
+ public void testOpenTelemetryDoubleHistogram() {
+ String metricName = "testDoubleHistogram";
+ OpenTelemetryDoubleHistogram doubleHistogram = new OpenTelemetryDoubleHistogram(metricName, baseAttributes,
+ inMemoryOpenTelemetryMetrics.getMeter(testMeterGroupName).histogramBuilder(metricName).build());
+ doubleHistogram.record(5.0, Attributes.builder().put("dim3", "val3").build());
+ doubleHistogram.record(10.0, Attributes.builder().put("dim3", "val3").build());
+ Collection metrics = inMemoryOpenTelemetryMetrics.metricReader.collectAllMetrics();
+ Assert.assertEquals(metrics.size(), 1);
+ Map metricsByName = metrics.stream().collect(Collectors.toMap(MetricData::getName, metricData -> metricData));
+ MetricData metricData = metricsByName.get(metricName);
+ List dataPoints = new ArrayList<>(metricData.getHistogramData().getPoints());
+ Assert.assertEquals(dataPoints.size(), 1);
+ Assert.assertEquals(dataPoints.get(0).getSum(), 15.0, "Sum should match recorded value");
+ }
+
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
index cdd5728fdc8..2b7b9afa19b 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
@@ -120,6 +120,8 @@ public interface GobblinTemporalConfigurationKeys {
PREFIX + "commit." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
String TEMPORAL_SUBMIT_GTE_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
PREFIX + "submit.gte." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
+ String TEMPORAL_EMIT_OTEL_METRICS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
+ PREFIX + "emit.otel.metrics." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
String TEMPORAL_ACTIVITY_RETRY_OPTIONS = PREFIX + "activity.retry.options.";
String TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS = TEMPORAL_ACTIVITY_RETRY_OPTIONS + "initial.interval.seconds";
int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS = 3;
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java
index 22126b2d0fc..0860fa82b07 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java
@@ -52,6 +52,9 @@ public enum ActivityType {
/** Activity type for submitting GTE. */
SUBMIT_GTE(GobblinTemporalConfigurationKeys.TEMPORAL_SUBMIT_GTE_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+ /** Activity type for emitting open telemetry metrics */
+ EMIT_OTEL_METRICS(GobblinTemporalConfigurationKeys.TEMPORAL_EMIT_OTEL_METRICS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
/** Default placeholder activity type. */
DEFAULT_ACTIVITY(GobblinTemporalConfigurationKeys.ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES);
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/EmitOTelMetrics.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/EmitOTelMetrics.java
new file mode 100644
index 00000000000..580fb0cdaad
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/EmitOTelMetrics.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity;
+
+import java.util.Map;
+import java.util.Properties;
+
+import io.temporal.activity.ActivityInterface;
+import io.temporal.activity.ActivityMethod;
+
+import org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetrics;
+
+@ActivityInterface
+public interface EmitOTelMetrics {
+
+ @ActivityMethod
+ void emitLongCounterMetric(GobblinOpenTelemetryMetrics metric, long value, Map attributes, Properties jobProps);
+
+ @ActivityMethod
+ void emitDoubleHistogramMetric(GobblinOpenTelemetryMetrics metric, double value, Map attributes, Properties jobProps);
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/EmitOTelMetricsImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/EmitOTelMetricsImpl.java
new file mode 100644
index 00000000000..e2d7cca5b83
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/EmitOTelMetricsImpl.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetrics;
+import org.apache.gobblin.metrics.opentelemetry.OpenTelemetryDoubleHistogram;
+import org.apache.gobblin.metrics.opentelemetry.OpenTelemetryHelper;
+import org.apache.gobblin.metrics.opentelemetry.OpenTelemetryInstrumentation;
+import org.apache.gobblin.metrics.opentelemetry.OpenTelemetryLongCounter;
+import org.apache.gobblin.temporal.ddm.activity.EmitOTelMetrics;
+
+
+public class EmitOTelMetricsImpl implements EmitOTelMetrics {
+
+ @Override
+ public void emitLongCounterMetric(GobblinOpenTelemetryMetrics metric, long value, Map attributes, Properties jobProps) {
+ OpenTelemetryLongCounter longCounter = (OpenTelemetryLongCounter) OpenTelemetryInstrumentation.getInstance(jobProps).getOrCreate(metric);
+ longCounter.add(value, OpenTelemetryHelper.toOpenTelemetryAttributes(attributes));
+ }
+
+ @Override
+ public void emitDoubleHistogramMetric(GobblinOpenTelemetryMetrics metric, double value, Map attributes, Properties jobProps) {
+ OpenTelemetryDoubleHistogram doubleHistogram = (OpenTelemetryDoubleHistogram) OpenTelemetryInstrumentation.getInstance(jobProps).getOrCreate(metric);
+ doubleHistogram.record(value, OpenTelemetryHelper.toOpenTelemetryAttributes(attributes));
+ }
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
index b2f65ccb410..ad61e7b99d4 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
@@ -17,7 +17,9 @@
package org.apache.gobblin.temporal.ddm.launcher;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
@@ -33,12 +35,14 @@
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetrics;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.runtime.JobContext;
import org.apache.gobblin.runtime.JobLauncher;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
+import org.apache.gobblin.temporal.ddm.activity.impl.EmitOTelMetricsImpl;
import org.apache.gobblin.temporal.ddm.work.ExecGobblinStats;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.ddm.workflow.ExecuteGobblinWorkflow;
@@ -50,6 +54,9 @@
import org.apache.gobblin.util.JobLauncherUtils;
import org.apache.gobblin.util.PropertiesUtils;
+import static org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetricsConstants.DimensionKeys.*;
+import static org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetricsConstants.DimensionValues.*;
+
/**
* A {@link JobLauncher} for the initial triggering of a Temporal workflow that executes a full Gobblin job workflow of:
@@ -96,8 +103,21 @@ public void submitJob(List workunits) {
EventSubmitterContext eventSubmitterContext = new EventSubmitterContext.Builder(eventSubmitter)
.withGaaSJobProps(finalProps)
.build();
+
+ Map attributes = new HashMap<>();
+ attributes.put(CURR_STATE, JOB_START);
+ EmitOTelMetricsImpl emitOTelMetrics = new EmitOTelMetricsImpl();
+ emitOTelMetrics.emitLongCounterMetric(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE, 1L, attributes, finalProps);
+
+ long startTimeMillis = System.currentTimeMillis();
ExecGobblinStats execGobblinStats = workflow.execute(finalProps, eventSubmitterContext);
+ double timeTaken = (System.currentTimeMillis() - startTimeMillis) / 1000.0;
log.info("FINISHED - ExecuteGobblinWorkflow.execute = {}", execGobblinStats);
+ attributes.put(CURR_STATE, JOB_COMPLETE);
+ emitOTelMetrics.emitLongCounterMetric(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE, 1L, attributes, finalProps);
+ attributes.remove(CURR_STATE);
+ attributes.put(STATE, JOB_COMPLETE);
+ emitOTelMetrics.emitDoubleHistogramMetric(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE_LATENCY, timeTaken, attributes, finalProps);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
index 12fe6c4d849..3ee45d20eb3 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
@@ -28,6 +28,7 @@
import org.apache.gobblin.temporal.cluster.AbstractTemporalWorker;
import org.apache.gobblin.temporal.ddm.activity.impl.CommitActivityImpl;
import org.apache.gobblin.temporal.ddm.activity.impl.DeleteWorkDirsActivityImpl;
+import org.apache.gobblin.temporal.ddm.activity.impl.EmitOTelMetricsImpl;
import org.apache.gobblin.temporal.ddm.activity.impl.GenerateWorkUnitsImpl;
import org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl;
import org.apache.gobblin.temporal.ddm.activity.impl.RecommendScalingForWorkUnitsLinearHeuristicImpl;
@@ -60,7 +61,7 @@ protected Class>[] getWorkflowImplClasses() {
@Override
protected Object[] getActivityImplInstances() {
return new Object[] { new SubmitGTEActivityImpl(), new GenerateWorkUnitsImpl(), new RecommendScalingForWorkUnitsLinearHeuristicImpl(), new ProcessWorkUnitImpl(),
- new CommitActivityImpl(), new DeleteWorkDirsActivityImpl() };
+ new CommitActivityImpl(), new DeleteWorkDirsActivityImpl(), new EmitOTelMetricsImpl()};
}
@Override
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
index f1b4444d3b5..244def5df1d 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
@@ -21,8 +21,10 @@
import java.net.URI;
import java.time.Instant;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
@@ -46,9 +48,11 @@
import org.apache.gobblin.cluster.GobblinClusterUtils;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetrics;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.temporal.ddm.activity.ActivityType;
import org.apache.gobblin.temporal.ddm.activity.DeleteWorkDirsActivity;
+import org.apache.gobblin.temporal.ddm.activity.EmitOTelMetrics;
import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
import org.apache.gobblin.temporal.ddm.activity.RecommendScalingForWorkUnits;
import org.apache.gobblin.temporal.ddm.launcher.ProcessWorkUnitsJobLauncher;
@@ -75,6 +79,9 @@
import org.apache.gobblin.util.PropertiesUtils;
import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+import static org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetricsConstants.DimensionKeys.*;
+import static org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetricsConstants.DimensionValues.*;
+
@Slf4j
public class ExecuteGobblinWorkflowImpl implements ExecuteGobblinWorkflow {
@@ -83,11 +90,16 @@ public class ExecuteGobblinWorkflowImpl implements ExecuteGobblinWorkflow {
@Override
public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext eventSubmitterContext) {
// Filtering only temporal job properties to pass to child workflows to avoid passing unnecessary properties
- final Properties temporalJobProps = PropertiesUtils.extractPropertiesWithPrefix(jobProps,
- com.google.common.base.Optional.of(GobblinTemporalConfigurationKeys.PREFIX));
+ final Properties temporalJobProps = PropertiesUtils.combineProperties(
+ PropertiesUtils.extractPropertiesWithPrefix(jobProps, com.google.common.base.Optional.of(GobblinTemporalConfigurationKeys.PREFIX)),
+ PropertiesUtils.extractPropertiesWithPrefix(jobProps, com.google.common.base.Optional.of(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_PREFIX))
+ );
// Add File system properties to the temporal job properties
temporalJobProps.putAll(PropertiesUtils.extractPropertiesWithPrefix(jobProps,
com.google.common.base.Optional.of(ConfigurationKeys.DEFAULT_STATE_STORE_TYPE)));
+ Map attributes = new HashMap<>();
+ final EmitOTelMetrics emitOTelMetricsActivityStub = Workflow.newActivityStub(EmitOTelMetrics.class,
+ ActivityType.EMIT_OTEL_METRICS.buildActivityOptions(temporalJobProps, false));
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext, temporalJobProps);
timerFactory.create(TimingEvent.LauncherTimings.JOB_PREPARE).submit(); // update GaaS: `TimingEvent.JOB_START_TIME`
EventTimer jobSuccessTimer = timerFactory.createJobTimer();
@@ -97,7 +109,16 @@ public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext event
try (Closer closer = Closer.create()) {
final GenerateWorkUnits genWUsActivityStub = Workflow.newActivityStub(GenerateWorkUnits.class,
ActivityType.GENERATE_WORKUNITS.buildActivityOptions(temporalJobProps, true));
+ attributes.put(CURR_STATE, GENERATE_WU_START);
+ emitOTelMetricsActivityStub.emitLongCounterMetric(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE, 1, attributes, temporalJobProps);
+ long genWUStartTime = Workflow.currentTimeMillis();
GenerateWorkUnitsResult generateWorkUnitResult = genWUsActivityStub.generateWorkUnits(jobProps, eventSubmitterContext);
+ attributes.put(CURR_STATE, GENERATE_WU_COMPLETE);
+ emitOTelMetricsActivityStub.emitLongCounterMetric(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE, 1, attributes, temporalJobProps);
+ double genWUTImeTaken = (Workflow.currentTimeMillis() - genWUStartTime) / 1000.0;
+ attributes.remove(CURR_STATE);
+ attributes.put(STATE, GENERATE_WU);
+ emitOTelMetricsActivityStub.emitDoubleHistogramMetric(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE_LATENCY, genWUTImeTaken, attributes, temporalJobProps);
optGenerateWorkUnitResult = Optional.of(generateWorkUnitResult);
WorkUnitsSizeSummary wuSizeSummary = generateWorkUnitResult.getWorkUnitsSizeSummary();
int numWUsGenerated = safelyCastNumConstituentWorkUnitsOrThrow(wuSizeSummary);
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index 60aceee0e65..dad17e81b16 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -16,6 +16,7 @@
*/
package org.apache.gobblin.temporal.ddm.workflow.impl;
+import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
@@ -29,8 +30,11 @@
import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.Workflow;
+import org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetrics;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.temporal.cluster.WorkerConfig;
+import org.apache.gobblin.temporal.ddm.activity.ActivityType;
+import org.apache.gobblin.temporal.ddm.activity.EmitOTelMetrics;
import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils;
import org.apache.gobblin.temporal.ddm.work.CommitStats;
import org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload;
@@ -49,15 +53,22 @@
import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
import org.apache.gobblin.util.PropertiesUtils;
+import static org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetricsConstants.DimensionKeys.*;
+import static org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetricsConstants.DimensionValues.*;
+
@Slf4j
public class ProcessWorkUnitsWorkflowImpl implements ProcessWorkUnitsWorkflow {
public static final String CHILD_WORKFLOW_ID_BASE = "NestingExecWorkUnits";
public static final String COMMIT_STEP_WORKFLOW_ID_BASE = "CommitStepWorkflow";
+ private EmitOTelMetrics emitOTelMetricsActivityStub;
+
@Override
public CommitStats process(WUProcessingSpec workSpec, final Properties props) {
Optional timer = this.createOptJobEventTimer(workSpec, props);
+ this.emitOTelMetricsActivityStub = Workflow.newActivityStub(EmitOTelMetrics.class,
+ ActivityType.EMIT_OTEL_METRICS.buildActivityOptions(props, false));
CommitStats result = performWork(workSpec, props);
timer.ifPresent(EventTimer::stop);
return result;
@@ -74,7 +85,18 @@ private CommitStats performWork(WUProcessingSpec workSpec, final Properties prop
performWorkloadInput = new NestingExecWorkloadInput<>(WorkflowAddr.ROOT, workload, 0,
workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(),
Optional.empty(), props);
+ Map attributes = new HashMap<>();
+ attributes.put(CURR_STATE, PROCESS_WU_START);
+ this.emitOTelMetricsActivityStub.emitLongCounterMetric(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE, 1L, attributes, props);
+ long processWUStartTime = Workflow.currentTimeMillis();
workunitsProcessed = Optional.of(processingWorkflow.performWorkload(performWorkloadInput));
+ attributes.put(CURR_STATE, PROCESS_WU_COMPLETE);
+ this.emitOTelMetricsActivityStub.emitLongCounterMetric(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE, 1L, attributes, props);
+ attributes.remove(CURR_STATE);
+ attributes.put(STATE, PROCESS_WU);
+ double processWUDuration = (Workflow.currentTimeMillis() - processWUStartTime) / 1000.0;
+ this.emitOTelMetricsActivityStub.emitDoubleHistogramMetric(
+ GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE_LATENCY, processWUDuration, attributes, props);
} catch (Exception e) {
log.error("ProcessWorkUnits failure - attempting partial commit before re-throwing exception", e);
@@ -106,7 +128,18 @@ private CommitStats performCommitIfAnyWorkUnitsProcessed(WUProcessingSpec workSp
return CommitStats.createEmpty();
}
CommitStepWorkflow commitWorkflow = createCommitStepWorkflow(searchAttributes);
+ Map attributes = new HashMap<>();
+ attributes.put(CURR_STATE, COMMIT_STEP_START);
+ this.emitOTelMetricsActivityStub.emitLongCounterMetric(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE, 1L, attributes, props);
+ long commitStepStartTime = Workflow.currentTimeMillis();
CommitStats result = commitWorkflow.commit(workSpec, props);
+ attributes.put(CURR_STATE, COMMIT_STEP_COMPLETE);
+ this.emitOTelMetricsActivityStub.emitLongCounterMetric(GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE, 1L, attributes, props);
+ attributes.remove(CURR_STATE);
+ attributes.put(STATE, COMMIT_STEP);
+ double commitStepDuration = (Workflow.currentTimeMillis() - commitStepStartTime) / 1000.0;
+ this.emitOTelMetricsActivityStub.emitDoubleHistogramMetric(
+ GobblinOpenTelemetryMetrics.GOBBLIN_JOB_STATE_LATENCY, commitStepDuration, attributes, props);
if (result.getNumCommittedWorkUnits() == 0) {
log.warn("No work units committed at the job level. They could have been committed at the task level.");
}
diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java
index 8ce5c2991dd..6e2a54fb3d7 100644
--- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java
+++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java
@@ -70,6 +70,7 @@ public Object[][] activityTypesWithStartToCloseTimeout() {
{ActivityType.PROCESS_WORKUNIT, 555},
{ActivityType.COMMIT, 444},
{ActivityType.SUBMIT_GTE, 999},
+ {ActivityType.EMIT_OTEL_METRICS, 888},
{ActivityType.DEFAULT_ACTIVITY, 1}
};
}
diff --git a/gradle/scripts/defaultBuildProperties.gradle b/gradle/scripts/defaultBuildProperties.gradle
index 2263dcdfa01..f0207d9ba41 100644
--- a/gradle/scripts/defaultBuildProperties.gradle
+++ b/gradle/scripts/defaultBuildProperties.gradle
@@ -41,7 +41,7 @@ def BuildProperties BUILD_PROPERTIES = new BuildProperties(project)
.register(new BuildProperty("publishToMaven", false, "Enable publishing of artifacts to a central Maven repository"))
.register(new BuildProperty("publishToNexus", false, "Enable publishing of artifacts to Nexus"))
.register(new BuildProperty("salesforceVersion", "42.0.0", "Salesforce dependencies version"))
- .register(new BuildProperty("openTelemetryVersion", "1.30.0", "OpenTelemetry dependencies version"))
+ .register(new BuildProperty("openTelemetryVersion", "1.47.0", "OpenTelemetry dependencies version"))
.register(new BuildProperty("micrometerVersion", "1.11.1", "Micrometer dependencies version"))
task buildProperties(description: 'Lists main properties that can be used to customize the build') {
doLast {