Skip to content

Commit cc702b5

Browse files
Demo: Feat/proxy connection settings (#389)
* Update spec * wip proxy demo * Fix tests * Fix and add tests * Add test case * Add proxy url to example ui * Use custom dialer for websocket http/https proxies * clear previous settings * Add new import to internal/examples * Only use ProxyConnectDialer when connect headers are specified * Review feedback * Add WS client test * add invalid url tests * Revert change to dialer instantiation * Remove testing instructions * Update opamp-spec to v0.14.0, fix tests
1 parent 6af85ac commit cc702b5

File tree

15 files changed

+871
-266
lines changed

15 files changed

+871
-266
lines changed

client/httpclient.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,12 @@ func (c *httpClient) Start(ctx context.Context, settings types.StartSettings) er
5555
c.sender.EnableCompression()
5656
}
5757

58+
if settings.ProxyURL != "" {
59+
if err := c.sender.SetProxy(settings.ProxyURL, settings.ProxyHeaders); err != nil {
60+
return err
61+
}
62+
}
63+
5864
// Prepare the first message to send.
5965
err := c.common.PrepareFirstMessage(ctx)
6066
if err != nil {

client/internal/httpsender.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"fmt"
1010
"io"
1111
"net/http"
12+
"net/url"
1213
"sync"
1314
"sync/atomic"
1415
"time"
@@ -84,6 +85,35 @@ func NewHTTPSender(logger types.Logger) *HTTPSender {
8485
return h
8586
}
8687

88+
// SetProxy will force each request to use passed proxy and use the passed headers when making a CONNECT request to the proxy.
89+
// If the proxy has no schema http is used.
90+
// This method is not thread safe and must be called before h.client is used.
91+
func (h *HTTPSender) SetProxy(proxy string, headers http.Header) error {
92+
proxyURL, err := url.Parse(proxy)
93+
if err != nil || proxyURL.Scheme == "" || proxyURL.Host == "" { // error or bad URL - try to use http as scheme to resolve
94+
proxyURL, err = url.Parse("http://" + proxy)
95+
if err != nil {
96+
return err
97+
}
98+
}
99+
if proxyURL.Hostname() == "" {
100+
return url.InvalidHostError(proxy)
101+
}
102+
103+
proxyTransport := &http.Transport{}
104+
if h.client.Transport != nil {
105+
transport, ok := h.client.Transport.(*http.Transport)
106+
if !ok {
107+
return fmt.Errorf("unable to coorce client transport as *http.Transport detected type is: %T", h.client.Transport)
108+
}
109+
proxyTransport = transport.Clone()
110+
}
111+
proxyTransport.Proxy = http.ProxyURL(proxyURL)
112+
proxyTransport.ProxyConnectHeader = headers
113+
h.client.Transport = proxyTransport
114+
return nil
115+
}
116+
87117
// Run starts the processing loop that will perform the HTTP request/response.
88118
// When there are no more messages to send Run will suspend until either there is
89119
// a new message to send or the polling interval elapses.

client/internal/httpsender_test.go

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"net"
88
"net/http"
99
"net/http/httptest"
10+
"net/url"
1011
"sync"
1112
"sync/atomic"
1213
"testing"
@@ -356,3 +357,147 @@ func TestPackageUpdatesWithError(t *testing.T) {
356357

357358
cancel()
358359
}
360+
361+
func TestHTTPSenderSetProxy(t *testing.T) {
362+
tests := []struct {
363+
name string
364+
url string
365+
err error
366+
}{{
367+
name: "http proxy",
368+
url: "http://proxy.internal:8080",
369+
err: nil,
370+
}, {
371+
name: "socks5 proxy",
372+
url: "socks5://proxy.internal:8080",
373+
err: nil,
374+
}, {
375+
name: "no schema",
376+
url: "proxy.internal:8080",
377+
err: nil,
378+
}, {
379+
name: "empty url",
380+
url: "",
381+
err: url.InvalidHostError(""),
382+
}, {
383+
name: "invalid url",
384+
url: "this is not valid",
385+
err: url.InvalidHostError("this is not valid"),
386+
}}
387+
for _, tc := range tests {
388+
t.Run(tc.name, func(t *testing.T) {
389+
sender := NewHTTPSender(&sharedinternal.NopLogger{})
390+
err := sender.SetProxy(tc.url, nil)
391+
if tc.err != nil {
392+
assert.ErrorAs(t, err, &tc.err)
393+
} else {
394+
assert.NoError(t, err)
395+
}
396+
})
397+
}
398+
399+
t.Run("old transport settings are preserved", func(t *testing.T) {
400+
sender := &HTTPSender{
401+
client: &http.Client{
402+
Transport: &http.Transport{
403+
MaxResponseHeaderBytes: 1024,
404+
},
405+
},
406+
}
407+
err := sender.SetProxy("https://proxy.internal:8080", nil)
408+
assert.NoError(t, err)
409+
transport, ok := sender.client.Transport.(*http.Transport)
410+
if !ok {
411+
t.Logf("Transport: %v", sender.client.Transport)
412+
t.Fatalf("Unable to coorce as *http.Transport detected type: %T", sender.client.Transport)
413+
}
414+
assert.NotNil(t, transport.Proxy)
415+
assert.Equal(t, int64(1024), transport.MaxResponseHeaderBytes)
416+
})
417+
418+
t.Run("test https proxy", func(t *testing.T) {
419+
var connected atomic.Bool
420+
// HTTPS Connect proxy, no auth required
421+
proxyServer := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
422+
t.Logf("Request: %+v", req)
423+
if req.Method != http.MethodConnect {
424+
w.WriteHeader(http.StatusMethodNotAllowed)
425+
return
426+
}
427+
connected.Store(true)
428+
429+
targetConn, err := net.DialTimeout("tcp", req.Host, 10*time.Second)
430+
if err != nil {
431+
w.WriteHeader(http.StatusBadGateway)
432+
return
433+
}
434+
defer targetConn.Close()
435+
436+
hijacker, ok := w.(http.Hijacker)
437+
if !ok {
438+
w.WriteHeader(http.StatusBadGateway)
439+
return
440+
}
441+
clientConn, _, err := hijacker.Hijack()
442+
if err != nil {
443+
t.Logf("Hijack error: %v", err)
444+
w.WriteHeader(http.StatusBadGateway)
445+
return
446+
}
447+
clientConn.Write([]byte("HTTP/1.1 200 Connection established\r\n\r\n"))
448+
defer clientConn.Close()
449+
450+
var wg sync.WaitGroup
451+
wg.Add(2)
452+
go func() {
453+
defer wg.Done()
454+
_, err := io.Copy(targetConn, clientConn)
455+
assert.NoError(t, err, "proxy encountered an error copying to destination")
456+
}()
457+
go func() {
458+
defer wg.Done()
459+
_, err := io.Copy(clientConn, targetConn)
460+
assert.NoError(t, err, "proxy encountered an error copying to client")
461+
}()
462+
wg.Wait()
463+
}))
464+
t.Cleanup(proxyServer.Close)
465+
466+
srv := StartTLSMockServer(t)
467+
t.Cleanup(srv.Close)
468+
srv.OnRequest = func(w http.ResponseWriter, _ *http.Request) {
469+
w.WriteHeader(http.StatusOK)
470+
}
471+
472+
sender := NewHTTPSender(&sharedinternal.NopLogger{})
473+
sender.client = proxyServer.Client()
474+
err := sender.SetProxy(proxyServer.URL, http.Header{"test-header": []string{"test-value"}})
475+
assert.NoError(t, err)
476+
477+
t.Logf("Proxy URL: %s", proxyServer.URL)
478+
479+
sender.NextMessage().Update(func(msg *protobufs.AgentToServer) {
480+
msg.AgentDescription = &protobufs.AgentDescription{
481+
IdentifyingAttributes: []*protobufs.KeyValue{{
482+
Key: "service.name",
483+
Value: &protobufs.AnyValue{
484+
Value: &protobufs.AnyValue_StringValue{StringValue: "test-service"},
485+
},
486+
}},
487+
}
488+
})
489+
sender.callbacks = types.Callbacks{
490+
OnConnect: func(_ context.Context) {
491+
},
492+
OnConnectFailed: func(_ context.Context, err error) {
493+
t.Logf("sender failed to connect: %v", err)
494+
},
495+
}
496+
sender.url = "https://" + srv.Endpoint
497+
498+
resp, err := sender.sendRequestWithRetries(context.Background())
499+
assert.NoError(t, err)
500+
assert.Equal(t, http.StatusOK, resp.StatusCode)
501+
assert.True(t, connected.Load(), "test request did not use proxy")
502+
})
503+
}

