Skip to content

internal/delegatingresolver: avoid proxy if networktype of target address is not tcp #8215

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
104 changes: 90 additions & 14 deletions internal/resolver/delegatingresolver/delegatingresolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
"fmt"
"net/http"
"net/url"
"strings"
"sync"

"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/proxyattributes"
"google.golang.org/grpc/internal/transport/networktype"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)
Expand Down Expand Up @@ -189,18 +191,81 @@
r.proxyResolver = nil
}

// updateClientConnStateLocked creates a list of combined addresses by
// pairing each proxy address with every target address. For each pair, it
// generates a new [resolver.Address] using the proxy address, and adding the
// target address as the attribute along with user info. It returns nil if
// either resolver has not sent update even once and returns the error from
// ClientConn update once both resolvers have sent update atleast once.
// parseDialTarget returns the network and address to pass to dialer.
func parseDialTarget(target string) (string, string) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably be exposed from its current location in internal instead of being copy/pasted here.

net := "tcp"
m1 := strings.Index(target, ":")
m2 := strings.Index(target, ":/")
// handle unix:addr which will fail with url.Parse
if m1 >= 0 && m2 < 0 {
if n := target[0:m1]; n == "unix" {
return n, target[m1+1:]
}

Check warning on line 203 in internal/resolver/delegatingresolver/delegatingresolver.go

View check run for this annotation

Codecov / codecov/patch

internal/resolver/delegatingresolver/delegatingresolver.go#L202-L203

Added lines #L202 - L203 were not covered by tests
}
if m2 >= 0 {
t, err := url.Parse(target)
if err != nil {
return net, target
}
scheme := t.Scheme
addr := t.Path
if scheme == "unix" {
if addr == "" {
addr = t.Host
}
return scheme, addr

Check warning on line 216 in internal/resolver/delegatingresolver/delegatingresolver.go

View check run for this annotation

Codecov / codecov/patch

internal/resolver/delegatingresolver/delegatingresolver.go#L206-L216

Added lines #L206 - L216 were not covered by tests
}
}
return net, target
}

func networkTypeFromAddr(addr resolver.Address) (string, resolver.Address) {
networkType, ok := networktype.Get(addr)
if !ok {
networkType, addr.Addr = parseDialTarget(addr.Addr)
}
return networkType, addr
}

func tcpAddressPresent(state *resolver.State) bool {
for _, addr := range state.Addresses {
if networkType, _ := networkTypeFromAddr(addr); networkType == "tcp" {
return true
}
}
for _, endpoint := range state.Endpoints {
for _, addr := range endpoint.Addresses {
if networktype, _ := networkTypeFromAddr(addr); networktype == "tcp" {
return true
}
}
}
return false
}

