Skip to content

Implement grpc.lb.backend_service optional label #11990

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions api/src/main/java/io/grpc/NameResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,11 @@ public Status onResult2(ResolutionResult resolutionResult) {
@Documented
public @interface ResolutionResultAttr {}

@ExperimentalApi("https://github.com/grpc/grpc-java/issues/11989")
@ResolutionResultAttr
public static final Attributes.Key<String> ATTR_BACKEND_SERVICE =
Attributes.Key.create("io.grpc.NameResolver.ATTR_BACKEND_SERVICE");

/**
* Information that a {@link Factory} uses to create a {@link NameResolver}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.grpc.opentelemetry;

import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BACKEND_SERVICE_KEY;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.LOCALITY_KEY;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.METHOD_KEY;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.STATUS_KEY;
Expand Down Expand Up @@ -70,7 +71,6 @@
*/
final class OpenTelemetryMetricsModule {
private static final Logger logger = Logger.getLogger(OpenTelemetryMetricsModule.class.getName());
private static final String LOCALITY_LABEL_NAME = "grpc.lb.locality";
public static final ImmutableSet<String> DEFAULT_PER_CALL_METRICS_SET =
ImmutableSet.of(
"grpc.client.attempt.started",
Expand All @@ -90,14 +90,16 @@ final class OpenTelemetryMetricsModule {
private final OpenTelemetryMetricsResource resource;
private final Supplier<Stopwatch> stopwatchSupplier;
private final boolean localityEnabled;
private final boolean backendServiceEnabled;
private final ImmutableList<OpenTelemetryPlugin> plugins;

OpenTelemetryMetricsModule(Supplier<Stopwatch> stopwatchSupplier,
OpenTelemetryMetricsResource resource, Collection<String> optionalLabels,
List<OpenTelemetryPlugin> plugins) {
this.resource = checkNotNull(resource, "resource");
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
this.localityEnabled = optionalLabels.contains(LOCALITY_LABEL_NAME);
this.localityEnabled = optionalLabels.contains(LOCALITY_KEY.getKey());
this.backendServiceEnabled = optionalLabels.contains(BACKEND_SERVICE_KEY.getKey());
this.plugins = ImmutableList.copyOf(plugins);
}

Expand Down Expand Up @@ -162,6 +164,7 @@ private static final class ClientTracer extends ClientStreamTracer {
volatile long outboundWireSize;
volatile long inboundWireSize;
volatile String locality;
volatile String backendService;
long attemptNanos;
Code statusCode;

Expand Down Expand Up @@ -206,9 +209,12 @@ public void inboundWireSize(long bytes) {

@Override
public void addOptionalLabel(String key, String value) {
if (LOCALITY_LABEL_NAME.equals(key)) {
if ("grpc.lb.locality".equals(key)) {
locality = value;
}
if ("grpc.lb.backend_service".equals(key)) {
backendService = value;
}
}

@Override
Expand Down Expand Up @@ -248,6 +254,13 @@ void recordFinishedAttempt() {
}
builder.put(LOCALITY_KEY, savedLocality);
}
if (module.backendServiceEnabled) {
String savedBackendService = backendService;
if (savedBackendService == null) {
savedBackendService = "";
}
builder.put(BACKEND_SERVICE_KEY, savedBackendService);
}
for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) {
plugin.addLabels(builder);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ public final class OpenTelemetryConstants {
public static final AttributeKey<String> LOCALITY_KEY =
AttributeKey.stringKey("grpc.lb.locality");

public static final AttributeKey<String> BACKEND_SERVICE_KEY =
AttributeKey.stringKey("grpc.lb.backend_service");

public static final List<Double> LATENCY_BUCKETS =
ImmutableList.of(
0d, 0.00001d, 0.00005d, 0.0001d, 0.0003d, 0.0006d, 0.0008d, 0.001d, 0.002d,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import io.grpc.opentelemetry.OpenTelemetryMetricsModule.CallAttemptsTracerFactory;
import io.grpc.opentelemetry.internal.OpenTelemetryConstants;
import io.grpc.testing.GrpcServerRule;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
Expand Down Expand Up @@ -1070,6 +1071,140 @@ public void clientLocalityMetrics_missing() {
point -> point.hasAttributes(clientAttributes))));
}

@Test
public void clientBackendServiceMetrics_present() {
String target = "target:///";
OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter,
enabledMetricsMap, disableDefaultMetrics);
OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule(
fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.backend_service"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can add a constant for "grpc.lb.backend_service"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What goal do you have in mind? I'm generally only a fan of constants in tests for things that don't matter to the test or to share between positive and negative tests. This string is part of the API and can't change. And if it is misspelled we want the test to fail and the test is more obvious with the literal. I'd use a constant if the "should-be-overwritten" was further away from the "the-moon", as misspelling the first prevents part of the test from working.

There would actually need to be two constants here: one for this line and the assertion, and a different constant (with same value) for the addOptionalLabel(). Those just happen to be the same string.

This did point out to me that the test shouldn't be using BACKEND_SERVICE_KEY, so I've removed the usage of that constant.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wanted it to be a constant so that any modification to the literal could be done easily.

emptyList());
OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory =
new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList());

ClientStreamTracer tracer =
callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata());
tracer.addOptionalLabel("grpc.lb.foo", "unimportant");
tracer.addOptionalLabel("grpc.lb.backend_service", "should-be-overwritten");
tracer.addOptionalLabel("grpc.lb.backend_service", "the-moon");
tracer.addOptionalLabel("grpc.lb.foo", "thats-no-moon");
tracer.streamClosed(Status.OK);
callAttemptsTracerFactory.callEnded(Status.OK);

io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of(
TARGET_KEY, target,
METHOD_KEY, method.getFullMethodName());

io.opentelemetry.api.common.Attributes clientAttributes
= io.opentelemetry.api.common.Attributes.of(
TARGET_KEY, target,
METHOD_KEY, method.getFullMethodName(),
STATUS_KEY, Status.Code.OK.toString());

io.opentelemetry.api.common.Attributes clientAttributesWithBackendService
= clientAttributes.toBuilder()
.put(AttributeKey.stringKey("grpc.lb.backend_service"), "the-moon")
.build();

assertThat(openTelemetryTesting.getMetrics())
.satisfiesExactlyInAnyOrder(
metric ->
assertThat(metric)
.hasName(CLIENT_ATTEMPT_COUNT_INSTRUMENT_NAME)
.hasLongSumSatisfying(
longSum -> longSum.hasPointsSatisfying(
point -> point.hasAttributes(attributes))),
metric ->
assertThat(metric)
.hasName(CLIENT_ATTEMPT_DURATION_INSTRUMENT_NAME)
.hasHistogramSatisfying(
histogram -> histogram.hasPointsSatisfying(
point -> point.hasAttributes(clientAttributesWithBackendService))),
metric ->
assertThat(metric)
.hasName(CLIENT_ATTEMPT_SENT_TOTAL_COMPRESSED_MESSAGE_SIZE)
.hasHistogramSatisfying(
histogram -> histogram.hasPointsSatisfying(
point -> point.hasAttributes(clientAttributesWithBackendService))),
metric ->
assertThat(metric)
.hasName(CLIENT_ATTEMPT_RECV_TOTAL_COMPRESSED_MESSAGE_SIZE)
.hasHistogramSatisfying(
histogram -> histogram.hasPointsSatisfying(
point -> point.hasAttributes(clientAttributesWithBackendService))),
metric ->
assertThat(metric)
.hasName(CLIENT_CALL_DURATION)
.hasHistogramSatisfying(
histogram -> histogram.hasPointsSatisfying(
point -> point.hasAttributes(clientAttributes))));
}

@Test
public void clientBackendServiceMetrics_missing() {
String target = "target:///";
OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter,
enabledMetricsMap, disableDefaultMetrics);
OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule(
fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.backend_service"),
emptyList());
OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory =
new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList());

ClientStreamTracer tracer =
callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata());
tracer.streamClosed(Status.OK);
callAttemptsTracerFactory.callEnded(Status.OK);

