Skip to content

Commit ed6157d

Browse files
Add integration test for upgrades using download proxy
1 parent 9548faf commit ed6157d

File tree

3 files changed

+320
-4
lines changed

3 files changed

+320
-4
lines changed

testing/integration/ess/proxy_url_test.go

Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ import (
1010
"context"
1111
"crypto/tls"
1212
"crypto/x509"
13+
"errors"
1314
"net"
15+
"net/http"
1416
"net/url"
1517
"os"
1618
"path/filepath"
@@ -24,14 +26,19 @@ import (
2426
"github.com/stretchr/testify/require"
2527
"gopkg.in/yaml.v2"
2628

29+
"github.com/elastic/elastic-agent-libs/kibana"
2730
"github.com/elastic/elastic-agent-libs/testing/certutil"
31+
atesting "github.com/elastic/elastic-agent/pkg/testing"
2832
integrationtest "github.com/elastic/elastic-agent/pkg/testing"
2933
"github.com/elastic/elastic-agent/pkg/testing/define"
3034
"github.com/elastic/elastic-agent/pkg/testing/tools/check"
35+
"github.com/elastic/elastic-agent/pkg/testing/tools/fleettools"
3136
"github.com/elastic/elastic-agent/pkg/testing/tools/testcontext"
37+
"github.com/elastic/elastic-agent/pkg/version"
3238
"github.com/elastic/elastic-agent/testing/fleetservertest"
3339
"github.com/elastic/elastic-agent/testing/integration"
3440
"github.com/elastic/elastic-agent/testing/proxytest"
41+
"github.com/elastic/elastic-agent/testing/upgradetest"
3542
)
3643

3744
func TestProxyURL(t *testing.T) {
@@ -792,3 +799,233 @@ func createBasicFleetPolicyData(t *testing.T, fleetHost string) (fleetservertest
792799
}
793800
return apiKey, policyData
794801
}
802+
803+
// TestFleetDownloadProxyURL will test that the download proxy is used correctly for an agent upgrade.
804+
//
805+
// Test will target a download source that requires a proxy to be used.
806+
//
807+
// elastic-agent -> proxy -> artifacts-proxy -> upstream
808+
//
809+
// If a proxy is not used, the artifacts-proxy returns an status error code.
810+
// This test will do the following:
811+
// 1. Create special artifacts-proxy that requires a proxy header in order to serve artifacts
812+
// 2. Create a policy with no proxies - that has the artifacts-proxy as the download url
813+
// 3. Enroll an agent
814+
// 4. Upgrade the agent
815+
// 5. Ensure upgrade fails
816+
// 6. Update policy to use a download proxy
817+
// 7. Upgrade the agent
818+
// 8. Ensure upgrade succeeds
819+
func TestFleetDownloadProxyURL(t *testing.T) {
820+
ctx := t.Context()
821+
info := define.Require(t, define.Requirements{
822+
Group: integration.Fleet,
823+
Stack: &define.Stack{},
824+
Local: false,
825+
Sudo: true,
826+
})
827+
kibClient := info.KibanaClient
828+
fleetServerURL, err := fleettools.DefaultURL(ctx, kibClient)
829+
require.NoError(t, err)
830+
testUUID, err := uuid.NewV4()
831+
require.NoError(t, err, "error generating UUID for test")
832+
833+
artifactsProxy := proxytest.New(t,
834+
proxytest.WithVerifyRequest(func(r *http.Request) error { // ensure we have proxy header
835+
h := r.Header.Get("Forwarded")
836+
if h == "" {
837+
h = r.Header.Get("X-Forwarded-For")
838+
if h == "" {
839+
return errors.New("missing proxy header")
840+
}
841+
}
842+
return nil
843+
}),
844+
proxytest.WithRewriteFn(func(u *url.URL) { // Send requests to real upstream source
845+
u.Scheme = "https"
846+
u.Host = "snapshots.elastic.co"
847+
}),
848+
proxytest.WithRequestLog("artifacts", t.Logf),
849+
proxytest.WithVerboseLog())
850+
err = artifactsProxy.Start()
851+
require.NoError(t, err, "Error starting artifacts proxy")
852+
t.Cleanup(artifactsProxy.Close)
853+
854+
downloadSource := kibana.DownloadSource{
855+
Name: "LocalArtifactsProxy-" + testUUID.String(),
856+
Host: artifactsProxy.LocalhostURL + "/downloads/",
857+
}
858+
sourceResp, err := kibClient.CreateDownloadSource(ctx, downloadSource)
859+
require.NoError(t, err, "Unable to create download source")
860+
861+
// Get and process start and end fixtures
862+
// TODO: Do we need FIPS aware tests?
863+
startFixture, err := define.NewFixtureFromLocalBuild(t, define.Version())
864+
require.NoError(t, err)
865+
err = startFixture.Prepare(ctx)
866+
require.NoError(t, err)
867+
startVersionInfo, err := startFixture.ExecVersion(ctx)
868+
require.NoError(t, err)
869+
startParsedVersion, err := version.ParseVersion(startVersionInfo.Binary.String())
870+
require.NoError(t, err)
871+
872+
endFixture, err := atesting.NewFixture(
873+
t,
874+
upgradetest.EnsureSnapshot(define.Version()),
875+
atesting.WithFetcher(atesting.ArtifactFetcher()),
876+
)
877+
require.NoError(t, err)
878+
err = endFixture.Prepare(ctx)
879+
require.NoError(t, err)
880+
endVersionInfo, err := endFixture.ExecVersion(ctx)
881+
require.NoError(t, err)
882+
883+
if startVersionInfo.Binary.String() == endVersionInfo.Binary.String() &&
884+
startVersionInfo.Binary.Commit == endVersionInfo.Binary.Commit {
885+
t.Skipf("Build under test is the same as the build from the artifacts repository (version: %s) [commit: %s]",
886+
startVersionInfo.Binary.String(), startVersionInfo.Binary.Commit)
887+
}
888+
if startVersionInfo.Binary.Commit == endVersionInfo.Binary.Commit {
889+
t.Skipf("Target version has the same commit hash %q", endVersionInfo.Binary.Commit)
890+
}
891+
892+
t.Log("Creating Agent policy...")
893+
policy := kibana.AgentPolicy{
894+
Name: "test-policy-" + testUUID.String(),
895+
Namespace: "default",
896+
Description: "Test policy " + testUUID.String(),
897+
MonitoringEnabled: []kibana.MonitoringEnabledOption{
898+
kibana.MonitoringEnabledLogs,
899+
kibana.MonitoringEnabledMetrics,
900+
},
901+
DownloadSourceID: sourceResp.Item.ID, // Use artifacts-proxy as download source
902+
AdvancedSettings: map[string]interface{}{
903+
"agent_download_timeout": "1m", // Force a fast initial failure timeout
904+
},
905+
}
906+
policyResp, err := kibClient.CreatePolicy(ctx, policy)
907+
require.NoError(t, err)
908+
enrollmentToken, err := kibClient.CreateEnrollmentAPIKey(ctx, kibana.CreateEnrollmentAPIKeyRequest{
909+
PolicyID: policyResp.ID,
910+
})
911+
require.NoError(t, err)
912+
913+
t.Log("Installing Elastic Agent...")
914+
installOpts := atesting.InstallOpts{
915+
Force: true,
916+
EnrollOpts: atesting.EnrollOpts{
917+
URL: fleetServerURL,
918+
EnrollmentToken: enrollmentToken.APIKey,
919+
},
920+
}
921+
output, err := startFixture.Install(ctx, &installOpts)
922+
t.Logf("Install agent output:\n%s", string(output))
923+
require.NoError(t, err)
924+
925+
// TODO fast watcher config?
926+
927+
t.Log("Waiting for Agent to be correct version and healthy...")
928+
err = upgradetest.WaitHealthyAndVersion(ctx, startFixture, startVersionInfo.Binary, 2*time.Minute, 10*time.Second, t)
929+
require.NoError(t, err)
930+
931+
agentID, err := startFixture.AgentID(ctx)
932+
require.NoError(t, err)
933+
t.Logf("Agent ID: %q", agentID)
934+
935+
t.Log("Waiting for enrolled Agent status to be online...")
936+
require.Eventually(t, func() bool {
937+
return check.FleetAgentStatus(ctx, t, kibClient, agentID, "online")()
938+
}, time.Minute*2, time.Second, "Agent did not come online")
939+
940+
t.Logf("Upgrading from version \"%s-%s\" to version \"%s-%s\", without proxy...",
941+
startParsedVersion, startVersionInfo.Binary.Commit,
942+
endVersionInfo.Binary.String(), endVersionInfo.Binary.Commit)
943+
err = fleettools.UpgradeAgent(ctx, kibClient, agentID, endVersionInfo.Binary.String(), true)
944+
require.NoError(t, err)
945+
946+
t.Log("Ensure upgrade has failed")
947+
require.EventuallyWithT(t, func(c *assert.CollectT) {
948+
agent, err := kibClient.GetAgent(ctx, kibana.GetAgentRequest{ID: agentID})
949+
require.NoError(c, err)
950+
require.NotNil(c, agent.UpgradeDetails)
951+
require.Equal(c, "UPG_FAILED", agent.UpgradeDetails.State)
952+
}, time.Minute*5, time.Second, "Unable to verify that upgrade has failed.")
953+
954+
proxy := proxytest.New(t,
955+
proxytest.WithRequestLog("proxy", t.Logf),
956+
proxytest.WithVerboseLog())
957+
err = proxy.Start()
958+
require.NoError(t, err, "error starting download proxy")
959+
t.Cleanup(proxy.Close)
960+
961+
fleetProxyResp, err := kibClient.CreateFleetProxy(ctx, kibana.ProxiesRequest{
962+
Name: "fleet-upgrade-test-proxy-" + testUUID.String(),
963+
URL: proxy.LocalhostURL,
964+
})
965+
require.NoError(t, err)
966+
967+
// Update download source to include download proxy
968+
downloadSource.ProxyID = fleetProxyResp.Item.ID
969+
_, err = kibClient.UpdateDownloadSource(ctx, sourceResp.Item.ID, downloadSource)
970+
require.NoError(t, err, "Unable to update policy with download source proxy")
971+
// Update policy to increase download timeout to 60m (default 120m)
972+
updatedPolicy, err := kibClient.UpdatePolicy(ctx, policyResp.ID, kibana.AgentPolicyUpdateRequest{
973+
Name: policy.Name,
974+
Namespace: policy.Namespace,
975+
AdvancedSettings: map[string]interface{}{
976+
"agent_download_timeout": "60m",
977+
},
978+
})
979+
require.NoError(t, err, "Unable to update policy with new download timeout")
980+
981+
t.Logf("Verify agent is online and has updated to revision %d", updatedPolicy.Revision)
982+
require.EventuallyWithT(t, func(c *assert.CollectT) {
983+
agentResp, err := kibClient.GetAgent(ctx, kibana.GetAgentRequest{ID: agentID})
984+
require.NoError(c, err)
985+
require.Equal(c, "online", agentResp.Status)
986+
require.Equal(c, updatedPolicy.Revision, agentResp.PolicyRevision)
987+
}, time.Minute, time.Second, "Expected agent to be online and policy has updated")
988+
989+
t.Logf("Upgrading from version \"%s-%s\" to version \"%s-%s\", with proxy...",
990+
startParsedVersion, startVersionInfo.Binary.Commit,
991+
endVersionInfo.Binary.String(), endVersionInfo.Binary.Commit)
992+
err = fleettools.UpgradeAgent(ctx, kibClient, agentID, endVersionInfo.Binary.String(), true)
993+
require.NoError(t, err)
994+
995+
t.Log("Ensure upgrade starts")
996+
require.EventuallyWithT(t, func(c *assert.CollectT) {
997+
agent, err := kibClient.GetAgent(ctx, kibana.GetAgentRequest{ID: agentID})
998+
require.NoError(c, err)
999+
require.NotNil(c, agent.UpgradeDetails)
1000+
}, time.Minute*5, time.Second, "Unable to verify that upgrade details appear.")
1001+
1002+
t.Log("Waiting for upgrade watcher to start...")
1003+
err = upgradetest.WaitForWatcher(ctx, 5*time.Minute, 10*time.Second)
1004+
require.NoError(t, err)
1005+
t.Log("Upgrade watcher started")
1006+
1007+
err = upgradetest.WaitHealthyAndVersion(ctx, startFixture, endVersionInfo.Binary, 2*time.Minute, 10*time.Second, t)
1008+
require.NoError(t, err)
1009+
1010+
t.Log("Waiting for upgraded Agent status to be online...")
1011+
require.Eventually(t, func() bool {
1012+
return check.FleetAgentStatus(ctx, t, kibClient, agentID, "online")()
1013+
}, time.Minute*10, time.Second*10, "Agent did not come online")
1014+
1015+
t.Log("Check agent version")
1016+
require.EventuallyWithT(t, func(c *assert.CollectT) {
1017+
ver, err := fleettools.GetAgentVersion(ctx, kibClient, agentID)
1018+
require.NoError(c, err)
1019+
require.Equal(c, endVersionInfo.Binary.Version, ver)
1020+
}, time.Minute*5, time.Second)
1021+
1022+
t.Log("Waiting for upgrade watcher to finish...")
1023+
err = upgradetest.WaitForNoWatcher(ctx, 2*time.Minute, 10*time.Second, 1*time.Minute+15*time.Second)
1024+
require.NoError(t, err)
1025+
1026+
err = upgradetest.CheckHealthyAndVersion(ctx, startFixture, endVersionInfo.Binary)
1027+
require.NoError(t, err, "Post watcher check has failed, agent may have rolled back")
1028+
1029+
require.NotEmpty(t, artifactsProxy.ProxiedRequests(), "artifactsProxy does not have any requests")
1030+
require.NotEmpty(t, proxy.ProxiedRequests(), "proxy does not have any requests")
1031+
}

testing/proxytest/proxytest.go

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,10 @@ type Proxy struct {
5050
type Option func(o *options)
5151

5252
type options struct {
53-
addr string
54-
rewriteHost func(string) string
55-
rewriteURL func(u *url.URL)
53+
addr string
54+
rewriteHost func(string) string
55+
rewriteURL func(u *url.URL)
56+
verifyRequest func(r *http.Request) error
5657
// logFn if set will be used to log every request.
5758
logFn func(format string, a ...any)
5859
verbose bool
@@ -121,6 +122,14 @@ func WithRewriteFn(f func(u *url.URL)) Option {
121122
}
122123
}
123124

125+
// WithVerifyRequest calls f on the request before the proxy request is made.
126+
// If the verification returns an error, the proxy reutrns an HTTP 500 (for http proxy), or HTTP 502 (https)
127+
func WithVerifyRequest(f func(r *http.Request) error) Option {
128+
return func(o *options) {
129+
o.verifyRequest = f
130+
}
131+
}
132+
124133
// WithServerTLSConfig sets the TLS config for the server.
125134
func WithServerTLSConfig(tc *tls.Config) Option {
126135
return func(o *options) {
@@ -288,6 +297,12 @@ func (p *Proxy) serveHTTP(w http.ResponseWriter, r *http.Request) {
288297
func (p *Proxy) processRequest(r *http.Request) (*http.Response, error) {
289298
origURL := r.URL.String()
290299

300+
if p.opts.verifyRequest != nil {
301+
if err := p.opts.verifyRequest(r); err != nil {
302+
return nil, err
303+
}
304+
}
305+
291306
switch {
292307
case p.opts.rewriteURL != nil:
293308
p.opts.rewriteURL(r.URL)
@@ -312,6 +327,14 @@ func (p *Proxy) processRequest(r *http.Request) (*http.Response, error) {
312327
// needed anyway, so remove it.
313328
r.RequestURI = ""
314329

330+
// Add a Forwarded header
331+
host, _, err := net.SplitHostPort(r.RemoteAddr)
332+
if err != nil {
333+
p.opts.logFn("[ERROR] could not parse remote address %q: %v", r.RemoteAddr, err)
334+
} else {
335+
r.Header.Add("Forwarded", "for="+host)
336+
}
337+
315338
return p.client.Do(r)
316339
}
317340

testing/proxytest/proxytest_test.go

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"context"
99
"crypto/tls"
1010
"crypto/x509"
11+
"errors"
1112
"io"
1213
"net"
1314
"net/http"
@@ -171,6 +172,56 @@ func TestProxy(t *testing.T) {
171172
}
172173
},
173174
},
175+
{
176+
name: "Basic scenario, request verifier passes",
177+
setup: setup{
178+
fakeBackendServer: createFakeBackendServer(),
179+
generateTestHttpClient: nil,
180+
},
181+
proxyOptions: []Option{WithVerifyRequest(func(r *http.Request) error {
182+
if r.Method == http.MethodGet {
183+
return nil
184+
}
185+
return errors.New("unsupported method")
186+
})},
187+
proxyStartTLS: false,
188+
request: testRequest{
189+
method: http.MethodGet,
190+
url: "http://somehost:1234/some/path/here",
191+
body: nil,
192+
},
193+
wantErr: assert.NoError,
194+
assertFunc: func(t *testing.T, proxy *Proxy, resp *http.Response) {
195+
assert.Equal(t, http.StatusOK, resp.StatusCode)
196+
if assert.NotEmpty(t, proxy.ProxiedRequests(), "proxy should have captured at least 1 request") {
197+
assert.Contains(t, proxy.ProxiedRequests()[0], "/some/path/here")
198+
}
199+
},
200+
},
201+
{
202+
name: "Basic scenario, request verifier fails",
203+
setup: setup{
204+
fakeBackendServer: createFakeBackendServer(),
205+
generateTestHttpClient: nil,
206+
},
207+
proxyOptions: []Option{WithVerifyRequest(func(r *http.Request) error {
208+
if r.Method == http.MethodPost {
209+
return nil
210+
}
211+
return errors.New("unsupported method")
212+
})},
213+
proxyStartTLS: false,
214+
request: testRequest{
215+
method: http.MethodGet,
216+
url: "http://somehost:1234/some/path/here",
217+
body: nil,
218+
},
219+
wantErr: assert.NoError,
220+
assertFunc: func(t *testing.T, proxy *Proxy, resp *http.Response) {
221+
assert.Equal(t, http.StatusInternalServerError, resp.StatusCode)
222+
assert.Empty(t, proxy.ProxiedRequests(), "proxy does not capture unverified request")
223+
},
224+
},
174225
}
175226

176227
for _, tt := range testcases {
@@ -379,7 +430,12 @@ func prepareMTLSProxyAndTargetServer(t *testing.T, targetHost string) (*Proxy, h
379430

380431
func createFakeBackendServer() *httptest.Server {
381432
handlerF := func(writer http.ResponseWriter, request *http.Request) {
382-
// always return HTTP 200
433+
if h := request.Header.Get("Forwarded"); h == "" {
434+
// return 400 if the proxy has not added the Forwarded header.
435+
writer.WriteHeader(http.StatusBadRequest)
436+
return
437+
}
438+
// return HTTP 200 otherwise
383439
writer.WriteHeader(http.StatusOK)
384440
}
385441

0 commit comments

Comments
 (0)