diff --git a/distribution/pom.xml b/distribution/pom.xml
index 1ec50b532ce2..59b643f7cf07 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -626,7 +626,7 @@
-c
io.confluent.druid.extensions:confluent-extensions
-c
- org.apache.druid.extensions.contrib:opentelemetry-emitter
+ org.apache.druid.extensions.contrib:druid-opentelemetry-extensions
diff --git a/extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/HybridProtobufReader.java b/extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/HybridProtobufReader.java
index 83c5c20299aa..81a19e287a1d 100644
--- a/extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/HybridProtobufReader.java
+++ b/extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/HybridProtobufReader.java
@@ -26,7 +26,7 @@
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.DimensionsSpec;
-import org.apache.druid.data.input.opentelemetry.protobuf.OpenTelemetryMetricsProtobufReader;
+import org.apache.druid.data.input.opentelemetry.protobuf.metrics.OpenTelemetryMetricsProtobufReader;
import org.apache.druid.indexing.seekablestream.SettableByteEntity;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
diff --git a/extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufReader.java b/extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufReader.java
index c19005e6cb8f..ad4f6a12df55 100644
--- a/extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufReader.java
+++ b/extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufReader.java
@@ -19,8 +19,6 @@
package org.apache.druid.data.input.opencensus.protobuf;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -30,34 +28,27 @@
import io.opencensus.proto.metrics.v1.Metric;
import io.opencensus.proto.metrics.v1.Point;
import io.opencensus.proto.metrics.v1.TimeSeries;
-import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
-import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.opentelemetry.protobuf.AbstractProtobufReader;
import org.apache.druid.indexing.seekablestream.SettableByteEntity;
-import org.apache.druid.java.util.common.CloseableIterators;
-import org.apache.druid.java.util.common.parsers.CloseableIterator;
-import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.utils.CollectionUtils;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-public class OpenCensusProtobufReader implements InputEntityReader
+public class OpenCensusProtobufReader extends AbstractProtobufReader
{
private static final String SEPARATOR = "-";
private static final String VALUE_COLUMN = "value";
- private final DimensionsSpec dimensionsSpec;
- private final SettableByteEntity extends ByteEntity> source;
private final String metricDimension;
private final String metricLabelPrefix;
private final String resourceLabelPrefix;
@@ -70,8 +61,7 @@ public OpenCensusProtobufReader(
String resourceLabelPrefix
)
{
- this.dimensionsSpec = dimensionsSpec;
- this.source = source;
+ super(dimensionsSpec, source);
this.metricDimension = metricDimension;
this.metricLabelPrefix = metricLabelPrefix;
this.resourceLabelPrefix = resourceLabelPrefix;
@@ -83,36 +73,9 @@ private interface LabelContext
}
@Override
- public CloseableIterator read()
+ public List parseData(ByteBuffer byteBuffer) throws InvalidProtocolBufferException
{
- Supplier> supplier = Suppliers.memoize(() -> readAsList().iterator());
- return CloseableIterators.withEmptyBaggage(new Iterator() {
- @Override
- public boolean hasNext()
- {
- return supplier.get().hasNext();
- }
- @Override
- public InputRow next()
- {
- return supplier.get().next();
- }
- });
- }
-
- List readAsList()
- {
- try {
- ByteBuffer buffer = source.getEntity().getBuffer();
- List rows = parseMetric(Metric.parseFrom(buffer));
- // Explicitly move the position assuming that all the remaining bytes have been consumed because the protobuf
- // parser does not update the position itself
- buffer.position(buffer.limit());
- return rows;
- }
- catch (InvalidProtocolBufferException e) {
- throw new ParseException(null, e, "Protobuf message could not be parsed");
- }
+ return parseMetric(Metric.parseFrom(byteBuffer));
}
private List parseMetric(final Metric metric)
@@ -222,10 +185,4 @@ private void addPointRows(Point point, Metric metric, LabelContext labelContext)
default:
}
}
-
- @Override
- public CloseableIterator sample()
- {
- return read().map(row -> InputRowListPlusRawValues.of(row, ((MapBasedInputRow) row).getEvent()));
- }
}
diff --git a/extensions-contrib/opentelemetry-extensions/pom.xml b/extensions-contrib/opentelemetry-extensions/pom.xml
index 75d67f621bfb..6185ff17b6db 100644
--- a/extensions-contrib/opentelemetry-extensions/pom.xml
+++ b/extensions-contrib/opentelemetry-extensions/pom.xml
@@ -67,6 +67,11 @@
jackson-databind
provided
+
+ commons-codec
+ commons-codec
+ provided
+
org.apache.druid
druid-core
@@ -85,6 +90,11 @@
junit
test
+
+ nl.jqno.equalsverifier
+ equalsverifier
+ test
+
org.openjdk.jmh
diff --git a/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/AbstractProtobufReader.java b/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/AbstractProtobufReader.java
new file mode 100644
index 000000000000..4b0ef07c9bf0
--- /dev/null
+++ b/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/AbstractProtobufReader.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.opentelemetry.protobuf;
+
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.Sets;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowListPlusRawValues;
+import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.indexing.seekablestream.SettableByteEntity;
+import org.apache.druid.java.util.common.CloseableIterators;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.common.parsers.ParseException;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+
+public abstract class AbstractProtobufReader implements InputEntityReader
+{
+
+ protected final SettableByteEntity extends ByteEntity> source;
+ protected final DimensionsSpec dimensionsSpec;
+
+ public AbstractProtobufReader(DimensionsSpec dimensionsSpec,
+ SettableByteEntity extends ByteEntity> source)
+ {
+ this.dimensionsSpec = dimensionsSpec;
+ this.source = source;
+ }
+
+ public abstract List parseData(ByteBuffer byteBuffer) throws InvalidProtocolBufferException;
+
+ @Override
+ public CloseableIterator read()
+ {
+ Supplier> supplier = Suppliers.memoize(() -> readAsList().iterator());
+ return CloseableIterators.withEmptyBaggage(new Iterator() {
+ @Override
+ public boolean hasNext()
+ {
+ return supplier.get().hasNext();
+ }
+ @Override
+ public InputRow next()
+ {
+ return supplier.get().next();
+ }
+ });
+ }
+
+ public List readAsList()
+ {
+ try {
+ ByteBuffer buffer = source.getEntity().getBuffer();
+ List rows = parseData(buffer);
+ // Explicitly move the position assuming that all the remaining bytes have been consumed because the protobuf
+ // parser does not update the position itself
+ buffer.position(buffer.limit());
+ return rows;
+ }
+ catch (InvalidProtocolBufferException e) {
+ throw new ParseException(null, e, "Protobuf message could not be parsed");
+ }
+ }
+
+ protected InputRow createRow(long timeUnixMilli, Map event)
+ {
+ final List dimensions;
+ if (!dimensionsSpec.getDimensionNames().isEmpty()) {
+ dimensions = dimensionsSpec.getDimensionNames();
+ } else {
+ dimensions = new ArrayList<>(Sets.difference(event.keySet(), dimensionsSpec.getDimensionExclusions()));
+ }
+ return new MapBasedInputRow(timeUnixMilli, dimensions, event);
+ }
+
+ @Override
+ public CloseableIterator sample()
+ {
+ return read().map(row -> InputRowListPlusRawValues.of(row, ((MapBasedInputRow) row).getEvent()));
+ }
+}
diff --git a/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryProtobufExtensionsModule.java b/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryProtobufExtensionsModule.java
index 4c027c31248c..c15fef1bf494 100644
--- a/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryProtobufExtensionsModule.java
+++ b/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryProtobufExtensionsModule.java
@@ -23,6 +23,8 @@
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
+import org.apache.druid.data.input.opentelemetry.protobuf.metrics.OpenTelemetryMetricsProtobufInputFormat;
+import org.apache.druid.data.input.opentelemetry.protobuf.traces.OpenTelemetryTracesProtobufInputFormat;
import org.apache.druid.initialization.DruidModule;
import java.util.Collections;
@@ -30,14 +32,14 @@
public class OpenTelemetryProtobufExtensionsModule implements DruidModule
{
-
@Override
public List extends Module> getJacksonModules()
{
return Collections.singletonList(
new SimpleModule("OpenTelemetryProtobufInputFormat")
.registerSubtypes(
- new NamedType(OpenTelemetryMetricsProtobufInputFormat.class, "opentelemetry-metrics-protobuf")
+ new NamedType(OpenTelemetryMetricsProtobufInputFormat.class, "opentelemetry-metrics-protobuf"),
+ new NamedType(OpenTelemetryTracesProtobufInputFormat.class, "opentelemetry-traces-protobuf")
)
);
}
diff --git a/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/Utils.java b/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/Utils.java
new file mode 100644
index 000000000000..8a96475833c4
--- /dev/null
+++ b/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/Utils.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.opentelemetry.protobuf;
+
+import io.opentelemetry.proto.common.v1.AnyValue;
+import io.opentelemetry.proto.resource.v1.Resource;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.indexing.seekablestream.SettableByteEntity;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.Map;
+
+public class Utils
+{
+ public static SettableByteEntity extends ByteEntity> getSettableEntity(InputEntity source)
+ {
+ // Sampler passes a KafkaRecordEntity directly, while the normal code path wraps the same entity in a
+ // SettableByteEntity
+ if (source instanceof SettableByteEntity) {
+ return (SettableByteEntity extends ByteEntity>) source;
+ } else {
+ SettableByteEntity wrapper = new SettableByteEntity<>();
+ wrapper.setEntity((ByteEntity) source);
+ return wrapper;
+ }
+ }
+
+ @Nullable
+ public static Object parseAnyValue(AnyValue value)
+ {
+ switch (value.getValueCase()) {
+ case INT_VALUE:
+ return value.getIntValue();
+ case BOOL_VALUE:
+ return value.getBoolValue();
+ case DOUBLE_VALUE:
+ return value.getDoubleValue();
+ case STRING_VALUE:
+ return value.getStringValue();
+
+ // TODO: Support KVLIST_VALUE, ARRAY_VALUE and BYTES_VALUE
+
+ default:
+ // VALUE_NOT_SET
+ return null;
+ }
+ }
+
+ public static Map getResourceAttributes(Resource resource, String resourceAttributePrefix)
+ {
+ return resource
+ .getAttributesList()
+ .stream()
+ .collect(
+ HashMap::new,
+ (m, kv) -> {
+ Object value = Utils.parseAnyValue(kv.getValue());
+ if (value != null) {
+ m.put(resourceAttributePrefix + kv.getKey(), value);
+ }
+ },
+ HashMap::putAll
+ );
+ }
+}
diff --git a/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufInputFormat.java b/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/metrics/OpenTelemetryMetricsProtobufInputFormat.java
similarity index 81%
rename from extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufInputFormat.java
rename to extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/metrics/OpenTelemetryMetricsProtobufInputFormat.java
index 50029e8dfbd9..024b6538a26b 100644
--- a/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufInputFormat.java
+++ b/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/metrics/OpenTelemetryMetricsProtobufInputFormat.java
@@ -17,15 +17,14 @@
* under the License.
*/
-package org.apache.druid.data.input.opentelemetry.protobuf;
+package org.apache.druid.data.input.opentelemetry.protobuf.metrics;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
-import org.apache.druid.data.input.impl.ByteEntity;
-import org.apache.druid.indexing.seekablestream.SettableByteEntity;
+import org.apache.druid.data.input.opentelemetry.protobuf.Utils;
import org.apache.druid.java.util.common.StringUtils;
import java.io.File;
@@ -64,23 +63,13 @@ public boolean isSplittable()
@Override
public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
{
- // Sampler passes a KafkaRecordEntity directly, while the normal code path wraps the same entity in a
- // SettableByteEntity
- SettableByteEntity extends ByteEntity> settableEntity;
- if (source instanceof SettableByteEntity) {
- settableEntity = (SettableByteEntity extends ByteEntity>) source;
- } else {
- SettableByteEntity wrapper = new SettableByteEntity<>();
- wrapper.setEntity((ByteEntity) source);
- settableEntity = wrapper;
- }
return new OpenTelemetryMetricsProtobufReader(
- inputRowSchema.getDimensionsSpec(),
- settableEntity,
- metricDimension,
- valueDimension,
- metricAttributePrefix,
- resourceAttributePrefix
+ inputRowSchema.getDimensionsSpec(),
+ Utils.getSettableEntity(source),
+ metricDimension,
+ valueDimension,
+ metricAttributePrefix,
+ resourceAttributePrefix
);
}
diff --git a/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReader.java b/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/metrics/OpenTelemetryMetricsProtobufReader.java
similarity index 57%
rename from extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReader.java
rename to extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/metrics/OpenTelemetryMetricsProtobufReader.java
index c5fa65ca28e2..e569000b00d7 100644
--- a/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReader.java
+++ b/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/metrics/OpenTelemetryMetricsProtobufReader.java
@@ -17,50 +17,36 @@
* under the License.
*/
-package org.apache.druid.data.input.opentelemetry.protobuf;
+package org.apache.druid.data.input.opentelemetry.protobuf.metrics;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
import com.google.protobuf.InvalidProtocolBufferException;
-import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.metrics.v1.Metric;
import io.opentelemetry.proto.metrics.v1.MetricsData;
import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
-import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
-import org.apache.druid.data.input.InputRowListPlusRawValues;
-import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.opentelemetry.protobuf.AbstractProtobufReader;
+import org.apache.druid.data.input.opentelemetry.protobuf.Utils;
import org.apache.druid.indexing.seekablestream.SettableByteEntity;
-import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.java.util.common.parsers.CloseableIterator;
-import org.apache.druid.java.util.common.parsers.ParseException;
-import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-public class OpenTelemetryMetricsProtobufReader implements InputEntityReader
+public class OpenTelemetryMetricsProtobufReader extends AbstractProtobufReader
{
private static final Logger log = new Logger(OpenTelemetryMetricsProtobufReader.class);
-
- private final SettableByteEntity extends ByteEntity> source;
private final String metricDimension;
private final String valueDimension;
private final String metricAttributePrefix;
private final String resourceAttributePrefix;
- private final DimensionsSpec dimensionsSpec;
public OpenTelemetryMetricsProtobufReader(
DimensionsSpec dimensionsSpec,
@@ -71,63 +57,20 @@ public OpenTelemetryMetricsProtobufReader(
String resourceAttributePrefix
)
{
- this.dimensionsSpec = dimensionsSpec;
- this.source = source;
+ super(dimensionsSpec, source);
this.metricDimension = metricDimension;
this.valueDimension = valueDimension;
this.metricAttributePrefix = metricAttributePrefix;
this.resourceAttributePrefix = resourceAttributePrefix;
}
- @Override
- public CloseableIterator read()
- {
- Supplier> supplier = Suppliers.memoize(() -> readAsList().iterator());
- return CloseableIterators.withEmptyBaggage(new Iterator() {
- @Override
- public boolean hasNext()
- {
- return supplier.get().hasNext();
- }
- @Override
- public InputRow next()
- {
- return supplier.get().next();
- }
- });
- }
-
- List readAsList()
- {
- try {
- ByteBuffer buffer = source.getEntity().getBuffer();
- List rows = parseMetricsData(MetricsData.parseFrom(buffer));
- // Explicitly move the position assuming that all the remaining bytes have been consumed because the protobuf
- // parser does not update the position itself
- buffer.position(buffer.limit());
- return rows;
- }
- catch (InvalidProtocolBufferException e) {
- throw new ParseException(null, e, "Protobuf message could not be parsed");
- }
- }
-
private List parseMetricsData(final MetricsData metricsData)
{
return metricsData.getResourceMetricsList()
.stream()
.flatMap(resourceMetrics -> {
- Map resourceAttributes = resourceMetrics.getResource()
- .getAttributesList()
- .stream()
- .collect(HashMap::new,
- (m, kv) -> {
- Object value = parseAnyValue(kv.getValue());
- if (value != null) {
- m.put(resourceAttributePrefix + kv.getKey(), value);
- }
- },
- HashMap::putAll);
+ Map resourceAttributes = Utils.getResourceAttributes(resourceMetrics.getResource(),
+ resourceAttributePrefix);
return resourceMetrics.getScopeMetricsList()
.stream()
.flatMap(scopeMetrics -> scopeMetrics.getMetricsList()
@@ -186,7 +129,7 @@ private InputRow parseNumberDataPoint(NumberDataPoint dataPoint,
event.putAll(resourceAttributes);
dataPoint.getAttributesList().forEach(att -> {
- Object value = parseAnyValue(att.getValue());
+ Object value = Utils.parseAnyValue(att.getValue());
if (value != null) {
event.put(metricAttributePrefix + att.getKey(), value);
}
@@ -195,41 +138,10 @@ private InputRow parseNumberDataPoint(NumberDataPoint dataPoint,
return createRow(TimeUnit.NANOSECONDS.toMillis(dataPoint.getTimeUnixNano()), event);
}
- @Nullable
- private static Object parseAnyValue(AnyValue value)
- {
- switch (value.getValueCase()) {
- case INT_VALUE:
- return value.getIntValue();
- case BOOL_VALUE:
- return value.getBoolValue();
- case DOUBLE_VALUE:
- return value.getDoubleValue();
- case STRING_VALUE:
- return value.getStringValue();
-
- // TODO: Support KVLIST_VALUE, ARRAY_VALUE and BYTES_VALUE
-
- default:
- // VALUE_NOT_SET
- return null;
- }
- }
-
- InputRow createRow(long timeUnixMilli, Map event)
- {
- final List dimensions;
- if (!dimensionsSpec.getDimensionNames().isEmpty()) {
- dimensions = dimensionsSpec.getDimensionNames();
- } else {
- dimensions = new ArrayList<>(Sets.difference(event.keySet(), dimensionsSpec.getDimensionExclusions()));
- }
- return new MapBasedInputRow(timeUnixMilli, dimensions, event);
- }
-
@Override
- public CloseableIterator sample()
+ public List parseData(ByteBuffer byteBuffer)
+ throws InvalidProtocolBufferException
{
- return read().map(row -> InputRowListPlusRawValues.of(row, ((MapBasedInputRow) row).getEvent()));
+ return parseMetricsData(MetricsData.parseFrom(byteBuffer));
}
}
diff --git a/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/traces/OpenTelemetryTracesProtobufInputFormat.java b/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/traces/OpenTelemetryTracesProtobufInputFormat.java
new file mode 100644
index 000000000000..bc9bb948de21
--- /dev/null
+++ b/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/traces/OpenTelemetryTracesProtobufInputFormat.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.opentelemetry.protobuf.traces;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.opentelemetry.protobuf.Utils;
+import org.apache.druid.java.util.common.StringUtils;
+
+import java.io.File;
+import java.util.Objects;
+
+public class OpenTelemetryTracesProtobufInputFormat implements InputFormat
+{
+ static String DEFAULT_RESOURCE_ATTR_PREFIX = "resource.";
+ static String DEFAULT_SPAN_NAME_DIMENSION = "span_name";
+ static String DEFAULT_SPAN_ID_DIMENSION = "span_id";
+ static String DEFAULT_PARENT_SPAN_ID_DIMENSION = "parent_span_id";
+ static String DEFAULT_TRACE_ID_DIMENSION = "trace_id";
+ static String DEFAULT_END_TIME_DIMENSION = "end_time";
+ static String DEFAULT_STATUS_CODE_DIMENSION = "status.code";
+ static String DEFAULT_STATUS_MESSAGE_DIMENSION = "status.message";
+ static String DEFAULT_KIND_DIMENSION = "kind";
+
+ private final String spanAttributePrefix;
+ private final String resourceAttributePrefix;
+ private final String spanNameDimension;
+ private final String spanIdDimension;
+ private final String parentSpanIdDimension;
+ private final String traceIdDimension;
+ private final String endTimeDimension;
+ private final String statusCodeDimension;
+ private final String statusMessageDimension;
+ private final String kindDimension;
+
+ @JsonProperty
+ public String getSpanNameDimension()
+ {
+ return spanNameDimension;
+ }
+
+ @JsonProperty
+ public String getResourceAttributePrefix()
+ {
+ return resourceAttributePrefix;
+ }
+
+ @JsonProperty
+ public String getSpanIdDimension()
+ {
+ return spanIdDimension;
+ }
+
+ @JsonProperty
+ public String getParentSpanIdDimension()
+ {
+ return parentSpanIdDimension;
+ }
+
+ @JsonProperty
+ public String getTraceIdDimension()
+ {
+ return traceIdDimension;
+ }
+
+ @JsonProperty
+ public String getEndTimeDimension()
+ {
+ return endTimeDimension;
+ }
+
+ @JsonProperty
+ public String getStatusCodeDimension()
+ {
+ return statusCodeDimension;
+ }
+
+ @JsonProperty
+ public String getStatusMessageDimension()
+ {
+ return statusMessageDimension;
+ }
+
+ @JsonProperty
+ public String getKindDimension()
+ {
+ return kindDimension;
+ }
+
+ @JsonProperty
+ public String getSpanAttributePrefix()
+ {
+ return spanAttributePrefix;
+ }
+
+ private String validateDimensionName(String input, String dimensionName)
+ {
+ Preconditions.checkArgument(!input.isEmpty(),
+ dimensionName + " dimension cannot be empty");
+ return input;
+ }
+
+ public OpenTelemetryTracesProtobufInputFormat(
+ @JsonProperty("spanAttributePrefix") String spanAttributePrefix,
+ @JsonProperty("resourceAttributePrefix") String resourceAttributePrefix,
+ @JsonProperty("spanNameDimension") String spanNameDimension,
+ @JsonProperty("spanIdDimension") String spanIdDimension,
+ @JsonProperty("parentSpanIdDimension") String parentSpanIdDimension,
+ @JsonProperty("traceIdDimension") String traceIdDimension,
+ @JsonProperty("endTimeDimension") String endTimeDimension,
+ @JsonProperty("statusCodeDimension") String statusCodeDimension,
+ @JsonProperty("statusMessageDimension") String statusMessageDimension,
+ @JsonProperty("kindDimension") String kindDimension
+ )
+ {
+
+ this.spanAttributePrefix = StringUtils.nullToEmptyNonDruidDataString(spanAttributePrefix);
+ this.resourceAttributePrefix = resourceAttributePrefix == null ? DEFAULT_RESOURCE_ATTR_PREFIX : resourceAttributePrefix;
+
+ this.spanNameDimension = spanNameDimension == null ? DEFAULT_SPAN_NAME_DIMENSION :
+ validateDimensionName(spanNameDimension, "spanNameDimension");
+ this.spanIdDimension = spanIdDimension == null ? DEFAULT_SPAN_ID_DIMENSION :
+ validateDimensionName(spanIdDimension, "spanIdDimension");
+ this.parentSpanIdDimension = parentSpanIdDimension == null ? DEFAULT_PARENT_SPAN_ID_DIMENSION :
+ validateDimensionName(parentSpanIdDimension, "parentSpanIdDimension");
+ this.traceIdDimension = traceIdDimension == null ? DEFAULT_TRACE_ID_DIMENSION :
+ validateDimensionName(traceIdDimension, "traceIdDimension");
+ this.endTimeDimension = endTimeDimension == null ? DEFAULT_END_TIME_DIMENSION :
+ validateDimensionName(endTimeDimension, "endTimeDimension");
+ this.statusCodeDimension = statusCodeDimension == null ? DEFAULT_STATUS_CODE_DIMENSION :
+ validateDimensionName(statusCodeDimension, "statusCodeDimension");
+ this.statusMessageDimension = statusMessageDimension == null ? DEFAULT_STATUS_MESSAGE_DIMENSION :
+ validateDimensionName(statusMessageDimension, "statusMessageDimension");
+ this.kindDimension = kindDimension == null ? DEFAULT_KIND_DIMENSION :
+ validateDimensionName(kindDimension, "kindDimension");
+
+ }
+
+ @Override
+ public boolean isSplittable()
+ {
+ return false;
+ }
+
+ @Override
+ public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
+ {
+ return new OpenTelemetryTracesProtobufReader(
+ inputRowSchema.getDimensionsSpec(),
+ Utils.getSettableEntity(source),
+ spanAttributePrefix,
+ resourceAttributePrefix,
+ spanNameDimension,
+ spanIdDimension,
+ parentSpanIdDimension,
+ traceIdDimension,
+ endTimeDimension,
+ statusCodeDimension,
+ statusMessageDimension,
+ kindDimension
+ );
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ OpenTelemetryTracesProtobufInputFormat that = (OpenTelemetryTracesProtobufInputFormat) o;
+ return Objects.equals(spanAttributePrefix, that.spanAttributePrefix)
+ && Objects.equals(resourceAttributePrefix, that.resourceAttributePrefix)
+ && Objects.equals(spanNameDimension, that.spanNameDimension)
+ && Objects.equals(spanIdDimension, that.spanIdDimension)
+ && Objects.equals(parentSpanIdDimension, that.parentSpanIdDimension)
+ && Objects.equals(traceIdDimension, that.traceIdDimension)
+ && Objects.equals(endTimeDimension, that.endTimeDimension)
+ && Objects.equals(statusCodeDimension, that.statusCodeDimension)
+ && Objects.equals(statusMessageDimension, that.statusMessageDimension)
+ && Objects.equals(kindDimension, that.kindDimension);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(
+ spanAttributePrefix,
+ resourceAttributePrefix,
+ spanNameDimension,
+ spanIdDimension,
+ parentSpanIdDimension,
+ traceIdDimension,
+ endTimeDimension,
+ statusCodeDimension,
+ statusMessageDimension,
+ kindDimension
+ );
+ }
+}
diff --git a/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/traces/OpenTelemetryTracesProtobufReader.java b/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/traces/OpenTelemetryTracesProtobufReader.java
new file mode 100644
index 000000000000..339508de6f4a
--- /dev/null
+++ b/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/traces/OpenTelemetryTracesProtobufReader.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.opentelemetry.protobuf.traces;
+
+import com.google.common.collect.Maps;
+import com.google.protobuf.InvalidProtocolBufferException;
+import io.opentelemetry.proto.trace.v1.Span;
+import io.opentelemetry.proto.trace.v1.TracesData;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.opentelemetry.protobuf.AbstractProtobufReader;
+import org.apache.druid.data.input.opentelemetry.protobuf.Utils;
+import org.apache.druid.indexing.seekablestream.SettableByteEntity;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class OpenTelemetryTracesProtobufReader extends AbstractProtobufReader
+{
+ private final String spanAttributePrefix;
+ private final String resourceAttributePrefix;
+
+ // Number of '*Dimension' variables
+ private static final int DEFAULT_COLUMN_COUNT = 8;
+
+ private final String spanNameDimension;
+ private final String spanIdDimension;
+ private final String parentSpanIdDimension;
+ private final String traceIdDimension;
+
+ private final String endTimeDimension;
+ private final String statusCodeDimension;
+ private final String statusMessageDimension;
+ private final String kindDimension;
+
+ public OpenTelemetryTracesProtobufReader(
+ DimensionsSpec dimensionsSpec,
+ SettableByteEntity extends ByteEntity> source,
+ String spanAttributePrefix,
+ String resourceAttributePrefix,
+ String spanNameDimension,
+ String spanIdDimension,
+ String parentSpanIdDimension,
+ String traceIdDimension,
+ String endTimeDimension,
+ String statusCodeDimension,
+ String statusMessageDimension,
+ String kindDimension
+ )
+ {
+ super(dimensionsSpec, source);
+ this.resourceAttributePrefix = resourceAttributePrefix;
+ this.spanAttributePrefix = spanAttributePrefix;
+ this.spanNameDimension = spanNameDimension;
+ this.spanIdDimension = spanIdDimension;
+ this.parentSpanIdDimension = parentSpanIdDimension;
+ this.traceIdDimension = traceIdDimension;
+ this.endTimeDimension = endTimeDimension;
+ this.statusCodeDimension = statusCodeDimension;
+ this.statusMessageDimension = statusMessageDimension;
+ this.kindDimension = kindDimension;
+ }
+
+ @Override
+ public List parseData(ByteBuffer byteBuffer)
+ throws InvalidProtocolBufferException
+ {
+ return parseTracesData(TracesData.parseFrom(byteBuffer));
+ }
+
+ private List parseTracesData(final TracesData tracesData)
+ {
+ return tracesData.getResourceSpansList()
+ .stream()
+ .flatMap(resourceSpans -> {
+ Map resourceAttributes = Utils.getResourceAttributes(resourceSpans.getResource(),
+ resourceAttributePrefix);
+ return resourceSpans.getScopeSpansList()
+ .stream()
+ .flatMap(scopeSpans -> scopeSpans.getSpansList()
+ .stream()
+ .map(span -> parseSpan(span, resourceAttributes)));
+ })
+ .collect(Collectors.toList());
+ }
+
+ private InputRow parseSpan(Span span, Map resourceAttributes)
+ {
+ int capacity = resourceAttributes.size() + span.getAttributesCount() +
+ DEFAULT_COLUMN_COUNT;
+ Map event = Maps.newHashMapWithExpectedSize(capacity);
+ event.put(spanNameDimension, span.getName());
+ event.put(spanIdDimension, Hex.encodeHexString(span.getSpanId().asReadOnlyByteBuffer()));
+ event.put(parentSpanIdDimension, Hex.encodeHexString(span.getParentSpanId().asReadOnlyByteBuffer()));
+ event.put(traceIdDimension, Hex.encodeHexString(span.getTraceId().asReadOnlyByteBuffer()));
+ event.put(endTimeDimension, TimeUnit.NANOSECONDS.toMillis(span.getEndTimeUnixNano()));
+ event.put(statusCodeDimension, span.getStatus().getCodeValue());
+ event.put(statusMessageDimension, span.getStatus().getMessage());
+
+ String spanKind = span.getKind().toString();
+ // remove the "SPAN_KIND_" prefix
+ event.put(kindDimension, spanKind.substring(spanKind.lastIndexOf('_') + 1));
+ event.putAll(resourceAttributes);
+ span.getAttributesList().forEach(att -> {
+ Object value = Utils.parseAnyValue(att.getValue());
+ if (value != null) {
+ event.put(spanAttributePrefix + att.getKey(), value);
+ }
+ });
+
+ return createRow(TimeUnit.NANOSECONDS.toMillis(span.getStartTimeUnixNano()), event);
+ }
+}
diff --git a/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/TestUtils.java b/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/TestUtils.java
new file mode 100644
index 000000000000..16fd8b577f1f
--- /dev/null
+++ b/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/TestUtils.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.opentelemetry.protobuf;
+
+import org.apache.druid.data.input.InputRow;
+import org.junit.Assert;
+
+import java.util.List;
+
+public class TestUtils
+{
+
+ public static void assertDimensionEquals(InputRow row, String dimension, Object expected)
+ {
+ List values = row.getDimension(dimension);
+ Assert.assertEquals(1, values.size());
+ Assert.assertEquals(expected, values.get(0));
+ }
+}
diff --git a/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/UtilsTest.java b/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/UtilsTest.java
new file mode 100644
index 000000000000..9a24aac295a2
--- /dev/null
+++ b/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/UtilsTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.opentelemetry.protobuf;
+
+import io.opentelemetry.proto.common.v1.AnyValue;
+import io.opentelemetry.proto.common.v1.ArrayValue;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.indexing.seekablestream.SettableByteEntity;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+
+import static org.apache.druid.data.input.opentelemetry.protobuf.Utils.getSettableEntity;
+import static org.apache.druid.data.input.opentelemetry.protobuf.Utils.parseAnyValue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class UtilsTest
+{
+ @Test
+ public void testGetSettableByteEntity()
+ {
+ byte[] bytes = "bytes".getBytes(StandardCharsets.UTF_8);
+ InputEntity ie = new ByteEntity(bytes);
+ assertEquals(ie, getSettableEntity(ie).getEntity());
+
+ SettableByteEntity se = new SettableByteEntity<>();
+ se.setEntity(new ByteEntity(bytes));
+ assertEquals(se, getSettableEntity(se));
+ }
+
+ @Test
+ public void testParseAnyValue()
+ {
+ AnyValue.Builder anyValBuilder = AnyValue.newBuilder();
+ assertEquals(100L, parseAnyValue(anyValBuilder.setIntValue(100L).build()));
+ assertEquals(false, parseAnyValue(anyValBuilder.setBoolValue(false).build()));
+ assertEquals("String", parseAnyValue(anyValBuilder.setStringValue("String").build()));
+ assertEquals(100.0, parseAnyValue(anyValBuilder.setDoubleValue(100.0).build()));
+ assertNull(parseAnyValue(anyValBuilder.setArrayValue(ArrayValue.getDefaultInstance()).build()));
+ }
+}
diff --git a/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryBenchmark.java b/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/metrics/OpenTelemetryMetricsBenchmark.java
similarity index 97%
rename from extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryBenchmark.java
rename to extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/metrics/OpenTelemetryMetricsBenchmark.java
index 0238aeccafa5..d766b8663e5a 100644
--- a/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryBenchmark.java
+++ b/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/metrics/OpenTelemetryMetricsBenchmark.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.druid.data.input.opentelemetry.protobuf;
+package org.apache.druid.data.input.opentelemetry.protobuf.metrics;
import com.google.common.collect.ImmutableList;
import io.opentelemetry.proto.common.v1.AnyValue;
@@ -49,7 +49,7 @@
@Fork(1)
@State(Scope.Benchmark)
-public class OpenTelemetryBenchmark
+public class OpenTelemetryMetricsBenchmark
{
private static ByteBuffer BUFFER;
diff --git a/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsInputFormatTest.java b/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/metrics/OpenTelemetryMetricsInputFormatTest.java
similarity index 93%
rename from extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsInputFormatTest.java
rename to extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/metrics/OpenTelemetryMetricsInputFormatTest.java
index 536247ab5716..d621686e5b34 100644
--- a/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsInputFormatTest.java
+++ b/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/metrics/OpenTelemetryMetricsInputFormatTest.java
@@ -17,10 +17,11 @@
* under the License.
*/
-package org.apache.druid.data.input.opentelemetry.protobuf;
+package org.apache.druid.data.input.opentelemetry.protobuf.metrics;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.opentelemetry.protobuf.OpenTelemetryProtobufExtensionsModule;
import org.junit.Assert;
import org.junit.Test;
diff --git a/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReaderTest.java b/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/metrics/OpenTelemetryMetricsProtobufReaderTest.java
similarity index 97%
rename from extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReaderTest.java
rename to extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/metrics/OpenTelemetryMetricsProtobufReaderTest.java
index 70c60bd00dd2..9a33ee76d943 100644
--- a/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReaderTest.java
+++ b/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/metrics/OpenTelemetryMetricsProtobufReaderTest.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.druid.data.input.opentelemetry.protobuf;
+package org.apache.druid.data.input.opentelemetry.protobuf.metrics;
import com.google.common.collect.ImmutableList;
import io.opentelemetry.proto.common.v1.AnyValue;
@@ -44,6 +44,8 @@
import java.util.List;
import java.util.concurrent.TimeUnit;
+import static org.apache.druid.data.input.opentelemetry.protobuf.TestUtils.assertDimensionEquals;
+
public class OpenTelemetryMetricsProtobufReaderTest
{
private static final long TIMESTAMP = TimeUnit.MILLISECONDS.toNanos(Instant.parse("2019-07-12T09:30:01.123Z").toEpochMilli());
@@ -356,7 +358,7 @@ public void testUnsupportedValueTypes()
}
@Test
- public void testInvalidProtobuf()
+ public void testInvalidProtobuf() throws IOException
{
byte[] invalidProtobuf = new byte[] {0x00, 0x01};
SettableByteEntity settableByteEntity = new SettableByteEntity<>();
@@ -372,9 +374,6 @@ public void testInvalidProtobuf()
Assert.assertThrows(ParseException.class, () -> rows.hasNext());
Assert.assertThrows(ParseException.class, () -> rows.next());
}
- catch (IOException e) {
- // Comes from the implicit call to close. Ignore
- }
}
@Test
@@ -403,12 +402,4 @@ public void testInvalidMetricType()
rows.forEachRemaining(rowList::add);
Assert.assertEquals(0, rowList.size());
}
-
- private void assertDimensionEquals(InputRow row, String dimension, Object expected)
- {
- List values = row.getDimension(dimension);
- Assert.assertEquals(1, values.size());
- Assert.assertEquals(expected, values.get(0));
- }
-
}
diff --git a/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/traces/OpenTelemetryTracesBenchmark.java b/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/traces/OpenTelemetryTracesBenchmark.java
new file mode 100644
index 000000000000..78d5c776edef
--- /dev/null
+++ b/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/traces/OpenTelemetryTracesBenchmark.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.opentelemetry.protobuf.traces;
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import io.opentelemetry.proto.common.v1.AnyValue;
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.resource.v1.Resource;
+import io.opentelemetry.proto.trace.v1.ResourceSpans;
+import io.opentelemetry.proto.trace.v1.ScopeSpans;
+import io.opentelemetry.proto.trace.v1.Span;
+import io.opentelemetry.proto.trace.v1.Status;
+import io.opentelemetry.proto.trace.v1.TracesData;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.infra.Blackhole;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.util.concurrent.TimeUnit;
+
+@Fork(1)
+@State(Scope.Benchmark)
+public class OpenTelemetryTracesBenchmark
+{
+ private static ByteBuffer BUFFER;
+
+ @Param(value = {"1", "2", "4", "8" })
+ private int resourceSpanCount = 1;
+
+ @Param(value = {"1"})
+ private int instrumentationScopeCount = 1;
+
+ @Param(value = {"1", "2", "4", "8" })
+ private int spansCount = 1;
+
+ private static final long START = TimeUnit.MILLISECONDS.toNanos(Instant.parse("2019-07-12T09:30:01.123Z")
+ .toEpochMilli());
+
+ private static final long END = START + 1_000_000L;
+
+ private static final OpenTelemetryTracesProtobufInputFormat INPUT_FORMAT =
+ new OpenTelemetryTracesProtobufInputFormat(
+ "test.span.",
+ "test.resource.",
+ "test.name",
+ "test.span_id",
+ "test.parent.span",
+ "test.trace.id",
+ "test.end.time",
+ "test.status.code",
+ "test.status.message",
+ "test.kind"
+ );
+
+ private static final InputRowSchema ROW_SCHEMA = new InputRowSchema(null,
+ new DimensionsSpec(ImmutableList.of(
+ new StringDimensionSchema("name"),
+ new StringDimensionSchema("span.id"),
+ new StringDimensionSchema("foo_key"))),
+ null);
+
+ private ByteBuffer createTracesBuffer()
+ {
+ TracesData.Builder tracesData = TracesData.newBuilder();
+ for (int i = 0; i < resourceSpanCount; i++) {
+ ResourceSpans.Builder resourceSpansBuilder = tracesData.addResourceSpansBuilder();
+ Resource.Builder resourceBuilder = resourceSpansBuilder.getResourceBuilder();
+
+ for (int resourceAttributeI = 0; resourceAttributeI < 5; resourceAttributeI++) {
+ KeyValue.Builder resourceAttributeBuilder = resourceBuilder.addAttributesBuilder();
+ resourceAttributeBuilder.setKey("resource.label_key_" + resourceAttributeI);
+ resourceAttributeBuilder.setValue(AnyValue.newBuilder().setStringValue("resource.label_value"));
+ }
+
+ for (int j = 0; j < instrumentationScopeCount; j++) {
+ ScopeSpans.Builder scopeSpansBuilder = resourceSpansBuilder.addScopeSpansBuilder();
+
+ for (int k = 0; k < spansCount; k++) {
+ Span.Builder spanBuilder = scopeSpansBuilder.addSpansBuilder();
+ spanBuilder.setStartTimeUnixNano(START)
+ .setEndTimeUnixNano(END)
+ .setStatus(Status.newBuilder().setCodeValue(100).setMessage("Dummy").build())
+ .setName("spanName")
+ .setSpanId(ByteString.copyFrom("Span-Id", StandardCharsets.UTF_8))
+ .setParentSpanId(ByteString.copyFrom("Parent-Span-Id", StandardCharsets.UTF_8))
+ .setTraceId(ByteString.copyFrom("Trace-Id", StandardCharsets.UTF_8))
+ .setKind(Span.SpanKind.SPAN_KIND_CONSUMER);
+
+ for (int spanAttributeI = 0; spanAttributeI < 10; spanAttributeI++) {
+ KeyValue.Builder attributeBuilder = spanBuilder.addAttributesBuilder();
+ attributeBuilder.setKey("foo_key_" + spanAttributeI);
+ attributeBuilder.setValue(AnyValue.newBuilder().setStringValue("foo-value"));
+ }
+ }
+ }
+ }
+ return ByteBuffer.wrap(tracesData.build().toByteArray());
+ }
+
+ @Setup
+ public void init()
+ {
+ BUFFER = createTracesBuffer();
+ }
+
+ @Benchmark
+ public void measureSerde(Blackhole blackhole) throws IOException
+ {
+ for (CloseableIterator it = INPUT_FORMAT.createReader(ROW_SCHEMA, new ByteEntity(BUFFER), null).read(); it.hasNext(); ) {
+ InputRow row = it.next();
+ blackhole.consume(row);
+ }
+ }
+}
diff --git a/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/traces/OpenTelemetryTracesProtobufInputFormatTest.java b/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/traces/OpenTelemetryTracesProtobufInputFormatTest.java
new file mode 100644
index 000000000000..8f44388a090b
--- /dev/null
+++ b/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/traces/OpenTelemetryTracesProtobufInputFormatTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.opentelemetry.protobuf.traces;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.opentelemetry.protobuf.OpenTelemetryProtobufExtensionsModule;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class OpenTelemetryTracesProtobufInputFormatTest
+{
+ final ObjectMapper jsonMapper = new ObjectMapper();
+
+ @Test
+ public void testSerde() throws Exception
+ {
+ String resourceAttrPrefix = "test.resource.";
+ String spanAttrPrefix = "test.span.";
+ String kind = "test.kind";
+ String spanName = "test.name";
+ String parentSpanId = "test.parent.span";
+ String spanId = "test.span_id";
+ String traceId = "test.trace.id";
+ String statusCode = "test.status.code";
+ String statusMessage = "test.status.message";
+ String endTime = "test.end.time";
+ OpenTelemetryTracesProtobufInputFormat inputFormat = new OpenTelemetryTracesProtobufInputFormat(
+ "test.span.",
+ "test.resource.",
+ "test.name",
+ "test.span_id",
+ "test.parent.span",
+ "test.trace.id",
+ "test.end.time",
+ "test.status.code",
+ "test.status.message",
+ "test.kind");
+ jsonMapper.registerModules(new OpenTelemetryProtobufExtensionsModule().getJacksonModules());
+
+ final OpenTelemetryTracesProtobufInputFormat actual = (OpenTelemetryTracesProtobufInputFormat) jsonMapper.readValue(
+ jsonMapper.writeValueAsString(inputFormat),
+ InputFormat.class
+ );
+
+ Assert.assertEquals(inputFormat, actual);
+ }
+
+ @Test
+ public void testDefaults() throws Exception
+ {
+ OpenTelemetryTracesProtobufInputFormat obj = new OpenTelemetryTracesProtobufInputFormat(
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null);
+ assertEquals(OpenTelemetryTracesProtobufInputFormat.DEFAULT_RESOURCE_ATTR_PREFIX, obj.getResourceAttributePrefix());
+ assertEquals("", obj.getSpanAttributePrefix());
+ assertEquals(OpenTelemetryTracesProtobufInputFormat.DEFAULT_KIND_DIMENSION, obj.getKindDimension());
+ assertEquals(OpenTelemetryTracesProtobufInputFormat.DEFAULT_SPAN_NAME_DIMENSION, obj.getSpanNameDimension());
+ assertEquals(OpenTelemetryTracesProtobufInputFormat.DEFAULT_PARENT_SPAN_ID_DIMENSION, obj.getParentSpanIdDimension());
+ assertEquals(OpenTelemetryTracesProtobufInputFormat.DEFAULT_SPAN_ID_DIMENSION, obj.getSpanIdDimension());
+ assertEquals(OpenTelemetryTracesProtobufInputFormat.DEFAULT_TRACE_ID_DIMENSION, obj.getTraceIdDimension());
+ assertEquals(OpenTelemetryTracesProtobufInputFormat.DEFAULT_STATUS_MESSAGE_DIMENSION, obj.getStatusMessageDimension());
+ assertEquals(OpenTelemetryTracesProtobufInputFormat.DEFAULT_STATUS_CODE_DIMENSION, obj.getStatusCodeDimension());
+ assertEquals(OpenTelemetryTracesProtobufInputFormat.DEFAULT_END_TIME_DIMENSION, obj.getEndTimeDimension());
+ }
+
+ @Test
+ public void testEquals()
+ {
+ EqualsVerifier.forClass(OpenTelemetryTracesProtobufInputFormat.class).usingGetClass().verify();
+ }
+}
diff --git a/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/traces/OpenTelemetryTracesProtobufReaderTest.java b/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/traces/OpenTelemetryTracesProtobufReaderTest.java
new file mode 100644
index 000000000000..ae06e7f1d9a4
--- /dev/null
+++ b/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/traces/OpenTelemetryTracesProtobufReaderTest.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.opentelemetry.protobuf.traces;
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import io.opentelemetry.proto.common.v1.AnyValue;
+import io.opentelemetry.proto.common.v1.ArrayValue;
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.trace.v1.Span;
+import io.opentelemetry.proto.trace.v1.Status;
+import io.opentelemetry.proto.trace.v1.TracesData;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.indexing.seekablestream.SettableByteEntity;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.common.parsers.ParseException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.druid.data.input.opentelemetry.protobuf.TestUtils.assertDimensionEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class OpenTelemetryTracesProtobufReaderTest
+{
+ private static final String INSTRUMENTATION_SCOPE_NAME = "mock-instr-lib";
+ private static final String INSTRUMENTATION_SCOPE_VERSION = "1.0";
+
+ private static final String ATTRIBUTE_NAMESPACE = "namespace";
+ private static final String ATTRIBUTE_VALUE_NAMESPACE = "namespace_val";
+ private static final String ATTRIBUTE_SERVICE = "service";
+ private static final String ATTRIBUTE_VALUE_TEST = "test-service";
+
+ private static final String NAME_VALUE = "span-name";
+ private static final byte[] SPAN_ID_VALUE = "abcd".getBytes(StandardCharsets.UTF_8);
+ private static final String SPAN_ID_VALUE_HEX = Hex.encodeHexString(SPAN_ID_VALUE);
+ private static final byte[] PARENT_ID_VALUE = "1234".getBytes(StandardCharsets.UTF_8);
+ private static final String PARENT_ID_VALUE_HEX = Hex.encodeHexString(PARENT_ID_VALUE);
+ private static final byte[] TRACE_ID_VALUE = "zyxwvutsrqponmlk".getBytes(StandardCharsets.UTF_8);
+ private static final String TRACE_ID_VALUE_HEX = Hex.encodeHexString(TRACE_ID_VALUE);
+ private static final Span.SpanKind SPAN_KIND_VALUE = Span.SpanKind.SPAN_KIND_SERVER;
+
+ private final long now = System.nanoTime();
+ private final long before = now - 1000;
+ private final TracesData.Builder dataBuilder = TracesData.newBuilder();
+
+ private final Span.Builder spanBuilder = dataBuilder
+ .addResourceSpansBuilder()
+ .addScopeSpansBuilder()
+ .addSpansBuilder();
+
+ private final String spanAttributePrefix = "span.";
+ private final String resourceAttributePrefix = "resource.";
+ private final String spanNameDimension = "name";
+ private final String spanIdDimension = "span_id";
+ private final String parentSpanIdDimension = "parent_span_id";
+ private final String traceIdDimension = "trace_id";
+ private final String endTimeDimension = "end_time";
+ private final String statusCodeDimension = "status_code";
+ private final String statusMessageDimension = "status_message";
+ private final String kindDimension = "kind";
+
+ private final DimensionsSpec dimensionsSpec = new DimensionsSpec(ImmutableList.of(
+ new StringDimensionSchema("resource." + ATTRIBUTE_NAMESPACE),
+ new StringDimensionSchema("span." + ATTRIBUTE_SERVICE)
+ ));
+
+ @Before
+ public void setUp()
+ {
+ dataBuilder
+ .getResourceSpansBuilder(0)
+ .getResourceBuilder()
+ .addAttributes(
+ KeyValue.newBuilder()
+ .setKey(ATTRIBUTE_NAMESPACE)
+ .setValue(AnyValue.newBuilder().setStringValue(ATTRIBUTE_VALUE_NAMESPACE))
+ .build());
+
+ dataBuilder
+ .getResourceSpansBuilder(0)
+ .getScopeSpansBuilder(0)
+ .getScopeBuilder()
+ .setName(INSTRUMENTATION_SCOPE_NAME)
+ .setVersion(INSTRUMENTATION_SCOPE_VERSION);
+
+ spanBuilder
+ .setStartTimeUnixNano(before)
+ .setEndTimeUnixNano(now)
+ .setName(NAME_VALUE)
+ .setStatus(Status.newBuilder().setCodeValue(1).setMessage("OK").build())
+ .setTraceId(ByteString.copyFrom(TRACE_ID_VALUE))
+ .setSpanId(ByteString.copyFrom(SPAN_ID_VALUE))
+ .setParentSpanId(ByteString.copyFrom(PARENT_ID_VALUE))
+ .setKind(SPAN_KIND_VALUE)
+ .addAttributes(
+ KeyValue.newBuilder()
+ .setKey(ATTRIBUTE_SERVICE)
+ .setValue(AnyValue.newBuilder().setStringValue(ATTRIBUTE_VALUE_TEST))
+ .build());
+ }
+
+ private CloseableIterator getDataIterator(DimensionsSpec spec)
+ {
+ TracesData tracesData = dataBuilder.build();
+ SettableByteEntity settableByteEntity = new SettableByteEntity<>();
+ settableByteEntity.setEntity(new ByteEntity(tracesData.toByteArray()));
+ return new OpenTelemetryTracesProtobufReader(
+ spec,
+ settableByteEntity,
+ spanAttributePrefix,
+ resourceAttributePrefix,
+ spanNameDimension,
+ spanIdDimension,
+ parentSpanIdDimension,
+ traceIdDimension,
+ endTimeDimension,
+ statusCodeDimension,
+ statusMessageDimension,
+ kindDimension
+ ).read();
+ }
+
+ private void verifyDefaultFirstRowData(InputRow row)
+ {
+ assertDimensionEquals(row, "resource.namespace", ATTRIBUTE_VALUE_NAMESPACE);
+ assertDimensionEquals(row, spanNameDimension, NAME_VALUE);
+ assertDimensionEquals(row, spanIdDimension, SPAN_ID_VALUE_HEX);
+ assertDimensionEquals(row, parentSpanIdDimension, PARENT_ID_VALUE_HEX);
+ assertDimensionEquals(row, traceIdDimension, TRACE_ID_VALUE_HEX);
+ assertDimensionEquals(row, endTimeDimension, Long.toString(TimeUnit.NANOSECONDS.toMillis(now)));
+ assertDimensionEquals(row, statusCodeDimension, Integer.toString(1));
+ assertDimensionEquals(row, statusMessageDimension, "OK");
+ assertDimensionEquals(row, kindDimension, "SERVER");
+ }
+
+ @Test
+ public void testTrace()
+ {
+ CloseableIterator rows = getDataIterator(dimensionsSpec);
+ List rowList = new ArrayList<>();
+ rows.forEachRemaining(rowList::add);
+ assertEquals(1, rowList.size());
+
+ InputRow row = rowList.get(0);
+ assertEquals(2, row.getDimensions().size());
+ verifyDefaultFirstRowData(row);
+ assertDimensionEquals(row, "span.service", ATTRIBUTE_VALUE_TEST);
+ }
+
+ @Test
+ public void testBatchedTraceParse()
+ {
+ Span.Builder secondSpanBuilder = dataBuilder
+ .addResourceSpansBuilder()
+ .addScopeSpansBuilder()
+ .addSpansBuilder();
+
+ dataBuilder
+ .getResourceSpansBuilder(1)
+ .getResourceBuilder()
+ .addAttributes(
+ KeyValue.newBuilder()
+ .setKey(ATTRIBUTE_NAMESPACE)
+ .setValue(AnyValue.newBuilder().setStringValue(ATTRIBUTE_VALUE_NAMESPACE))
+ .build());
+ dataBuilder
+ .getResourceSpansBuilder(1)
+ .getScopeSpansBuilder(0)
+ .getScopeBuilder()
+ .setName(INSTRUMENTATION_SCOPE_NAME)
+ .setVersion(INSTRUMENTATION_SCOPE_VERSION);
+
+ String spanName2 = "span-2";
+ byte[] traceId2 = "trace-2".getBytes(StandardCharsets.UTF_8);
+ byte[] spanId2 = "span-2".getBytes(StandardCharsets.UTF_8);
+ byte[] parentId2 = "parent-2".getBytes(StandardCharsets.UTF_8);
+ Span.SpanKind spanKind2 = Span.SpanKind.SPAN_KIND_CLIENT;
+ String spanAttributeKey2 = "someIntAttribute";
+ int spanAttributeVal2 = 23;
+ String statusMessage2 = "NOT_OK";
+ int statusCode2 = 400;
+
+ secondSpanBuilder
+ .setStartTimeUnixNano(before)
+ .setEndTimeUnixNano(now)
+ .setName(spanName2)
+ .setStatus(Status.newBuilder().setCodeValue(statusCode2).setMessage(statusMessage2).build())
+ .setTraceId(ByteString.copyFrom(traceId2))
+ .setSpanId(ByteString.copyFrom(spanId2))
+ .setParentSpanId(ByteString.copyFrom(parentId2))
+ .setKind(spanKind2)
+ .addAttributes(
+ KeyValue.newBuilder()
+ .setKey(spanAttributeKey2)
+ .setValue(AnyValue.newBuilder().setIntValue(spanAttributeVal2))
+ .build());
+ CloseableIterator rows = getDataIterator(dimensionsSpec);
+ List rowList = new ArrayList<>();
+ rows.forEachRemaining(rowList::add);
+ assertEquals(2, rowList.size());
+
+ InputRow row = rowList.get(0);
+ assertEquals(2, row.getDimensions().size());
+ assertDimensionEquals(row, spanAttributePrefix + ATTRIBUTE_SERVICE, ATTRIBUTE_VALUE_TEST);
+ verifyDefaultFirstRowData(row);
+
+ row = rowList.get(1);
+ assertEquals(2, row.getDimensions().size());
+ assertDimensionEquals(row, resourceAttributePrefix + ATTRIBUTE_NAMESPACE,
+ ATTRIBUTE_VALUE_NAMESPACE);
+ assertDimensionEquals(row, spanAttributePrefix + spanAttributeKey2,
+ Integer.toString(spanAttributeVal2));
+ assertDimensionEquals(row, spanNameDimension, spanName2);
+ assertDimensionEquals(row, spanIdDimension, Hex.encodeHexString(spanId2));
+ assertDimensionEquals(row, parentSpanIdDimension, Hex.encodeHexString(parentId2));
+ assertDimensionEquals(row, traceIdDimension, Hex.encodeHexString(traceId2));
+ assertDimensionEquals(row, endTimeDimension, Long.toString(TimeUnit.NANOSECONDS.toMillis(now)));
+ assertDimensionEquals(row, statusCodeDimension, Integer.toString(statusCode2));
+ assertDimensionEquals(row, statusMessageDimension, statusMessage2);
+ assertDimensionEquals(row, kindDimension, "CLIENT");
+ }
+
+ @Test
+ public void testDimensionSpecExclusions()
+ {
+ String excludedAttribute = spanAttributePrefix + ATTRIBUTE_SERVICE;
+ DimensionsSpec dimensionsSpecWithExclusions = DimensionsSpec.builder().setDimensionExclusions(ImmutableList.of(
+ excludedAttribute
+ )).build();
+ CloseableIterator rows = getDataIterator(dimensionsSpecWithExclusions);
+ assertTrue(rows.hasNext());
+ InputRow row = rows.next();
+ assertFalse(row.getDimensions().contains(excludedAttribute));
+ assertEquals(9, row.getDimensions().size());
+ verifyDefaultFirstRowData(row);
+ }
+
+ @Test
+ public void testInvalidProtobuf() throws IOException
+ {
+ byte[] invalidProtobuf = new byte[] {0x04, 0x01};
+ SettableByteEntity settableByteEntity = new SettableByteEntity<>();
+ settableByteEntity.setEntity(new ByteEntity(invalidProtobuf));
+ try (CloseableIterator rows = new OpenTelemetryTracesProtobufReader(
+ dimensionsSpec,
+ settableByteEntity,
+ spanAttributePrefix,
+ resourceAttributePrefix,
+ spanNameDimension,
+ spanIdDimension,
+ parentSpanIdDimension,
+ traceIdDimension,
+ endTimeDimension,
+ statusCodeDimension,
+ statusMessageDimension,
+ kindDimension
+ ).read()) {
+ Assert.assertThrows(ParseException.class, () -> rows.hasNext());
+ Assert.assertThrows(ParseException.class, () -> rows.next());
+ }
+ }
+
+ @Test
+ public void testInvalidAttributeValueType()
+ {
+ String excludededAttributeKey = "array";
+ spanBuilder
+ .addAttributes(KeyValue.newBuilder()
+ .setKey(excludededAttributeKey)
+ .setValue(AnyValue.newBuilder().setArrayValue(ArrayValue.getDefaultInstance()))
+ .build());
+ CloseableIterator rows = getDataIterator(dimensionsSpec);
+ assertTrue(rows.hasNext());
+ InputRow row = rows.next();
+ assertDimensionEquals(row, spanAttributePrefix + ATTRIBUTE_SERVICE, ATTRIBUTE_VALUE_TEST);
+ assertFalse(row.getDimensions().contains(spanAttributePrefix + excludededAttributeKey));
+ verifyDefaultFirstRowData(row);
+ }
+}
diff --git a/pom.xml b/pom.xml
index 7d0d385b4549..af65b1482cbe 100644
--- a/pom.xml
+++ b/pom.xml
@@ -212,9 +212,9 @@
extensions-contrib/aliyun-oss-extensions
extensions-contrib/prometheus-emitter
extensions-contrib/opentelemetry-emitter
+ extensions-contrib/opentelemetry-extensions
extensions-contrib/opencensus-extensions
extensions-contrib/confluent-extensions
- extensions-contrib/opentelemetry-extensions
distribution