io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of(
TARGET_KEY, target,
METHOD_KEY, method.getFullMethodName());

io.opentelemetry.api.common.Attributes clientAttributes
= io.opentelemetry.api.common.Attributes.of(
TARGET_KEY, target,
METHOD_KEY, method.getFullMethodName(),
STATUS_KEY, Status.Code.OK.toString());

io.opentelemetry.api.common.Attributes clientAttributesWithBackendService
= clientAttributes.toBuilder()
.put(AttributeKey.stringKey("grpc.lb.backend_service"), "")
.build();

assertThat(openTelemetryTesting.getMetrics())
.satisfiesExactlyInAnyOrder(
metric ->
assertThat(metric)
.hasName(CLIENT_ATTEMPT_COUNT_INSTRUMENT_NAME)
.hasLongSumSatisfying(
longSum -> longSum.hasPointsSatisfying(
point -> point.hasAttributes(attributes))),
metric ->
assertThat(metric)
.hasName(CLIENT_ATTEMPT_DURATION_INSTRUMENT_NAME)
.hasHistogramSatisfying(
histogram -> histogram.hasPointsSatisfying(
point -> point.hasAttributes(clientAttributesWithBackendService))),
metric ->
assertThat(metric)
.hasName(CLIENT_ATTEMPT_SENT_TOTAL_COMPRESSED_MESSAGE_SIZE)
.hasHistogramSatisfying(
histogram -> histogram.hasPointsSatisfying(
point -> point.hasAttributes(clientAttributesWithBackendService))),
metric ->
assertThat(metric)
.hasName(CLIENT_ATTEMPT_RECV_TOTAL_COMPRESSED_MESSAGE_SIZE)
.hasHistogramSatisfying(
histogram -> histogram.hasPointsSatisfying(
point -> point.hasAttributes(clientAttributesWithBackendService))),
metric ->
assertThat(metric)
.hasName(CLIENT_CALL_DURATION)
.hasHistogramSatisfying(
histogram -> histogram.hasPointsSatisfying(
point -> point.hasAttributes(clientAttributes))));
}