// updateClientConnStateLocked creates a list of combined addresses by pairing
// each proxy address with every target address. For each pair, it generates a
// new [resolver.Address] using the proxy address, and adding the target address
// as the attribute along with user info. It returns nil if either resolver has
// not sent update even once and returns the error from ClientConn update once
// both resolvers have sent update atleast once.
func (r *delegatingResolver) updateClientConnStateLocked() error {
if r.targetResolverState == nil || r.proxyAddrs == nil {
if r.targetResolverState == nil {
return nil
}

curState := *r.targetResolverState

// If no addresses returned by resolver have network type ßas tcp , do not
// wait for proxy update.
if !tcpAddressPresent(&curState) {
return r.cc.UpdateState(curState)
}

if r.proxyAddrs == nil {
return nil
}

// If multiple resolved proxy addresses are present, we send only the
// unresolved proxy host and let net.Dial handle the proxy host name
// resolution when creating the transport. Sending all resolved addresses
Expand All @@ -217,7 +282,12 @@
proxyAddr = resolver.Address{Addr: r.proxyURL.Host}
}
var addresses []resolver.Address
for _, targetAddr := range (*r.targetResolverState).Addresses {
for _, targetAddr := range curState.Addresses {
// Avoid proxy when network is not tcp.
if networkType, targetAddr := networkTypeFromAddr(targetAddr); networkType != "tcp" {
addresses = append(addresses, targetAddr)
continue
}
addresses = append(addresses, proxyattributes.Set(proxyAddr, proxyattributes.Options{
User: r.proxyURL.User,
ConnectAddr: targetAddr.Addr,
Expand All @@ -232,10 +302,15 @@
// address.The resulting list of addresses is then grouped into endpoints,
// covering all combinations of proxy and target endpoints.
var endpoints []resolver.Endpoint
for _, endpt := range (*r.targetResolverState).Endpoints {
for _, endpt := range curState.Endpoints {
var addrs []resolver.Address
for _, proxyAddr := range r.proxyAddrs {
for _, targetAddr := range endpt.Addresses {
for _, targetAddr := range endpt.Addresses {
// Avoid proxy when network is not tcp.
if networkType, targetAddr := networkTypeFromAddr(targetAddr); networkType != "tcp" {
addrs = append(addrs, targetAddr)
continue
}
for _, proxyAddr := range r.proxyAddrs {
addrs = append(addrs, proxyattributes.Set(proxyAddr, proxyattributes.Options{
User: r.proxyURL.User,
ConnectAddr: targetAddr.Addr,
Expand Down Expand Up @@ -297,8 +372,8 @@
// updateTargetResolverState updates the target resolver state by storing target
// addresses, endpoints, and service config, marking the resolver as ready, and
// triggering a state update if both resolvers are ready. If the ClientConn
// returns a non-nil error, it calls `ResolveNow()` on the proxy resolver. It
// is a StateListener function of wrappingClientConn passed to the target resolver.
// returns a non-nil error, it calls `ResolveNow()` on the proxy resolver. It is
// a StateListener function of wrappingClientConn passed to the target resolver.
func (r *delegatingResolver) updateTargetResolverState(state resolver.State) error {
r.mu.Lock()
defer r.mu.Unlock()
Expand Down Expand Up @@ -335,7 +410,8 @@
return wcc.stateListener(state)
}

// ReportError intercepts errors from the child resolvers and passes them to ClientConn.
// ReportError intercepts errors from the child resolvers and passes them to
// ClientConn.
func (wcc *wrappingClientConn) ReportError(err error) {
wcc.parent.cc.ReportError(err)
}
Expand Down
149 changes: 147 additions & 2 deletions internal/resolver/delegatingresolver/delegatingresolver_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"google.golang.org/grpc/internal/proxyattributes"
"google.golang.org/grpc/internal/resolver/delegatingresolver"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/transport/networktype"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
Expand Down Expand Up @@ -441,16 +442,16 @@ func (s) TestDelegatingResolverForEndpointsWithProxy(t *testing.T) {
{
Addresses: []resolver.Address{
proxyAddressWithTargetAttribute(resolvedProxyTestAddr1, resolvedTargetTestAddr1),
proxyAddressWithTargetAttribute(resolvedProxyTestAddr1, resolvedTargetTestAddr2),
proxyAddressWithTargetAttribute(resolvedProxyTestAddr2, resolvedTargetTestAddr1),
proxyAddressWithTargetAttribute(resolvedProxyTestAddr1, resolvedTargetTestAddr2),
proxyAddressWithTargetAttribute(resolvedProxyTestAddr2, resolvedTargetTestAddr2),
},
},
{
Addresses: []resolver.Address{
proxyAddressWithTargetAttribute(resolvedProxyTestAddr1, resolvedTargetTestAddr3),
proxyAddressWithTargetAttribute(resolvedProxyTestAddr1, resolvedTargetTestAddr4),
proxyAddressWithTargetAttribute(resolvedProxyTestAddr2, resolvedTargetTestAddr3),
proxyAddressWithTargetAttribute(resolvedProxyTestAddr1, resolvedTargetTestAddr4),
proxyAddressWithTargetAttribute(resolvedProxyTestAddr2, resolvedTargetTestAddr4),
},
},
Expand Down Expand Up @@ -760,3 +761,147 @@ func (s) TestDelegatingResolverResolveNow(t *testing.T) {
t.Fatalf("context timed out waiting for proxyResolver.ResolveNow() to be called.")
}
}

// Tests the scenario where a proxy is configured, and the resolver returns a
// network type other than tcp for all addresses. The test verifies that the
// delegating resolver avoids the proxy update and directly sends the update
// from target resolver to clientconn.
func (s) TestDelegatingResolverForNonTCPTarget(t *testing.T) {
const (
targetTestAddr = "test.target"
resolvedTargetTestAddr1 = "1.1.1.1:8080"
envProxyAddr = "proxytest.com"
)
hpfe := func(req *http.Request) (*url.URL, error) {
if req.URL.Host == targetTestAddr {
return &url.URL{
Scheme: "https",
Host: envProxyAddr,
}, nil
}
t.Errorf("Unexpected request host to proxy: %s want %s", req.URL.Host, targetTestAddr)
return nil, nil
}
originalhpfe := delegatingresolver.HTTPSProxyFromEnvironment
delegatingresolver.HTTPSProxyFromEnvironment = hpfe
defer func() {
delegatingresolver.HTTPSProxyFromEnvironment = originalhpfe
}()

// Manual resolver to control the target resolution.
targetResolver := manual.NewBuilderWithScheme("test")
target := targetResolver.Scheme() + ":///" + targetTestAddr
// Set up a manual DNS resolver to control the proxy address resolution.
proxyResolver := setupDNS(t)

tcc, stateCh, _ := createTestResolverClientConn(t)
if _, err := delegatingresolver.New(resolver.Target{URL: *testutils.MustParseURL(target)}, tcc, resolver.BuildOptions{}, targetResolver, false); err != nil {
t.Fatalf("Failed to create delegating resolver: %v", err)
}

// Set network to anything other than tcp.
nonTCPAddr := networktype.Set(resolver.Address{Addr: resolvedTargetTestAddr1}, "unix")
targetResolver.UpdateState(resolver.State{
Addresses: []resolver.Address{nonTCPAddr},
Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{nonTCPAddr}}},
ServiceConfig: &serviceconfig.ParseResult{},
})

var gotState resolver.State
select {
case gotState = <-stateCh:
case <-time.After(defaultTestTimeout):
t.Fatal("Timeout when waiting for a state update from the delegating resolver")
}

proxyResolver.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: envProxyAddr}},
ServiceConfig: &serviceconfig.ParseResult{},
})

wantState := resolver.State{
Addresses: []resolver.Address{nonTCPAddr},
Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{nonTCPAddr}}},
ServiceConfig: &serviceconfig.ParseResult{},
}

// Verify that the state clientconn receives is same as updated by target
// resolver, since we want to avoid proxy for any network type apart from
// tcp.
if diff := cmp.Diff(gotState, wantState); diff != "" {
t.Fatalf("Unexpected state from delegating resolver. Diff (-got +want):\n%v", diff)
}
}

// Tests the scenario where a proxy is configured, and the resolver returns
// addresses with varied network type. The test verifies that the delegating
// resolver doesnt add proxyatrribute to adresses with network type other than
// tcp , but adds the proxyattribute to addresses with network type tcp.
func (s) TestDelegatingResolverForMixNetworkType(t *testing.T) {
const (
targetTestAddr = "test.target"
resolvedTargetTestAddr1 = "1.1.1.1:8080"
resolvedTargetTestAddr2 = "2.2.2.2:8080"
envProxyAddr = "proxytest.com"
)
hpfe := func(req *http.Request) (*url.URL, error) {
if req.URL.Host == targetTestAddr {
return &url.URL{
Scheme: "https",
Host: envProxyAddr,
}, nil
}
t.Errorf("Unexpected request host to proxy: %s want %s", req.URL.Host, targetTestAddr)
return nil, nil
}
originalhpfe := delegatingresolver.HTTPSProxyFromEnvironment
delegatingresolver.HTTPSProxyFromEnvironment = hpfe
defer func() {
delegatingresolver.HTTPSProxyFromEnvironment = originalhpfe
}()

// Manual resolver to control the target resolution.
targetResolver := manual.NewBuilderWithScheme("test")
target := targetResolver.Scheme() + ":///" + targetTestAddr
// Set up a manual DNS resolver to control the proxy address resolution.
proxyResolver := setupDNS(t)

tcc, stateCh, _ := createTestResolverClientConn(t)
if _, err := delegatingresolver.New(resolver.Target{URL: *testutils.MustParseURL(target)}, tcc, resolver.BuildOptions{}, targetResolver, false); err != nil {
t.Fatalf("Failed to create delegating resolver: %v", err)
}
// Set network to anything other than tcp.
nonTCPAddr := networktype.Set(resolver.Address{Addr: resolvedTargetTestAddr1}, "unix")
targetResolver.UpdateState(resolver.State{
Addresses: []resolver.Address{nonTCPAddr, {Addr: resolvedTargetTestAddr2}},
Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{nonTCPAddr, {Addr: resolvedTargetTestAddr2}}}},
ServiceConfig: &serviceconfig.ParseResult{},
})

select {
case <-stateCh:
t.Fatalf("Delegating resolver invoked UpdateState before both the proxy and target resolvers had updated their states.")
case <-time.After(defaultTestShortTimeout):
}

proxyResolver.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: envProxyAddr}},
ServiceConfig: &serviceconfig.ParseResult{},
})

var gotState resolver.State
select {
case gotState = <-stateCh:
case <-time.After(defaultTestTimeout):
t.Fatal("Timeout when waiting for a state update from the delegating resolver")
}
wantState := resolver.State{
Addresses: []resolver.Address{nonTCPAddr, proxyAddressWithTargetAttribute(envProxyAddr, resolvedTargetTestAddr2)},
Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{nonTCPAddr, proxyAddressWithTargetAttribute(envProxyAddr, resolvedTargetTestAddr2)}}},
ServiceConfig: &serviceconfig.ParseResult{},
}

if diff := cmp.Diff(gotState, wantState); diff != "" {
t.Fatalf("Unexpected state from delegating resolver. Diff (-got +want):\n%v", diff)
}
}