client/types/startsettings.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@ type StartSettings struct {
2626
// Optional TLS config for HTTP connection.
2727
TLSConfig *tls.Config
2828

29+
// Optional Proxy configuration
30+
// The ProxyURL may be http(s) or socks5; if no schema is specified http is assumed.
31+
ProxyURL string
32+
// ProxyHeaders gives the headers an HTTP client will present on a proxy CONNECT request.
33+
ProxyHeaders http.Header
34+
2935
// Agent information.
3036
InstanceUid InstanceUid
3137

client/wsclient.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,19 @@ package client
22

33
import (
44
"context"
5+
"crypto/tls"
56
"errors"
7+
"net"
68
"net/http"
79
"net/url"
10+
"strings"
811
"sync"
912
"sync/atomic"
1013
"time"
1114

1215
"github.com/cenkalti/backoff/v4"
1316
"github.com/gorilla/websocket"
17+
dialer "github.com/michel-laterman/proxy-connect-dialer-go"
1418

1519
"github.com/open-telemetry/opamp-go/client/internal"
1620
"github.com/open-telemetry/opamp-go/client/types"
@@ -81,6 +85,12 @@ func (c *wsClient) Start(ctx context.Context, settings types.StartSettings) erro
8185
// Prepare connection settings.
8286
c.dialer = *websocket.DefaultDialer
8387

88+
if settings.ProxyURL != "" {
89+
if err := c.useProxy(settings.ProxyURL, settings.ProxyHeaders, settings.TLSConfig); err != nil {
90+
return err
91+
}
92+
}
93+
8494
var err error
8595
c.url, err = url.Parse(settings.OpAMPServerURL)
8696
if err != nil {
@@ -426,3 +436,52 @@ func (c *wsClient) runUntilStopped(ctx context.Context) {
426436
c.runOneCycle(ctx)
427437
}
428438
}
439+
440+
// useProxy sets the websocket dialer to use the passed proxy URL.
441+
// If the proxy has no schema http is used.
442+
// This method is not thread safe and must be called before c.dialer is used.
443+
func (c *wsClient) useProxy(proxy string, headers http.Header, cfg *tls.Config) error {
444+
proxyURL, err := url.Parse(proxy)
445+
if err != nil || proxyURL.Scheme == "" || proxyURL.Host == "" { // error or bad URL - try to use http as scheme to resolve
446+
proxyURL, err = url.Parse("http://" + proxy)
447+
if err != nil {
448+
return err
449+
}
450+
}
451+
if proxyURL.Hostname() == "" {
452+
return url.InvalidHostError(proxy)
453+
}
454+
455+
// Clear previous settings
456+
c.dialer.Proxy = nil
457+
c.dialer.NetDialContext = nil
458+
c.dialer.NetDialTLSContext = nil
459+
460+
switch strings.ToLower(proxyURL.Scheme) {
461+
case "http":
462+
// FIXME: dialer.NetDialContext is currently used as a work around instead of setting dialer.Proxy as gorilla/websockets does not have 1st class support for setting proxy connect headers
463+
// Once http://github.com/gorilla/websocket/issues/479 is complete, we should use dialer.Proxy, and dialer.ProxyConnectHeader
464+
if len(headers) > 0 {
465+
dialer, err := dialer.NewProxyConnectDialer(proxyURL, &net.Dialer{}, dialer.WithProxyConnectHeaders(headers))
466+
if err != nil {
467+
return err
468+
}
469+
c.dialer.NetDialContext = dialer.DialContext
470+
return nil
471+
}
472+
c.dialer.Proxy = http.ProxyURL(proxyURL) // No connect headers, use a regular proxy
473+
case "https":
474+
if len(headers) > 0 {
475+
dialer, err := dialer.NewProxyConnectDialer(proxyURL, &net.Dialer{}, dialer.WithTLS(cfg), dialer.WithProxyConnectHeaders(headers))
476+
if err != nil {
477+
return err
478+
}
479+
c.dialer.NetDialTLSContext = dialer.DialContext
480+
return nil
481+
}
482+
c.dialer.Proxy = http.ProxyURL(proxyURL) // No connect headers, use a regular proxy
483+
default: // catches socks5
484+
c.dialer.Proxy = http.ProxyURL(proxyURL)
485+
}
486+
return nil
487+
}

0 commit comments

Comments
 (0)