@Test
public void serverBasicMetrics() {
OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter,
Expand Down
5 changes: 4 additions & 1 deletion xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.grpc.InternalLogId;
import io.grpc.LoadBalancer;
import io.grpc.Metadata;
import io.grpc.NameResolver;
import io.grpc.Status;
import io.grpc.internal.ForwardingClientStreamTracer;
import io.grpc.internal.GrpcUtil;
Expand Down Expand Up @@ -150,7 +151,9 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {

childSwitchLb.handleResolvedAddresses(
resolvedAddresses.toBuilder()
.setAttributes(attributes)
.setAttributes(attributes.toBuilder()
.set(NameResolver.ATTR_BACKEND_SERVICE, cluster)
.build())
.setLoadBalancingPolicyConfig(config.childConfig)
.build());
return Status.OK;
Expand Down
46 changes: 33 additions & 13 deletions xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,32 +102,44 @@ final class WeightedRoundRobinLoadBalancer extends MultiChildLoadBalancer {
private final long infTime;
private final Ticker ticker;
private String locality = "";
private String backendService = "";
private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult());

// The metric instruments are only registered once and shared by all instances of this LB.
static {
MetricInstrumentRegistry metricInstrumentRegistry
= MetricInstrumentRegistry.getDefaultRegistry();
RR_FALLBACK_COUNTER = metricInstrumentRegistry.registerLongCounter("grpc.lb.wrr.rr_fallback",
RR_FALLBACK_COUNTER = metricInstrumentRegistry.registerLongCounter(
"grpc.lb.wrr.rr_fallback",
"EXPERIMENTAL. Number of scheduler updates in which there were not enough endpoints "
+ "with valid weight, which caused the WRR policy to fall back to RR behavior",
"{update}", Lists.newArrayList("grpc.target"), Lists.newArrayList("grpc.lb.locality"),
"{update}",
Lists.newArrayList("grpc.target"),
Lists.newArrayList("grpc.lb.locality", "grpc.lb.backend_service"),
false);
ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER = metricInstrumentRegistry.registerLongCounter(
"grpc.lb.wrr.endpoint_weight_not_yet_usable", "EXPERIMENTAL. Number of endpoints "
+ "from each scheduler update that don't yet have usable weight information",
"{endpoint}", Lists.newArrayList("grpc.target"), Lists.newArrayList("grpc.lb.locality"),
"grpc.lb.wrr.endpoint_weight_not_yet_usable",
"EXPERIMENTAL. Number of endpoints from each scheduler update that don't yet have usable "
+ "weight information",
"{endpoint}",
Lists.newArrayList("grpc.target"),
Lists.newArrayList("grpc.lb.locality", "grpc.lb.backend_service"),
false);
ENDPOINT_WEIGHT_STALE_COUNTER = metricInstrumentRegistry.registerLongCounter(
"grpc.lb.wrr.endpoint_weight_stale",
"EXPERIMENTAL. Number of endpoints from each scheduler update whose latest weight is "
+ "older than the expiration period", "{endpoint}", Lists.newArrayList("grpc.target"),
Lists.newArrayList("grpc.lb.locality"), false);
+ "older than the expiration period",
"{endpoint}",
Lists.newArrayList("grpc.target"),
Lists.newArrayList("grpc.lb.locality", "grpc.lb.backend_service"),
false);
ENDPOINT_WEIGHTS_HISTOGRAM = metricInstrumentRegistry.registerDoubleHistogram(
"grpc.lb.wrr.endpoint_weights",
"EXPERIMENTAL. The histogram buckets will be endpoint weight ranges.",
"{weight}", Lists.newArrayList(), Lists.newArrayList("grpc.target"),
Lists.newArrayList("grpc.lb.locality"),
"{weight}",
Lists.newArrayList(),
Lists.newArrayList("grpc.target"),
Lists.newArrayList("grpc.lb.locality", "grpc.lb.backend_service"),
false);
}

Expand Down Expand Up @@ -168,6 +180,13 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
} else {
this.locality = "";
}
String backendService
= resolvedAddresses.getAttributes().get(NameResolver.ATTR_BACKEND_SERVICE);
if (backendService != null) {
this.backendService = backendService;
} else {
this.backendService = "";
}
Comment on lines +183 to +189
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can think of exposing a getOrDefault() from the Attributes class

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically null could be stored as a value, and then we'd still get null here even with getOrDefault(). Some of our Key classes (but not Attributes) allow choosing a "default" value when it isn't present. That'd probably be better in general than a getOrDefault(). Even if I wanted to do that now, I wouldn't mix it into this PR.

config =
(WeightedRoundRobinLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();

Expand Down Expand Up @@ -232,26 +251,27 @@ private void updateWeight(WeightedRoundRobinPicker picker) {
helper.getMetricRecorder()
.recordDoubleHistogram(ENDPOINT_WEIGHTS_HISTOGRAM, newWeight,
ImmutableList.of(helper.getChannelTarget()),
ImmutableList.of(locality));
ImmutableList.of(locality, backendService));
newWeights[i] = newWeight > 0 ? (float) newWeight : 0.0f;
}

if (staleEndpoints.get() > 0) {
helper.getMetricRecorder()
.addLongCounter(ENDPOINT_WEIGHT_STALE_COUNTER, staleEndpoints.get(),
ImmutableList.of(helper.getChannelTarget()),
ImmutableList.of(locality));
ImmutableList.of(locality, backendService));
}
if (notYetUsableEndpoints.get() > 0) {
helper.getMetricRecorder()
.addLongCounter(ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER, notYetUsableEndpoints.get(),
ImmutableList.of(helper.getChannelTarget()), ImmutableList.of(locality));
ImmutableList.of(helper.getChannelTarget()),
ImmutableList.of(locality, backendService));
}
boolean weightsEffective = picker.updateWeight(newWeights);
if (!weightsEffective) {
helper.getMetricRecorder()
.addLongCounter(RR_FALLBACK_COUNTER, 1, ImmutableList.of(helper.getChannelTarget()),
ImmutableList.of(locality));
ImmutableList.of(locality, backendService));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import io.grpc.LoadBalancerRegistry;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.NameResolver;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.SynchronizationContext;
Expand Down Expand Up @@ -198,6 +199,7 @@ public void handleResolvedAddresses_propagateToChildPolicy() {
assertThat(childBalancer.config).isSameInstanceAs(weightedTargetConfig);
assertThat(childBalancer.attributes.get(XdsAttributes.XDS_CLIENT_POOL))
.isSameInstanceAs(xdsClientPool);
assertThat(childBalancer.attributes.get(NameResolver.ATTR_BACKEND_SERVICE)).isEqualTo(CLUSTER);
}

/**
Expand Down
Loading