diff --git a/api/src/main/java/io/grpc/NameResolver.java b/api/src/main/java/io/grpc/NameResolver.java index e8295365ca8..21064d7f115 100644 --- a/api/src/main/java/io/grpc/NameResolver.java +++ b/api/src/main/java/io/grpc/NameResolver.java @@ -275,6 +275,11 @@ public Status onResult2(ResolutionResult resolutionResult) { @Documented public @interface ResolutionResultAttr {} + @ExperimentalApi("https://github.com/grpc/grpc-java/issues/11989") + @ResolutionResultAttr + public static final Attributes.Key ATTR_BACKEND_SERVICE = + Attributes.Key.create("io.grpc.NameResolver.ATTR_BACKEND_SERVICE"); + /** * Information that a {@link Factory} uses to create a {@link NameResolver}. * diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java index b6cf09d9db6..82ad41e7b20 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java @@ -17,6 +17,7 @@ package io.grpc.opentelemetry; import static com.google.common.base.Preconditions.checkNotNull; +import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BACKEND_SERVICE_KEY; import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.LOCALITY_KEY; import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.METHOD_KEY; import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.STATUS_KEY; @@ -70,7 +71,6 @@ */ final class OpenTelemetryMetricsModule { private static final Logger logger = Logger.getLogger(OpenTelemetryMetricsModule.class.getName()); - private static final String LOCALITY_LABEL_NAME = "grpc.lb.locality"; public static final ImmutableSet DEFAULT_PER_CALL_METRICS_SET = ImmutableSet.of( "grpc.client.attempt.started", @@ -90,6 +90,7 @@ final class OpenTelemetryMetricsModule { private final OpenTelemetryMetricsResource resource; private final Supplier stopwatchSupplier; private final boolean localityEnabled; + private final boolean backendServiceEnabled; private final ImmutableList plugins; OpenTelemetryMetricsModule(Supplier stopwatchSupplier, @@ -97,7 +98,8 @@ final class OpenTelemetryMetricsModule { List plugins) { this.resource = checkNotNull(resource, "resource"); this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier"); - this.localityEnabled = optionalLabels.contains(LOCALITY_LABEL_NAME); + this.localityEnabled = optionalLabels.contains(LOCALITY_KEY.getKey()); + this.backendServiceEnabled = optionalLabels.contains(BACKEND_SERVICE_KEY.getKey()); this.plugins = ImmutableList.copyOf(plugins); } @@ -162,6 +164,7 @@ private static final class ClientTracer extends ClientStreamTracer { volatile long outboundWireSize; volatile long inboundWireSize; volatile String locality; + volatile String backendService; long attemptNanos; Code statusCode; @@ -206,9 +209,12 @@ public void inboundWireSize(long bytes) { @Override public void addOptionalLabel(String key, String value) { - if (LOCALITY_LABEL_NAME.equals(key)) { + if ("grpc.lb.locality".equals(key)) { locality = value; } + if ("grpc.lb.backend_service".equals(key)) { + backendService = value; + } } @Override @@ -248,6 +254,13 @@ void recordFinishedAttempt() { } builder.put(LOCALITY_KEY, savedLocality); } + if (module.backendServiceEnabled) { + String savedBackendService = backendService; + if (savedBackendService == null) { + savedBackendService = ""; + } + builder.put(BACKEND_SERVICE_KEY, savedBackendService); + } for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) { plugin.addLabels(builder); } diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java index 081e376b8c5..5214804d369 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java @@ -33,6 +33,9 @@ public final class OpenTelemetryConstants { public static final AttributeKey LOCALITY_KEY = AttributeKey.stringKey("grpc.lb.locality"); + public static final AttributeKey BACKEND_SERVICE_KEY = + AttributeKey.stringKey("grpc.lb.backend_service"); + public static final List LATENCY_BUCKETS = ImmutableList.of( 0d, 0.00001d, 0.00005d, 0.0001d, 0.0003d, 0.0006d, 0.0008d, 0.001d, 0.002d, diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java index 6003599668b..77f0268ec2f 100644 --- a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java +++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java @@ -51,6 +51,7 @@ import io.grpc.opentelemetry.OpenTelemetryMetricsModule.CallAttemptsTracerFactory; import io.grpc.opentelemetry.internal.OpenTelemetryConstants; import io.grpc.testing.GrpcServerRule; +import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; @@ -1070,6 +1071,140 @@ public void clientLocalityMetrics_missing() { point -> point.hasAttributes(clientAttributes)))); } + @Test + public void clientBackendServiceMetrics_present() { + String target = "target:///"; + OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter, + enabledMetricsMap, disableDefaultMetrics); + OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( + fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.backend_service"), + emptyList()); + OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = + new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList()); + + ClientStreamTracer tracer = + callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata()); + tracer.addOptionalLabel("grpc.lb.foo", "unimportant"); + tracer.addOptionalLabel("grpc.lb.backend_service", "should-be-overwritten"); + tracer.addOptionalLabel("grpc.lb.backend_service", "the-moon"); + tracer.addOptionalLabel("grpc.lb.foo", "thats-no-moon"); + tracer.streamClosed(Status.OK); + callAttemptsTracerFactory.callEnded(Status.OK); + + io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of( + TARGET_KEY, target, + METHOD_KEY, method.getFullMethodName()); + + io.opentelemetry.api.common.Attributes clientAttributes + = io.opentelemetry.api.common.Attributes.of( + TARGET_KEY, target, + METHOD_KEY, method.getFullMethodName(), + STATUS_KEY, Status.Code.OK.toString()); + + io.opentelemetry.api.common.Attributes clientAttributesWithBackendService + = clientAttributes.toBuilder() + .put(AttributeKey.stringKey("grpc.lb.backend_service"), "the-moon") + .build(); + + assertThat(openTelemetryTesting.getMetrics()) + .satisfiesExactlyInAnyOrder( + metric -> + assertThat(metric) + .hasName(CLIENT_ATTEMPT_COUNT_INSTRUMENT_NAME) + .hasLongSumSatisfying( + longSum -> longSum.hasPointsSatisfying( + point -> point.hasAttributes(attributes))), + metric -> + assertThat(metric) + .hasName(CLIENT_ATTEMPT_DURATION_INSTRUMENT_NAME) + .hasHistogramSatisfying( + histogram -> histogram.hasPointsSatisfying( + point -> point.hasAttributes(clientAttributesWithBackendService))), + metric -> + assertThat(metric) + .hasName(CLIENT_ATTEMPT_SENT_TOTAL_COMPRESSED_MESSAGE_SIZE) + .hasHistogramSatisfying( + histogram -> histogram.hasPointsSatisfying( + point -> point.hasAttributes(clientAttributesWithBackendService))), + metric -> + assertThat(metric) + .hasName(CLIENT_ATTEMPT_RECV_TOTAL_COMPRESSED_MESSAGE_SIZE) + .hasHistogramSatisfying( + histogram -> histogram.hasPointsSatisfying( + point -> point.hasAttributes(clientAttributesWithBackendService))), + metric -> + assertThat(metric) + .hasName(CLIENT_CALL_DURATION) + .hasHistogramSatisfying( + histogram -> histogram.hasPointsSatisfying( + point -> point.hasAttributes(clientAttributes)))); + } + + @Test + public void clientBackendServiceMetrics_missing() { + String target = "target:///"; + OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter, + enabledMetricsMap, disableDefaultMetrics); + OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( + fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.backend_service"), + emptyList()); + OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = + new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList()); + + ClientStreamTracer tracer = + callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata()); + tracer.streamClosed(Status.OK); + callAttemptsTracerFactory.callEnded(Status.OK); + + io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of( + TARGET_KEY, target, + METHOD_KEY, method.getFullMethodName()); + + io.opentelemetry.api.common.Attributes clientAttributes + = io.opentelemetry.api.common.Attributes.of( + TARGET_KEY, target, + METHOD_KEY, method.getFullMethodName(), + STATUS_KEY, Status.Code.OK.toString()); + + io.opentelemetry.api.common.Attributes clientAttributesWithBackendService + = clientAttributes.toBuilder() + .put(AttributeKey.stringKey("grpc.lb.backend_service"), "") + .build(); + + assertThat(openTelemetryTesting.getMetrics()) + .satisfiesExactlyInAnyOrder( + metric -> + assertThat(metric) + .hasName(CLIENT_ATTEMPT_COUNT_INSTRUMENT_NAME) + .hasLongSumSatisfying( + longSum -> longSum.hasPointsSatisfying( + point -> point.hasAttributes(attributes))), + metric -> + assertThat(metric) + .hasName(CLIENT_ATTEMPT_DURATION_INSTRUMENT_NAME) + .hasHistogramSatisfying( + histogram -> histogram.hasPointsSatisfying( + point -> point.hasAttributes(clientAttributesWithBackendService))), + metric -> + assertThat(metric) + .hasName(CLIENT_ATTEMPT_SENT_TOTAL_COMPRESSED_MESSAGE_SIZE) + .hasHistogramSatisfying( + histogram -> histogram.hasPointsSatisfying( + point -> point.hasAttributes(clientAttributesWithBackendService))), + metric -> + assertThat(metric) + .hasName(CLIENT_ATTEMPT_RECV_TOTAL_COMPRESSED_MESSAGE_SIZE) + .hasHistogramSatisfying( + histogram -> histogram.hasPointsSatisfying( + point -> point.hasAttributes(clientAttributesWithBackendService))), + metric -> + assertThat(metric) + .hasName(CLIENT_CALL_DURATION) + .hasHistogramSatisfying( + histogram -> histogram.hasPointsSatisfying( + point -> point.hasAttributes(clientAttributes)))); + } + @Test public void serverBasicMetrics() { OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter, diff --git a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java index fd4f49fbb83..034cdee0815 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java @@ -32,6 +32,7 @@ import io.grpc.InternalLogId; import io.grpc.LoadBalancer; import io.grpc.Metadata; +import io.grpc.NameResolver; import io.grpc.Status; import io.grpc.internal.ForwardingClientStreamTracer; import io.grpc.internal.GrpcUtil; @@ -150,7 +151,9 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { childSwitchLb.handleResolvedAddresses( resolvedAddresses.toBuilder() - .setAttributes(attributes) + .setAttributes(attributes.toBuilder() + .set(NameResolver.ATTR_BACKEND_SERVICE, cluster) + .build()) .setLoadBalancingPolicyConfig(config.childConfig) .build()); return Status.OK; diff --git a/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java b/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java index b7b4fd51afe..c3d36f7cdc8 100644 --- a/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java @@ -102,32 +102,44 @@ final class WeightedRoundRobinLoadBalancer extends MultiChildLoadBalancer { private final long infTime; private final Ticker ticker; private String locality = ""; + private String backendService = ""; private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult()); // The metric instruments are only registered once and shared by all instances of this LB. static { MetricInstrumentRegistry metricInstrumentRegistry = MetricInstrumentRegistry.getDefaultRegistry(); - RR_FALLBACK_COUNTER = metricInstrumentRegistry.registerLongCounter("grpc.lb.wrr.rr_fallback", + RR_FALLBACK_COUNTER = metricInstrumentRegistry.registerLongCounter( + "grpc.lb.wrr.rr_fallback", "EXPERIMENTAL. Number of scheduler updates in which there were not enough endpoints " + "with valid weight, which caused the WRR policy to fall back to RR behavior", - "{update}", Lists.newArrayList("grpc.target"), Lists.newArrayList("grpc.lb.locality"), + "{update}", + Lists.newArrayList("grpc.target"), + Lists.newArrayList("grpc.lb.locality", "grpc.lb.backend_service"), false); ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER = metricInstrumentRegistry.registerLongCounter( - "grpc.lb.wrr.endpoint_weight_not_yet_usable", "EXPERIMENTAL. Number of endpoints " - + "from each scheduler update that don't yet have usable weight information", - "{endpoint}", Lists.newArrayList("grpc.target"), Lists.newArrayList("grpc.lb.locality"), + "grpc.lb.wrr.endpoint_weight_not_yet_usable", + "EXPERIMENTAL. Number of endpoints from each scheduler update that don't yet have usable " + + "weight information", + "{endpoint}", + Lists.newArrayList("grpc.target"), + Lists.newArrayList("grpc.lb.locality", "grpc.lb.backend_service"), false); ENDPOINT_WEIGHT_STALE_COUNTER = metricInstrumentRegistry.registerLongCounter( "grpc.lb.wrr.endpoint_weight_stale", "EXPERIMENTAL. Number of endpoints from each scheduler update whose latest weight is " - + "older than the expiration period", "{endpoint}", Lists.newArrayList("grpc.target"), - Lists.newArrayList("grpc.lb.locality"), false); + + "older than the expiration period", + "{endpoint}", + Lists.newArrayList("grpc.target"), + Lists.newArrayList("grpc.lb.locality", "grpc.lb.backend_service"), + false); ENDPOINT_WEIGHTS_HISTOGRAM = metricInstrumentRegistry.registerDoubleHistogram( "grpc.lb.wrr.endpoint_weights", "EXPERIMENTAL. The histogram buckets will be endpoint weight ranges.", - "{weight}", Lists.newArrayList(), Lists.newArrayList("grpc.target"), - Lists.newArrayList("grpc.lb.locality"), + "{weight}", + Lists.newArrayList(), + Lists.newArrayList("grpc.target"), + Lists.newArrayList("grpc.lb.locality", "grpc.lb.backend_service"), false); } @@ -168,6 +180,13 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { } else { this.locality = ""; } + String backendService + = resolvedAddresses.getAttributes().get(NameResolver.ATTR_BACKEND_SERVICE); + if (backendService != null) { + this.backendService = backendService; + } else { + this.backendService = ""; + } config = (WeightedRoundRobinLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); @@ -232,7 +251,7 @@ private void updateWeight(WeightedRoundRobinPicker picker) { helper.getMetricRecorder() .recordDoubleHistogram(ENDPOINT_WEIGHTS_HISTOGRAM, newWeight, ImmutableList.of(helper.getChannelTarget()), - ImmutableList.of(locality)); + ImmutableList.of(locality, backendService)); newWeights[i] = newWeight > 0 ? (float) newWeight : 0.0f; } @@ -240,18 +259,19 @@ private void updateWeight(WeightedRoundRobinPicker picker) { helper.getMetricRecorder() .addLongCounter(ENDPOINT_WEIGHT_STALE_COUNTER, staleEndpoints.get(), ImmutableList.of(helper.getChannelTarget()), - ImmutableList.of(locality)); + ImmutableList.of(locality, backendService)); } if (notYetUsableEndpoints.get() > 0) { helper.getMetricRecorder() .addLongCounter(ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER, notYetUsableEndpoints.get(), - ImmutableList.of(helper.getChannelTarget()), ImmutableList.of(locality)); + ImmutableList.of(helper.getChannelTarget()), + ImmutableList.of(locality, backendService)); } boolean weightsEffective = picker.updateWeight(newWeights); if (!weightsEffective) { helper.getMetricRecorder() .addLongCounter(RR_FALLBACK_COUNTER, 1, ImmutableList.of(helper.getChannelTarget()), - ImmutableList.of(locality)); + ImmutableList.of(locality, backendService)); } } diff --git a/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java index 09a1abb36e0..7df0630b779 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java @@ -50,6 +50,7 @@ import io.grpc.LoadBalancerRegistry; import io.grpc.ManagedChannel; import io.grpc.Metadata; +import io.grpc.NameResolver; import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.SynchronizationContext; @@ -198,6 +199,7 @@ public void handleResolvedAddresses_propagateToChildPolicy() { assertThat(childBalancer.config).isSameInstanceAs(weightedTargetConfig); assertThat(childBalancer.attributes.get(XdsAttributes.XDS_CLIENT_POOL)) .isSameInstanceAs(xdsClientPool); + assertThat(childBalancer.attributes.get(NameResolver.ATTR_BACKEND_SERVICE)).isEqualTo(CLUSTER); } /** diff --git a/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java index 9ab783223d9..bf23c65def4 100644 --- a/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java @@ -58,6 +58,7 @@ import io.grpc.Metadata; import io.grpc.MetricRecorder; import io.grpc.MetricSink; +import io.grpc.NameResolver; import io.grpc.NoopMetricSink; import io.grpc.ServerCall; import io.grpc.ServerServiceDefinition; @@ -161,6 +162,7 @@ public void uncaughtException(Thread t, Throwable e) { private String channelTarget = "channel-target"; private String locality = "locality"; + private String backendService = "the-backend-service"; public WeightedRoundRobinLoadBalancerTest() { testHelperInstance = new TestHelper(); @@ -1119,7 +1121,9 @@ public void removingAddressShutsdownSubchannel() { public void metrics() { // Give WRR some valid addresses to work with. Attributes attributesWithLocality = Attributes.newBuilder() - .set(WeightedTargetLoadBalancer.CHILD_NAME, locality).build(); + .set(WeightedTargetLoadBalancer.CHILD_NAME, locality) + .set(NameResolver.ATTR_BACKEND_SERVICE, backendService) + .build(); syncContext.execute(() -> wrr.acceptResolvedAddresses(ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(weightedConfig) .setAttributes(attributesWithLocality).build())); @@ -1269,7 +1273,7 @@ public void metricWithRealChannel() throws Exception { argThat((instr) -> instr.getName().equals("grpc.lb.wrr.rr_fallback")), eq(1L), eq(Arrays.asList("directaddress:///wrr-metrics")), - eq(Arrays.asList(""))); + eq(Arrays.asList("", ""))); } // Verifies that the MetricRecorder has been called to record a long counter value of 1 for the @@ -1281,7 +1285,10 @@ private void verifyLongCounterRecord(String name, int times, long value) { public boolean matches(LongCounterMetricInstrument longCounterInstrument) { return longCounterInstrument.getName().equals(name); } - }), eq(value), eq(Lists.newArrayList(channelTarget)), eq(Lists.newArrayList(locality))); + }), + eq(value), + eq(Lists.newArrayList(channelTarget)), + eq(Lists.newArrayList(locality, backendService))); } // Verifies that the MetricRecorder has been called to record a given double histogram value the @@ -1293,7 +1300,10 @@ private void verifyDoubleHistogramRecord(String name, int times, double value) { public boolean matches(DoubleHistogramMetricInstrument doubleHistogramInstrument) { return doubleHistogramInstrument.getName().equals(name); } - }), eq(value), eq(Lists.newArrayList(channelTarget)), eq(Lists.newArrayList(locality))); + }), + eq(value), + eq(Lists.newArrayList(channelTarget)), + eq(Lists.newArrayList(locality, backendService))); } private int getNumFilteredPendingTasks() {