Skip to content

Commit 6eeba91

Browse files
Add integration test for upgrades using download proxy
1 parent 9548faf commit 6eeba91

File tree

3 files changed

+321
-4
lines changed

3 files changed

+321
-4
lines changed

testing/integration/ess/proxy_url_test.go

Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ import (
1010
"context"
1111
"crypto/tls"
1212
"crypto/x509"
13+
"errors"
1314
"net"
15+
"net/http"
1416
"net/url"
1517
"os"
1618
"path/filepath"
@@ -24,14 +26,19 @@ import (
2426
"github.com/stretchr/testify/require"
2527
"gopkg.in/yaml.v2"
2628

29+
"github.com/elastic/elastic-agent-libs/kibana"
2730
"github.com/elastic/elastic-agent-libs/testing/certutil"
31+
atesting "github.com/elastic/elastic-agent/pkg/testing"
2832
integrationtest "github.com/elastic/elastic-agent/pkg/testing"
2933
"github.com/elastic/elastic-agent/pkg/testing/define"
3034
"github.com/elastic/elastic-agent/pkg/testing/tools/check"
35+
"github.com/elastic/elastic-agent/pkg/testing/tools/fleettools"
3136
"github.com/elastic/elastic-agent/pkg/testing/tools/testcontext"
37+
"github.com/elastic/elastic-agent/pkg/version"
3238
"github.com/elastic/elastic-agent/testing/fleetservertest"
3339
"github.com/elastic/elastic-agent/testing/integration"
3440
"github.com/elastic/elastic-agent/testing/proxytest"
41+
"github.com/elastic/elastic-agent/testing/upgradetest"
3542
)
3643

3744
func TestProxyURL(t *testing.T) {
@@ -792,3 +799,234 @@ func createBasicFleetPolicyData(t *testing.T, fleetHost string) (fleetservertest
792799
}
793800
return apiKey, policyData
794801
}
802+
803+
// TestFleetDownloadProxyURL will test that the download proxy is used correctly for an agent upgrade.
804+
//
805+
// Test will target a download source that requires a proxy to be used.
806+
//
807+
// elastic-agent -> proxy -> artifacts-proxy -> upstream
808+
//
809+
// If a proxy is not used, the artifacts-proxy returns an status error code.
810+
// This test will do the following:
811+
// 1. Create special artifacts-proxy that requires a proxy header in order to serve artifacts
812+
// 2. Create a policy with no proxies - that has the artifacts-proxy as the download url
813+
// 3. Enroll an agent
814+
// 4. Upgrade the agent
815+
// 5. Ensure upgrade fails
816+
// 6. Update policy to use a download proxy
817+
// 7. Upgrade the agent
818+
// 8. Ensure upgrade succeeds
819+
func TestFleetDownloadProxyURL(t *testing.T) {
820+
ctx := t.Context()
821+
info := define.Require(t, define.Requirements{
822+
Group: integration.Fleet,
823+
Stack: &define.Stack{},
824+
Local: false,
825+
Sudo: true,
826+
})
827+
kibClient := info.KibanaClient
828+
fleetServerURL, err := fleettools.DefaultURL(ctx, kibClient)
829+
require.NoError(t, err)
830+
testUUID, err := uuid.NewV4()
831+
require.NoError(t, err, "error generating UUID for test")
832+
833+
artifactsProxy := proxytest.New(t,
834+
proxytest.WithVerifyRequest(func(r *http.Request) error { // ensure we have proxy header
835+
h := r.Header.Get("Forwarded")
836+
if h == "" {
837+
h = r.Header.Get("X-Forwarded-For")
838+
if h == "" {
839+
return errors.New("missing proxy header")
840+
}
841+
}
842+
return nil
843+
}),
844+
proxytest.WithRewriteFn(func(u *url.URL) { // Send requests to real upstream source
845+
u.Scheme = "https"
846+
u.Host = "snapshots.elastic.co"
847+
}),
848+
proxytest.WithRequestLog("artifacts", t.Logf),
849+
proxytest.WithVerboseLog())
850+
err = artifactsProxy.Start()
851+
require.NoError(t, err, "Error starting artifacts proxy")
852+
t.Cleanup(artifactsProxy.Close)
853+
854+
downloadSource := kibana.DownloadSource{
855+
Name: "LocalArtifactsProxy-" + testUUID.String(),
856+
Host: artifactsProxy.LocalhostURL + "/downloads/",
857+
}
858+
sourceResp, err := kibClient.CreateDownloadSource(ctx, downloadSource)
859+
require.NoError(t, err, "Unable to create download source")
860+
861+
// Get and process start and end fixtures
862+
// TODO: Do we need FIPS aware tests?
863+
startFixture, err := define.NewFixtureFromLocalBuild(t, define.Version())
864+
require.NoError(t, err)
865+
err = startFixture.Prepare(ctx)
866+
require.NoError(t, err)
867+
startVersionInfo, err := startFixture.ExecVersion(ctx)
868+
require.NoError(t, err)
869+
startParsedVersion, err := version.ParseVersion(startVersionInfo.Binary.String())
870+
require.NoError(t, err)
871+
872+
endFixture, err := atesting.NewFixture(
873+
t,
874+
upgradetest.EnsureSnapshot(define.Version()),
875+
atesting.WithFetcher(atesting.ArtifactFetcher()),
876+
)
877+
require.NoError(t, err)
878+
err = endFixture.Prepare(ctx)
879+
require.NoError(t, err)
880+
endVersionInfo, err := endFixture.ExecVersion(ctx)
881+
require.NoError(t, err)
882+
883+
if startVersionInfo.Binary.String() == endVersionInfo.Binary.String() &&
884+
startVersionInfo.Binary.Commit == endVersionInfo.Binary.Commit {
885+
t.Skipf("Build under test is the same as the build from the artifacts repository (version: %s) [commit: %s]",
886+
startVersionInfo.Binary.String(), startVersionInfo.Binary.Commit)
887+
}
888+
if startVersionInfo.Binary.Commit == endVersionInfo.Binary.Commit {
889+
t.Skipf("Target version has the same commit hash %q", endVersionInfo.Binary.Commit)
890+
}
891+
892+
t.Log("Creating Agent policy...")
893+
policy := kibana.AgentPolicy{
894+
Name: "test-policy-" + testUUID.String(),
895+
Namespace: "default",
896+
Description: "Test policy " + testUUID.String(),
897+
MonitoringEnabled: []kibana.MonitoringEnabledOption{
898+
kibana.MonitoringEnabledLogs,
899+
kibana.MonitoringEnabledMetrics,
900+
},
901+
DownloadSourceID: sourceResp.Item.ID, // Use artifacts-proxy as download source
902+
AdvancedSettings: map[string]interface{}{
903+
"agent_download_timeout": "1m", // Force a fast initial failure timeout
904+
},
905+
}
906+
policyResp, err := kibClient.CreatePolicy(ctx, policy)
907+
require.NoError(t, err)
908+
enrollmentToken, err := kibClient.CreateEnrollmentAPIKey(ctx, kibana.CreateEnrollmentAPIKeyRequest{
909+
PolicyID: policyResp.ID,
910+
})
911+
require.NoError(t, err)
912+
913+
t.Log("Installing Elastic Agent...")
914+
installOpts := atesting.InstallOpts{
915+
Force: true,
916+
EnrollOpts: atesting.EnrollOpts{
917+
URL: fleetServerURL,
918+
EnrollmentToken: enrollmentToken.APIKey,
919+
},
920+
}
921+
output, err := startFixture.Install(ctx, &installOpts)
922+
t.Logf("Install agent output:\n%s", string(output))
923+
require.NoError(t, err)
924+
925+
// TODO fast watcher config?
926+
927+
t.Log("Waiting for Agent to be correct version and healthy...")
928+
err = upgradetest.WaitHealthyAndVersion(ctx, startFixture, startVersionInfo.Binary, 2*time.Minute, 10*time.Second, t)
929+
require.NoError(t, err)
930+
931+
agentID, err := startFixture.AgentID(ctx)
932+
require.NoError(t, err)
933+
t.Logf("Agent ID: %q", agentID)
934+
935+
t.Log("Waiting for enrolled Agent status to be online...")
936+
require.Eventually(t, func() bool {
937+
return check.FleetAgentStatus(ctx, t, kibClient, agentID, "online")()
938+
}, time.Minute*2, time.Second, "Agent did not come online")
939+
940+
t.Logf("Upgrading from version \"%s-%s\" to version \"%s-%s\", without proxy...",
941+
startParsedVersion, startVersionInfo.Binary.Commit,
942+
endVersionInfo.Binary.String(), endVersionInfo.Binary.Commit)
943+
err = fleettools.UpgradeAgent(ctx, kibClient, agentID, endVersionInfo.Binary.String(), true)
944+
require.NoError(t, err)
945+
946+
t.Log("Ensure upgrade has failed")
947+
require.EventuallyWithT(t, func(c *assert.CollectT) {
948+
agent, err := kibClient.GetAgent(ctx, kibana.GetAgentRequest{ID: agentID})
949+
require.NoError(c, err)
950+
require.NotNil(c, agent.UpgradeDetails)
951+
require.Equal(c, "UPG_FAILED", agent.UpgradeDetails.State)
952+
}, time.Minute*5, time.Second, "Unable to verify that upgrade has failed.")
953+
954+
proxy := proxytest.New(t,
955+
proxytest.WithRequestLog("proxy", t.Logf),
956+
proxytest.WithVerboseLog())
957+
err = proxy.Start()
958+
require.NoError(t, err, "error starting download proxy")
959+
t.Cleanup(proxy.Close)
960+
961+
fleetProxyResp, err := kibClient.CreateFleetProxy(ctx, kibana.ProxiesRequest{
962+
Name: "fleet-upgrade-test-proxy-" + testUUID.String(),
963+
URL: proxy.LocalhostURL,
964+
})
965+
require.NoError(t, err)
966+
967+
// Update download source to include download proxy
968+
downloadSource.ProxyID = fleetProxyResp.Item.ID
969+
_, err = kibClient.UpdateDownloadSource(ctx, sourceResp.Item.ID, downloadSource)
970+
require.NoError(t, err, "Unable to update policy with download source proxy")
971+
// Update policy to increase download timeout to 60m (default 120m)
972+
updatedPolicy, err := kibClient.UpdatePolicy(ctx, policyResp.ID, kibana.AgentPolicyUpdateRequest{
973+
Name: policy.Name,
974+
Namespace: policy.Namespace,
975+
//DownloadSourceID: sourceResp.Item.ID,
976+
AdvancedSettings: map[string]interface{}{
977+
"agent_download_timeout": "60m",
978+
},
979+
})
980+
require.NoError(t, err, "Unable to update policy with new download timeout")
981+
982+
t.Logf("Verify agent is online and has updated to revision %d", updatedPolicy.Revision)
983+
require.EventuallyWithT(t, func(c *assert.CollectT) {
984+
agentResp, err := kibClient.GetAgent(ctx, kibana.GetAgentRequest{ID: agentID})
985+
require.NoError(c, err)
986+
require.Equal(c, "online", agentResp.Status)
987+
require.Equal(c, updatedPolicy.Revision, agentResp.PolicyRevision)
988+
}, time.Minute, time.Second, "Expected agent to be online and policy has updated")
989+
990+
t.Logf("Upgrading from version \"%s-%s\" to version \"%s-%s\", with proxy...",
991+
startParsedVersion, startVersionInfo.Binary.Commit,
992+
endVersionInfo.Binary.String(), endVersionInfo.Binary.Commit)
993+
err = fleettools.UpgradeAgent(ctx, kibClient, agentID, endVersionInfo.Binary.String(), true)
994+
require.NoError(t, err)
995+
996+
t.Log("Ensure upgrade starts")
997+
require.EventuallyWithT(t, func(c *assert.CollectT) {
998+
agent, err := kibClient.GetAgent(ctx, kibana.GetAgentRequest{ID: agentID})
999+
require.NoError(c, err)
1000+
require.NotNil(c, agent.UpgradeDetails)
1001+
}, time.Minute*5, time.Second, "Unable to verify that upgrade details appear.")
1002+
1003+
t.Log("Waiting for upgrade watcher to start...")
1004+
err = upgradetest.WaitForWatcher(ctx, 5*time.Minute, 10*time.Second)
1005+
require.NoError(t, err)
1006+
t.Log("Upgrade watcher started")
1007+
1008+
err = upgradetest.WaitHealthyAndVersion(ctx, startFixture, endVersionInfo.Binary, 2*time.Minute, 10*time.Second, t)
1009+
require.NoError(t, err)
1010+
1011+
t.Log("Waiting for upgraded Agent status to be online...")
1012+
require.Eventually(t, func() bool {
1013+
return check.FleetAgentStatus(ctx, t, kibClient, agentID, "online")()
1014+
}, time.Minute*10, time.Second*10, "Agent did not come online")
1015+
1016+
t.Log("Check agent version")
1017+
require.EventuallyWithT(t, func(c *assert.CollectT) {
1018+
ver, err := fleettools.GetAgentVersion(ctx, kibClient, agentID)
1019+
require.NoError(c, err)
1020+
require.Equal(c, endVersionInfo.Binary.Version, ver)
1021+
}, time.Minute*5, time.Second)
1022+
1023+
t.Log("Waiting for upgrade watcher to finish...")
1024+
err = upgradetest.WaitForNoWatcher(ctx, 2*time.Minute, 10*time.Second, 1*time.Minute+15*time.Second)
1025+
require.NoError(t, err)
1026+
1027+
err = upgradetest.CheckHealthyAndVersion(ctx, startFixture, endVersionInfo.Binary)
1028+
require.NoError(t, err, "Post watcher check has failed, agent may have rolled back")
1029+
1030+
require.NotEmpty(t, artifactsProxy.ProxiedRequests(), "artifactsProxy does not have any requests")
1031+
require.NotEmpty(t, proxy.ProxiedRequests(), "proxy does not have any requests")
1032+
}

testing/proxytest/proxytest.go

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,10 @@ type Proxy struct {
5050
type Option func(o *options)
5151

5252
type options struct {
53-
addr string
54-
rewriteHost func(string) string
55-
rewriteURL func(u *url.URL)
53+
addr string
54+
rewriteHost func(string) string
55+
rewriteURL func(u *url.URL)
56+
verifyRequest func(r *http.Request) error
5657
// logFn if set will be used to log every request.
5758
logFn func(format string, a ...any)
5859
verbose bool
@@ -121,6 +122,14 @@ func WithRewriteFn(f func(u *url.URL)) Option {
121122
}
122123
}
123124

125+
// WithVerifyRequest calls f on the request before the proxy request is made.
126+
// If the verification returns an error, the proxy reutrns an HTTP 500 (for http proxy), or HTTP 502 (https)
127+
func WithVerifyRequest(f func(r *http.Request) error) Option {
128+
return func(o *options) {
129+
o.verifyRequest = f
130+
}
131+
}
132+
124133
// WithServerTLSConfig sets the TLS config for the server.
125134
func WithServerTLSConfig(tc *tls.Config) Option {
126135
return func(o *options) {
@@ -288,6 +297,12 @@ func (p *Proxy) serveHTTP(w http.ResponseWriter, r *http.Request) {
288297
func (p *Proxy) processRequest(r *http.Request) (*http.Response, error) {
289298
origURL := r.URL.String()
290299

300+
if p.opts.verifyRequest != nil {
301+
if err := p.opts.verifyRequest(r); err != nil {
302+
return nil, err
303+
}
304+
}
305+
291306
switch {
292307
case p.opts.rewriteURL != nil:
293308
p.opts.rewriteURL(r.URL)
@@ -312,6 +327,14 @@ func (p *Proxy) processRequest(r *http.Request) (*http.Response, error) {
312327
// needed anyway, so remove it.
313328
r.RequestURI = ""
314329

330+
// Add a Forwarded header
331+
host, _, err := net.SplitHostPort(r.RemoteAddr)
332+
if err != nil {
333+
p.opts.logFn("[ERROR] could parse remote address: %v", err)
334+
} else {
335+
r.Header.Add("Forwarded", "for="+host)
336+
}
337+
315338
return p.client.Do(r)
316339
}
317340

0 commit comments

Comments
 (0)