Skip to content

balancer/pickfirstleaf: Avoid reading Address.Metadata #8227

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions balancer/pickfirst/pickfirstleaf/pickfirstleaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -923,6 +923,5 @@ func (al *addressList) hasNext() bool {
// fields that are meaningful to the SubConn.
func equalAddressIgnoringBalAttributes(a, b *resolver.Address) bool {
return a.Addr == b.Addr && a.ServerName == b.ServerName &&
a.Attributes.Equal(b.Attributes) &&
a.Metadata == b.Metadata
a.Attributes.Equal(b.Attributes)
}
80 changes: 80 additions & 0 deletions balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/pickfirst"
"google.golang.org/grpc/internal/testutils/stats"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -1424,6 +1425,85 @@ func (s) TestPickFirstLeaf_HealthUpdates(t *testing.T) {
testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
}

// Tests the case where an address update received by the pick_first LB policy
// differs in metadata which should be ignored by the LB policy. In this case,
// the test verifies that new connections are not created when the address
// update only changes the metadata.
func (s) TestPickFirstLeaf_AddressUpdateWithMetadata(t *testing.T) {
dialer := testutils.NewBlockingDialer()
dopts := []grpc.DialOption{
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, pickfirstleaf.Name)),
grpc.WithContextDialer(dialer.DialContext),
}
cc, r, backends := setupPickFirstLeaf(t, 2, dopts...)

// Add a metadata to the addresses before pushing them to the pick_first LB
// policy through the manual resolver.
addrs := backends.resolverAddrs()
for i := range addrs {
addrs[i].Metadata = &metadata.MD{
"test-metadata-1": []string{fmt.Sprintf("%d", i)},
}
}
r.UpdateState(resolver.State{Addresses: addrs})

// Ensure that RPCs succeed to the expected backend.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
t.Fatal(err)
}

// Create holds for each backend. This will be used to verify the connection
// is not re-established.
holds := backends.holds(dialer)

// Add metadata to the addresses before pushing them to the pick_first LB
// policy through the manual resolver. Leave the order of the addresses
// unchanged.
for i := range addrs {
addrs[i].Metadata = &metadata.MD{
"test-metadata-2": []string{fmt.Sprintf("%d", i)},
}
}
r.UpdateState(resolver.State{Addresses: addrs})

// Ensure that no new connection is established.
for i := range holds {
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if holds[i].Wait(sCtx) {
t.Fatalf("Unexpected connection attempt to backend: %s", addrs[i])
}
}

if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
t.Fatal(err)
}

// Add metadata to the addresses before pushing them to the pick_first LB
// policy through the manual resolver. Reverse of the order of addresses.
for i := range addrs {
addrs[i].Metadata = &metadata.MD{
"test-metadata-3": []string{fmt.Sprintf("%d", i)},
}
}
addrs[0], addrs[1] = addrs[1], addrs[0]
r.UpdateState(resolver.State{Addresses: addrs})

// Ensure that no new connection is established.
for i := range holds {
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if holds[i].Wait(sCtx) {
t.Fatalf("Unexpected connection attempt to backend: %s", addrs[i])
}
}
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
t.Fatal(err)
}
}

// healthListenerCapturingCCWrapper is used to capture the health listener so
// that health updates can be mocked for testing.
type healthListenerCapturingCCWrapper struct {
Expand Down