diff --git a/NOTICE-fips.txt b/NOTICE-fips.txt index 877429180bc..d4655fa09bf 100644 --- a/NOTICE-fips.txt +++ b/NOTICE-fips.txt @@ -787,11 +787,11 @@ Contents of probable licence file $GOMODCACHE/github.com/dolmen-go/contextio@v0. -------------------------------------------------------------------------------- Dependency : github.com/elastic/beats/v7 -Version: v7.0.0-alpha2.0.20251202130319-deec5c55e9b0 +Version: v7.0.0-alpha2.0.20251203173126-96dc1b39c163 Licence type (autodetected): Elastic -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/beats/v7@v7.0.0-alpha2.0.20251202130319-deec5c55e9b0/LICENSE.txt: +Contents of probable licence file $GOMODCACHE/github.com/elastic/beats/v7@v7.0.0-alpha2.0.20251203173126-96dc1b39c163/LICENSE.txt: Source code in this repository is variously licensed under the Apache License Version 2.0, an Apache compatible license, or the Elastic License. Outside of diff --git a/NOTICE.txt b/NOTICE.txt index 7076e5ab05e..f407cf9d57f 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -787,11 +787,11 @@ Contents of probable licence file $GOMODCACHE/github.com/dolmen-go/contextio@v0. -------------------------------------------------------------------------------- Dependency : github.com/elastic/beats/v7 -Version: v7.0.0-alpha2.0.20251202130319-deec5c55e9b0 +Version: v7.0.0-alpha2.0.20251203173126-96dc1b39c163 Licence type (autodetected): Elastic -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/beats/v7@v7.0.0-alpha2.0.20251202130319-deec5c55e9b0/LICENSE.txt: +Contents of probable licence file $GOMODCACHE/github.com/elastic/beats/v7@v7.0.0-alpha2.0.20251203173126-96dc1b39c163/LICENSE.txt: Source code in this repository is variously licensed under the Apache License Version 2.0, an Apache compatible license, or the Elastic License. Outside of diff --git a/dev-tools/notice/overrides.json b/dev-tools/notice/overrides.json index 52dfdb1bd0d..8ca6a30772a 100644 --- a/dev-tools/notice/overrides.json +++ b/dev-tools/notice/overrides.json @@ -1,4 +1,5 @@ {"name": "github.com/elastic/beats/v7", "licenceType": "Elastic"} +{"name": "github.com/belimawr/beats/v7", "licenceType": "Elastic"} {"name": "github.com/elastic/elastic-agent-client/v7", "licenceType": "Elastic"} {"name": "github.com/gorhill/cronexpr", "licenceType": "Apache-2.0", "licenceFile":"APLv2"} {"name": "github.com/hashicorp/cronexpr", "licenceType": "Apache-2.0", "licenceFile":"APLv2"} diff --git a/go.mod b/go.mod index 024b377584f..0677e82eeb1 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/docker/docker v28.5.1+incompatible github.com/docker/go-units v0.5.0 github.com/dolmen-go/contextio v0.0.0-20200217195037-68fc5150bcd5 - github.com/elastic/beats/v7 v7.0.0-alpha2.0.20251202130319-deec5c55e9b0 + github.com/elastic/beats/v7 v7.0.0-alpha2.0.20251203173126-96dc1b39c163 github.com/elastic/cloud-on-k8s/v2 v2.0.0-20250327073047-b624240832ae github.com/elastic/elastic-agent-autodiscover v0.10.0 github.com/elastic/elastic-agent-client/v7 v7.17.2 @@ -243,6 +243,7 @@ require ( github.com/bitfield/gotestdox v0.2.2 // indirect github.com/blang/semver/v4 v4.0.0 // indirect github.com/bmatcuk/doublestar/v4 v4.9.1 // indirect + github.com/brianvoe/gofakeit v3.18.0+incompatible // indirect github.com/cespare/xxhash v1.1.0 // indirect github.com/chai2010/gettext-go v1.0.2 // indirect github.com/cilium/ebpf v0.19.0 // indirect diff --git a/go.sum b/go.sum index 327565833e2..7b55502aee3 100644 --- a/go.sum +++ b/go.sum @@ -503,8 +503,8 @@ github.com/elastic/azure-sdk-for-go/sdk/resourcemanager/consumption/armconsumpti github.com/elastic/azure-sdk-for-go/sdk/resourcemanager/consumption/armconsumption v1.1.0-elastic/go.mod h1:0vCBR1wgGwZeGmloJ+eCWIZF2S47grTXRzj2mftg2Nk= github.com/elastic/bayeux v1.0.5 h1:UceFq01ipmT3S8DzFK+uVAkbCdiPR0Bqei8qIGmUeY0= github.com/elastic/bayeux v1.0.5/go.mod h1:CSI4iP7qeo5MMlkznGvYKftp8M7qqP/3nzmVZoXHY68= -github.com/elastic/beats/v7 v7.0.0-alpha2.0.20251202130319-deec5c55e9b0 h1:NLqI2no2x3cSjcKH76C9ItRw0GiQVC7/uoJ/DLurEcE= -github.com/elastic/beats/v7 v7.0.0-alpha2.0.20251202130319-deec5c55e9b0/go.mod h1:jFJMJMOG18CdEbvM3UKQnn3Ft66j2vgMfStZwLAcK+A= +github.com/elastic/beats/v7 v7.0.0-alpha2.0.20251203173126-96dc1b39c163 h1:U/zcMEPIsEhzXJ8PjEFyn5bgy4GJKIFuTQ/MVr9Zry4= +github.com/elastic/beats/v7 v7.0.0-alpha2.0.20251203173126-96dc1b39c163/go.mod h1:jFJMJMOG18CdEbvM3UKQnn3Ft66j2vgMfStZwLAcK+A= github.com/elastic/cloud-on-k8s/v2 v2.0.0-20250327073047-b624240832ae h1:OiShmbWAyGU0MS0ADJWr1/QgeLIZliMk9xsrFicR3/s= github.com/elastic/cloud-on-k8s/v2 v2.0.0-20250327073047-b624240832ae/go.mod h1:D2IckZVXARugvikE4fv1glvaJMohKSZRzrPsxCjo9O0= github.com/elastic/elastic-agent-autodiscover v0.10.0 h1:WJ4zl9uSfk1kHmn2B/0byQBLIL607Zt4LNfOgV7+XN0= diff --git a/internal/edot/go.mod b/internal/edot/go.mod index e92b6d65365..53e9d8b482b 100644 --- a/internal/edot/go.mod +++ b/internal/edot/go.mod @@ -5,7 +5,7 @@ go 1.24.11 replace github.com/elastic/elastic-agent => ../../ require ( - github.com/elastic/beats/v7 v7.0.0-alpha2.0.20251202130319-deec5c55e9b0 + github.com/elastic/beats/v7 v7.0.0-alpha2.0.20251203173126-96dc1b39c163 github.com/elastic/elastic-agent v0.0.0-00010101000000-000000000000 github.com/elastic/elastic-agent-libs v0.26.2 github.com/elastic/opentelemetry-collector-components/connector/elasticapmconnector v0.20.0 diff --git a/internal/edot/go.sum b/internal/edot/go.sum index 24edec95bf9..71bf843e53d 100644 --- a/internal/edot/go.sum +++ b/internal/edot/go.sum @@ -432,8 +432,8 @@ github.com/elastic/azure-sdk-for-go/sdk/resourcemanager/consumption/armconsumpti github.com/elastic/azure-sdk-for-go/sdk/resourcemanager/consumption/armconsumption v1.1.0-elastic/go.mod h1:0vCBR1wgGwZeGmloJ+eCWIZF2S47grTXRzj2mftg2Nk= github.com/elastic/bayeux v1.0.5 h1:UceFq01ipmT3S8DzFK+uVAkbCdiPR0Bqei8qIGmUeY0= github.com/elastic/bayeux v1.0.5/go.mod h1:CSI4iP7qeo5MMlkznGvYKftp8M7qqP/3nzmVZoXHY68= -github.com/elastic/beats/v7 v7.0.0-alpha2.0.20251202130319-deec5c55e9b0 h1:NLqI2no2x3cSjcKH76C9ItRw0GiQVC7/uoJ/DLurEcE= -github.com/elastic/beats/v7 v7.0.0-alpha2.0.20251202130319-deec5c55e9b0/go.mod h1:jFJMJMOG18CdEbvM3UKQnn3Ft66j2vgMfStZwLAcK+A= +github.com/elastic/beats/v7 v7.0.0-alpha2.0.20251203173126-96dc1b39c163 h1:U/zcMEPIsEhzXJ8PjEFyn5bgy4GJKIFuTQ/MVr9Zry4= +github.com/elastic/beats/v7 v7.0.0-alpha2.0.20251203173126-96dc1b39c163/go.mod h1:jFJMJMOG18CdEbvM3UKQnn3Ft66j2vgMfStZwLAcK+A= github.com/elastic/elastic-agent-autodiscover v0.10.0 h1:WJ4zl9uSfk1kHmn2B/0byQBLIL607Zt4LNfOgV7+XN0= github.com/elastic/elastic-agent-autodiscover v0.10.0/go.mod h1:Nf3zh9FcJ9nTTswTwDTUAqXmvQllOrNliM6xmORSxwE= github.com/elastic/elastic-agent-client/v7 v7.17.2 h1:Cl2TeABqWZgW40t5fchGWT/sRk4MDDLWA0d8iHHOxLA= diff --git a/magefile.go b/magefile.go index 0321e1620d6..5475ec0c63c 100644 --- a/magefile.go +++ b/magefile.go @@ -2301,7 +2301,6 @@ func (Integration) Local(ctx context.Context, testName string) error { params.Packages = []string{ "github.com/elastic/elastic-agent/testing/integration/...", } - var goTestFlags []string rawTestFlags := os.Getenv("GOTEST_FLAGS") if rawTestFlags != "" { diff --git a/pkg/testing/linux/linux.go b/pkg/testing/linux/linux.go index c38f53dc521..4ee388db81f 100644 --- a/pkg/testing/linux/linux.go +++ b/pkg/testing/linux/linux.go @@ -81,7 +81,7 @@ func linuxCopy(ctx context.Context, sshClient ssh.SSHClient, logger common.Logge err, stdout, stderr) } - stdOut, errOut, err := sshClient.Exec(ctx, "unzip", []string{destRepoName, "-d", "agent"}, nil) + stdOut, errOut, err := sshClient.Exec(ctx, "unzip", []string{"-o", destRepoName, "-d", "agent"}, nil) if err != nil { return fmt.Errorf("failed to unzip %s to agent directory: %w (stdout: %s, stderr: %s)", destRepoName, err, stdOut, errOut) } diff --git a/testing/integration/common.go b/testing/integration/common.go index 655fe4eb04c..79663756993 100644 --- a/testing/integration/common.go +++ b/testing/integration/common.go @@ -7,6 +7,7 @@ package integration import ( + "errors" "fmt" "net/url" "os" @@ -21,6 +22,9 @@ import ( func GetESHost() (string, error) { fixedESHost := os.Getenv("ELASTICSEARCH_HOST") + if len(fixedESHost) == 0 { + return "", errors.New("ELASTICSEARCH_HOST cannot be empty") + } parsedES, err := url.Parse(fixedESHost) if err != nil { return "", err diff --git a/testing/integration/ess/beat_receivers_test.go b/testing/integration/ess/beat_receivers_test.go index d8f2c9c42af..1abb2e49412 100644 --- a/testing/integration/ess/beat_receivers_test.go +++ b/testing/integration/ess/beat_receivers_test.go @@ -188,9 +188,7 @@ func TestClassicAndReceiverAgentMonitoring(t *testing.T) { require.NoError(t, err, "error reading policy response") defer resp.Body.Close() - apiKeyResponse, err := createESApiKey(info.ESClient) - require.NoError(t, err, "failed to get api key") - require.True(t, len(apiKeyResponse.Encoded) > 1, "api key is invalid %q", apiKeyResponse) + apiKeyResponse := createESApiKey(t, info.ESClient) apiKey, err := getDecodedApiKey(apiKeyResponse) require.NoError(t, err, "error decoding api key") @@ -464,9 +462,7 @@ outputs: esEndpoint, err := integration.GetESHost() require.NoError(t, err, "error getting elasticsearch endpoint") - esApiKey, err := createESApiKey(info.ESClient) - require.NoError(t, err, "error creating API key") - require.True(t, len(esApiKey.Encoded) > 1, "api key is invalid %q", esApiKey) + esApiKey := createESApiKey(t, info.ESClient) beatsApiKey, err := base64.StdEncoding.DecodeString(esApiKey.Encoded) require.NoError(t, err, "error decoding api key") @@ -1203,9 +1199,7 @@ func TestSensitiveLogsESExporter(t *testing.T) { } esEndpoint, err := integration.GetESHost() require.NoError(t, err, "error getting elasticsearch endpoint") - esApiKey, err := createESApiKey(info.ESClient) - require.NoError(t, err, "error creating API key") - require.True(t, len(esApiKey.Encoded) > 1, "api key is invalid %q", esApiKey) + esApiKey := createESApiKey(t, info.ESClient) decodedApiKey, err := getDecodedApiKey(esApiKey) require.NoError(t, err) @@ -1386,9 +1380,7 @@ func TestSensitiveIncludeSourceOnError(t *testing.T) { } esEndpoint, err := integration.GetESHost() require.NoError(t, err, "error getting elasticsearch endpoint") - esApiKey, err := createESApiKey(info.ESClient) - require.NoError(t, err, "error creating API key") - require.True(t, len(esApiKey.Encoded) > 1, "api key is invalid %q", esApiKey) + esApiKey := createESApiKey(t, info.ESClient) decodedApiKey, err := getDecodedApiKey(esApiKey) require.NoError(t, err) diff --git a/testing/integration/ess/logfile.go b/testing/integration/ess/logfile.go new file mode 100644 index 00000000000..6840fc0e74b --- /dev/null +++ b/testing/integration/ess/logfile.go @@ -0,0 +1,160 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package ess + +import ( + "bufio" + "errors" + "fmt" + "io" + "os" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// LogFile wraps a *os.File and makes it more suitable for tests. +// Key features: +// - On failures, the file is kept and its path printed +// - Methods to search and wait for substrings in lines are provided, +// they keep track of the offset, ensuring ordering when +// when searching. +type LogFile struct { + *os.File + offset int64 + KeepLogFileOnSuccess bool +} + +// NewLogFile returns a new LogFile, path must be the components of a path, +// they will be joined using the OS path separator. +// If path is not provided, os.TempDir is used as the base path for the file. +func NewLogFile(t testing.TB, path ...string) *LogFile { + dir := filepath.Join(path...) + if dir == "" { + dir = os.TempDir() + } + + if err := os.MkdirAll(dir, 0o750); err != nil { + t.Fatalf("cannot create folder for logs: %s", err) + } + + f, err := os.CreateTemp(dir, "elastic-agent-*.ndjson") + if err != nil { + t.Fatalf("cannot create log file: %s", err) + } + + lf := &LogFile{ + File: f, + } + + t.Cleanup(func() { + if err := f.Sync(); err != nil { + t.Logf("cannot sync log file: %s", err) + } + + if err := f.Close(); err != nil { + t.Logf("cannot close log file: %s", err) + } + + // If the test failed, print the log file location, + // otherwise remove it. + if t.Failed() || lf.KeepLogFileOnSuccess { + t.Logf("Full logs written to %s", f.Name()) + return + } + + if err := os.Remove(f.Name()); err != nil { + t.Logf("could not remove temporary log file: %s", err) + } + }) + + return lf +} + +// WaitLogsContains waits for the specified string s to be present in the logs within +// the given timeout duration and fails the test if s is not found. +// It keeps track of the log file offset, reading only new lines. Each +// subsequent call to WaitLogsContains will only check logs not yet evaluated. +// msgAndArgs should be a format string and arguments that will be printed +// if the logs are not found, providing additional context for debugging. +func (l *LogFile) WaitLogsContains(t testing.TB, s string, timeout time.Duration, msgAndArgs ...any) { + t.Helper() + require.EventuallyWithT( + t, + func(c *assert.CollectT) { + found, err := l.FindInLogs(s) + if err != nil { + c.Errorf("cannot check the log file: %s", err) + return + } + + if !found { + c.Errorf("did not find '%s' in the logs", s) + } + }, + timeout, + 100*time.Millisecond, + msgAndArgs...) +} + +// LogContains searches for str in the log file keeping track of the +// offset. If there is any issue reading the log file, then t.Fatalf is called, +// if str is not present in the logs, t.Errorf is called. +func (l *LogFile) LogContains(t testing.TB, str string) { + t.Helper() + found, err := l.FindInLogs(str) + if err != nil { + t.Fatalf("cannot read log file: %s", err) + } + + if !found { + t.Errorf("'%s' not found in logs", str) + } +} + +// FindInLogs searches for str in the log file keeping track of the offset. +// It returns true if str is found in the logs. If there are any errors, +// it returns false and the error +func (l *LogFile) FindInLogs(str string) (bool, error) { + // Open the file again so we can seek and not interfere with + // the logger writing to it. + f, err := os.Open(l.Name()) + if err != nil { + return false, fmt.Errorf("cannot open log file for reading: %w", err) + } + + if _, err := f.Seek(l.offset, io.SeekStart); err != nil { + return false, fmt.Errorf("cannot seek log file: %w", err) + } + + r := bufio.NewReader(f) + for { + data, err := r.ReadBytes('\n') + line := string(data) + l.offset += int64(len(data)) + + if err != nil { + if !errors.Is(err, io.EOF) { + return false, fmt.Errorf("error reading log file '%s': %w", l.Name(), err) + } + break + } + + if strings.Contains(line, str) { + return true, nil + } + } + + return false, nil +} + +// ResetOffset resets the log file offset +func (l *LogFile) ResetOffset() { + l.offset = 0 +} diff --git a/testing/integration/ess/otel_log_as_filestream_test.go b/testing/integration/ess/otel_log_as_filestream_test.go new file mode 100644 index 00000000000..16c3b6f72a1 --- /dev/null +++ b/testing/integration/ess/otel_log_as_filestream_test.go @@ -0,0 +1,287 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +//go:build integration + +package ess + +import ( + "bytes" + "context" + "os" + "os/exec" + "path/filepath" + "strings" + "testing" + "text/template" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + libbeattesting "github.com/elastic/beats/v7/libbeat/tests/integration" + "github.com/elastic/elastic-agent-libs/testing/estools" + "github.com/elastic/elastic-agent/pkg/core/process" + atesting "github.com/elastic/elastic-agent/pkg/testing" + "github.com/elastic/elastic-agent/pkg/testing/define" + "github.com/elastic/elastic-agent/pkg/testing/tools/testcontext" + "github.com/elastic/elastic-agent/testing/integration" +) + +// TestFilebeatReceiverLogAsFilestream test beats receivers as follow: +// 1. Runs Filebeat Receiver with the Log input +// 2. Ensures all events are ingested +// 3. Stops Filebeat Receiver +// 4. Starts Filebeat Receiver with the global feature flag enabled +// 5. Adds more data to the file +// 6. Ensures all data is ingested and no duplication happens +func TestFilebeatReceiverLogAsFilestream(t *testing.T) { + info := define.Require(t, define.Requirements{ + Stack: &define.Stack{}, + Group: integration.Default, + Local: true, + OS: []define.OS{ + {Type: define.Windows}, + {Type: define.Linux}, + {Type: define.Darwin}, + }, + }) + + otelConfigTemplate := `receivers: + filebeatreceiver: + filebeat: + inputs: + - type: log + id: a-unique-id + allow_deprecated_use: true + paths: + {{.LogFilepath}} + fields: + find_me: {{.Namespace}} + output: + otelconsumer: + logging: + level: debug + selectors: + - '*' + path.home: {{.HomeDir}} + features.log_input_run_as_filestream.enabled: {{.AsFilestream}} + +exporters: + elasticsearch: + api_key: {{.ESApiKey}} + endpoint: {{.ESEndpoint}} + logs_index: {{.Namespace}} + sending_queue: + enabled: true + wait_for_result: true # Avoid losing data on shutdown + block_on_overflow: true + mapping: + mode: none + +service: + pipelines: + logs: + receivers: [filebeatreceiver] + exporters: [elasticsearch] + telemetry: + logs: + level: DEBUG + encoding: json + disable_stacktrace: true +` + + waitEventsInES := func(want int) { + t.Helper() + + require.EventuallyWithT(t, func(c *assert.CollectT) { + findCtx, findCancel := context.WithTimeout(t.Context(), 5*time.Second) + defer findCancel() + + docs, err := estools.GetAllLogsForIndexWithContext( + findCtx, + info.ESClient, + info.Namespace) + require.NoError(c, err) + + got := docs.Hits.Total.Value + require.Equalf( + c, + want, + got, + "expecting %d events, got %d", + want, + got) + }, 60*time.Second, time.Second, "did not find the expected number of events") + } + + rootDir, err := filepath.Abs(filepath.Join("..", "..", "..", "build")) + require.NoError(t, err, "cannot get absolute path of rootDir") + + tmpDir := libbeattesting.CreateTempDir(t, rootDir) + inputFilePath, err := filepath.Abs(filepath.Join(tmpDir, "log.log")) + + // Generate a string we can use to search in the logs, + // without it tests on Windows will fail + inputFilePathStr := strings.ReplaceAll(inputFilePath, `\`, `\\`) + + libbeattesting.WriteLogFile(t, inputFilePath, 50, false) + + esApiKey := createESApiKey(t, info.ESClient) + esHost, err := integration.GetESHost() + require.NoError(t, err, "failed to get ES host") + + ctx, cancel := testcontext.WithDeadline(t, t.Context(), time.Now().Add(2*time.Minute)) + defer cancel() + + agentLogFile := NewLogFile(t, tmpDir) + agentLogFile.KeepLogFileOnSuccess = true + + cfg := map[string]any{ + "HomeDir": tmpDir, + "LogFilepath": inputFilePath, + "ESApiKey": esApiKey.Encoded, + "ESEndpoint": esHost, + "Namespace": info.Namespace, + "AsFilestream": false, + } + + fixture, err := define.NewFixtureFromLocalBuild( + t, + define.Version()) + require.NoError(t, err, "cannot create Elastic Agent fixture") + + // Start Elastic Agent/Filebeat receiver running the Log input + otelConfigPath := filepath.Join(tmpDir, "otel.yml") + cmd := StartElasticAgentOtel(t, ctx, otelConfigTemplate, otelConfigPath, cfg, fixture, agentLogFile.File) + agentLogFile.WaitLogsContains( + t, + "Log input (deprecated) running as Log input (deprecated)", + 20*time.Second, + "Log input did not start as Log input", + ) + + // Wait for all events to be ingested and stop Elastic Agent + waitEventsInES(50) + StopElasticAgentOtel(t, cmd, agentLogFile) + + // Enable the feature flag and start Elastic Agent + cfg["AsFilestream"] = true + cmd = StartElasticAgentOtel(t, ctx, otelConfigTemplate, otelConfigPath, cfg, fixture, agentLogFile.File) + + // Ensure the Filesteam input starts + agentLogFile.WaitLogsContains( + t, + "Log input (deprecated) running as Filestream input", + 20*time.Second, + "Log input did not start as Filestream input", + ) + + agentLogFile.WaitLogsContains( + t, + "Input 'filestream' starting", + 20*time.Second, + "Filestream did not start", + ) + + // Add 50 events to the file, it now contains 100 events + libbeattesting.WriteLogFile(t, inputFilePath, 50, true) + + agentLogFile.WaitLogsContains( + t, + "File "+inputFilePathStr+" has been updated", + 20*time.Second, + "Filestream did not detect change in the file") + + // Wait for Filestream to finish reading the file + agentLogFile.WaitLogsContains( + t, + "End of file reached: "+inputFilePathStr+"; Backoff now", + 20*time.Second, + "Filestream did not reach EOF") + + // Ensure all 100 events have been ingested and stop Elastic Agent + waitEventsInES(100) + StopElasticAgentOtel(t, cmd, agentLogFile) + + // Start Elastic Agent again to ensure it is correctly tracking the state + cmd = StartElasticAgentOtel(t, ctx, otelConfigTemplate, otelConfigPath, cfg, fixture, agentLogFile.File) + agentLogFile.WaitLogsContains( + t, + "Log input (deprecated) running as Filestream input", + 20*time.Second, + "Log input did not start as Filestream input", + ) + + agentLogFile.WaitLogsContains( + t, + "Input 'filestream' starting", + 20*time.Second, + "Filestream did not start", + ) + + agentLogFile.WaitLogsContains( + t, + "End of file reached: "+inputFilePathStr+"; Backoff now.", + 20*time.Second, + "Filestream did not reach EOF") + + // Stop Elastic Agent + StopElasticAgentOtel(t, cmd, agentLogFile) + + // Ensure there was no data duplication + waitEventsInES(100) +} + +func StartElasticAgentOtel( + t *testing.T, + ctx context.Context, + otelConfigTemplate string, + otelConfigPath string, + cfg map[string]any, + fixture *atesting.Fixture, + f *os.File) *exec.Cmd { + + otelConfigBuffer := bytes.Buffer{} + require.NoError(t, + template.Must( + template.New("otelConfig"). + Parse(otelConfigTemplate)). + Execute( + &otelConfigBuffer, + cfg)) + + require.NoError( + t, + os.WriteFile(otelConfigPath, otelConfigBuffer.Bytes(), 0o600), + "cannot write configuration file") + + cmd, err := fixture.PrepareAgentCommand( + ctx, + []string{"otel", "--config", otelConfigPath}, + ) + require.NoError(t, err, "cannot prepare Elastic Agent command") + + cmd.Stderr = f + cmd.Stdout = f + + require.NoError(t, cmd.Start(), "cannot start Elastic Agent in OTel mode") + + return cmd +} + +func StopElasticAgentOtel(t *testing.T, cmd *exec.Cmd, f *LogFile) { + require.NoError( + t, + process.Terminate(cmd.Process), + "cannot send terminate signal to Elastic Agent") + + require.NoError(t, cmd.Wait(), "Elastic Agent exited with an error") + + f.WaitLogsContains( + t, + "Shutdown complete.", + time.Second, + "Filebeat Receiver didn't shutdown") +} diff --git a/testing/integration/ess/otel_test.go b/testing/integration/ess/otel_test.go index 6507b270f1e..d091d04a5fa 100644 --- a/testing/integration/ess/otel_test.go +++ b/testing/integration/ess/otel_test.go @@ -543,9 +543,7 @@ func TestOtelLogsIngestion(t *testing.T) { esClient := info.ESClient require.NotNil(t, esClient) - esApiKey, err := createESApiKey(esClient) - require.NoError(t, err, "failed to get api key") - require.True(t, len(esApiKey.Encoded) > 1, "api key is invalid %q", esApiKey) + esApiKey := createESApiKey(t, esClient) logsIngestionConfig := logsIngestionConfigTemplate logsIngestionConfig = strings.ReplaceAll(logsIngestionConfig, "{{.ESApiKey}}", esApiKey.Encoded) @@ -666,9 +664,7 @@ func TestOtelAPMIngestion(t *testing.T) { require.True(t, len(esHost) > 0) esClient := info.ESClient - esApiKey, err := createESApiKey(esClient) - require.NoError(t, err, "failed to get api key") - require.True(t, len(esApiKey.APIKey) > 1, "api key is invalid %q", esApiKey) + esApiKey := createESApiKey(t, esClient) apmArgs := []string{ "run", @@ -777,8 +773,21 @@ func TestOtelAPMIngestion(t *testing.T) { apmFixtureWg.Wait() } -func createESApiKey(esClient *elasticsearch.Client) (estools.APIKeyResponse, error) { - return estools.CreateAPIKey(context.Background(), esClient, estools.APIKeyRequest{Name: "test-api-key", Expiration: "1d"}) +func createESApiKey(t *testing.T, esClient *elasticsearch.Client) estools.APIKeyResponse { + esApiKey, err := estools.CreateAPIKey( + t.Context(), + esClient, + estools.APIKeyRequest{Name: "test-api-key", Expiration: "1d"}, + ) + + require.NoError(t, err, "error creating API key") + require.Truef( + t, + len(esApiKey.APIKey) > 1 && len(esApiKey.Encoded) > 1, + "api key is invalid %q", + esApiKey) + + return esApiKey } // getDecodedApiKey returns a decoded API key appropriate for use in beats configurations. @@ -818,6 +827,262 @@ func mapAtLeastOneTrue(mm map[string]bool) bool { return false } +func TestFileBeatReceiver(t *testing.T) { + define.Require(t, define.Requirements{ + Group: integration.Default, + Local: true, + OS: []define.OS{ + {Type: define.Windows}, + {Type: define.Linux}, + {Type: define.Darwin}, + }, + }) + + type otelConfigOptions struct { + Message string + Output string + HomeDir string + } + testMessage := "supercalifragilisticexpialidocious" + tmpDir := t.TempDir() + exporterOutputPath := filepath.Join(tmpDir, "output.json") + t.Cleanup(func() { + if t.Failed() { + contents, err := os.ReadFile(exporterOutputPath) + if err != nil { + t.Logf("No exporter output file") + return + } + t.Logf("Contents of exporter output file:\n%s\n", string(contents)) + } + }) + otelConfigPath := filepath.Join(tmpDir, "otel.yml") + otelConfigTemplate := `receivers: + filebeatreceiver: + filebeat: + inputs: + - type: benchmark + enabled: true + count: 1 + message: {{.Message}} + output: + otelconsumer: + logging: + level: info + selectors: + - '*' + path.home: {{.HomeDir}} +exporters: + file/no_rotation: + path: {{.Output}} +service: + pipelines: + logs: + receivers: [filebeatreceiver] + exporters: [file/no_rotation] +` + + var otelConfigBuffer bytes.Buffer + require.NoError(t, + template.Must(template.New("otelConfig").Parse(otelConfigTemplate)).Execute(&otelConfigBuffer, + otelConfigOptions{ + Message: testMessage, + Output: exporterOutputPath, + HomeDir: tmpDir, + })) + require.NoError(t, os.WriteFile(otelConfigPath, otelConfigBuffer.Bytes(), 0o600)) + t.Cleanup(func() { + if t.Failed() { + contents, err := os.ReadFile(otelConfigPath) + if err != nil { + t.Logf("no otel config file") + return + } + t.Logf("Contents of otel config file:\n%s\n", string(contents)) + } + }) + fixture, err := define.NewFixtureFromLocalBuild(t, define.Version(), aTesting.WithAdditionalArgs([]string{"--config", otelConfigPath})) + require.NoError(t, err) + + ctx, cancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(5*time.Minute)) + defer cancel() + err = fixture.Prepare(ctx, fakeComponent) + require.NoError(t, err) + + var fixtureWg sync.WaitGroup + fixtureWg.Add(1) + go func() { + defer fixtureWg.Done() + err = fixture.RunOtelWithClient(ctx) + }() + + require.Eventually(t, + func() bool { + content, err := os.ReadFile(exporterOutputPath) + if err != nil || len(content) == 0 { + return false + } + return bytes.Contains(content, []byte(testMessage)) + }, + 3*time.Minute, 1*time.Second, + fmt.Sprintf("there should be exported logs by now")) + + cancel() + fixtureWg.Wait() + require.True(t, err == nil || err == context.Canceled || err == context.DeadlineExceeded, "Retrieved unexpected error: %s", err.Error()) +} + +func TestOtelFBReceiverE2E(t *testing.T) { + info := define.Require(t, define.Requirements{ + Group: integration.Default, + Local: true, + OS: []define.OS{ + {Type: define.Windows}, + {Type: define.Linux}, + {Type: define.Darwin}, + }, + Stack: &define.Stack{}, + }) + tmpDir := t.TempDir() + numEvents := 50 + // Create the data file to ingest + inputFile, err := os.CreateTemp(tmpDir, "input.txt") + require.NoError(t, err, "failed to create temp file to hold data to ingest") + inputFilePath := inputFile.Name() + for i := 0; i < numEvents; i++ { + _, err = inputFile.Write([]byte(fmt.Sprintf("Line %d\n", i))) + require.NoErrorf(t, err, "failed to write line %d to temp file", i) + } + err = inputFile.Close() + require.NoError(t, err, "failed to close data temp file") + t.Cleanup(func() { + if t.Failed() { + contents, err := os.ReadFile(inputFilePath) + if err != nil { + t.Logf("no data file to import at %s", inputFilePath) + return + } + t.Logf("contents of import file:\n%s\n", string(contents)) + } + }) + + // Create the otel configuration file + type otelConfigOptions struct { + InputPath string + HomeDir string + ESEndpoint string + ESApiKey string + Index string + MinItems int + } + esEndpoint, err := integration.GetESHost() + require.NoError(t, err, "error getting elasticsearch endpoint") + esApiKey := createESApiKey(t, info.ESClient) + + index := "logs-integration-default" + otelConfigTemplate := `receivers: + filebeatreceiver: + filebeat: + inputs: + - type: filestream + id: filestream-end-to-end + enabled: true + paths: + - {{.InputPath}} + prospector.scanner.fingerprint.enabled: false + file_identity.native: ~ + output: + otelconsumer: + logging: + level: info + selectors: + - '*' + path.home: {{.HomeDir}} + queue.mem.flush.timeout: 0s +exporters: + elasticsearch/log: + endpoints: + - {{.ESEndpoint}} + api_key: {{.ESApiKey}} + logs_index: {{.Index}} + sending_queue: + wait_for_result: true # Avoid losing data on shutdown + block_on_overflow: true + batch: + flush_timeout: 1s + min_size: {{.MinItems}} + mapping: + mode: bodymap +service: + pipelines: + logs: + receivers: + - filebeatreceiver + exporters: + - elasticsearch/log +` + otelConfigPath := filepath.Join(tmpDir, "otel.yml") + var otelConfigBuffer bytes.Buffer + require.NoError(t, + template.Must(template.New("otelConfig").Parse(otelConfigTemplate)).Execute(&otelConfigBuffer, + otelConfigOptions{ + InputPath: inputFilePath, + HomeDir: tmpDir, + ESEndpoint: esEndpoint, + ESApiKey: esApiKey.Encoded, + Index: index, + MinItems: numEvents, + })) + require.NoError(t, os.WriteFile(otelConfigPath, otelConfigBuffer.Bytes(), 0o600)) + t.Cleanup(func() { + if t.Failed() { + contents, err := os.ReadFile(otelConfigPath) + if err != nil { + t.Logf("No otel configuration file at %s", otelConfigPath) + return + } + t.Logf("Contents of otel config file:\n%s\n", string(contents)) + } + }) + // Now we can actually create the fixture and run it + fixture, err := define.NewFixtureFromLocalBuild(t, define.Version(), aTesting.WithAdditionalArgs([]string{"--config", otelConfigPath})) + require.NoError(t, err) + + ctx, cancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(5*time.Minute)) + defer cancel() + err = fixture.Prepare(ctx, fakeComponent) + require.NoError(t, err) + + var fixtureWg sync.WaitGroup + fixtureWg.Add(1) + go func() { + defer fixtureWg.Done() + err = fixture.RunOtelWithClient(ctx) + }() + + // Make sure find the logs + actualHits := &struct{ Hits int }{} + require.Eventually(t, + func() bool { + findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer findCancel() + + docs, err := estools.GetLogsForIndexWithContext(findCtx, info.ESClient, ".ds-"+index+"*", map[string]interface{}{ + "log.file.path": inputFilePath, + }) + require.NoError(t, err) + + actualHits.Hits = docs.Hits.Total.Value + return actualHits.Hits == numEvents + }, + 2*time.Minute, 1*time.Second, + "Expected %d logs, got %v", numEvents, actualHits) + + cancel() + fixtureWg.Wait() + require.True(t, err == nil || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded), "Retrieved unexpected error: %s", err.Error()) +} + func TestOtelFilestreamInput(t *testing.T) { info := define.Require(t, define.Requirements{ Group: integration.Default, @@ -863,9 +1128,7 @@ func TestOtelFilestreamInput(t *testing.T) { } esEndpoint, err := integration.GetESHost() require.NoError(t, err, "error getting elasticsearch endpoint") - esApiKey, err := createESApiKey(info.ESClient) - require.NoError(t, err, "error creating API key") - require.True(t, len(esApiKey.Encoded) > 1, "api key is invalid %q", esApiKey) + esApiKey := createESApiKey(t, info.ESClient) decodedApiKey, err := getDecodedApiKey(esApiKey) require.NoError(t, err) configTemplate := ` @@ -1008,9 +1271,7 @@ func TestOTelHTTPMetricsInput(t *testing.T) { } esEndpoint, err := integration.GetESHost() require.NoError(t, err, "error getting elasticsearch endpoint") - esApiKey, err := createESApiKey(info.ESClient) - require.NoError(t, err, "error creating API key") - require.True(t, len(esApiKey.Encoded) > 1, "api key is invalid %q", esApiKey) + esApiKey := createESApiKey(t, info.ESClient) decodedApiKey, err := getDecodedApiKey(esApiKey) require.NoError(t, err) configTemplate := ` @@ -1114,6 +1375,134 @@ agent.internal.runtime.metricbeat: cmd.Wait() } +func TestOtelMBReceiverE2E(t *testing.T) { + info := define.Require(t, define.Requirements{ + Group: integration.Default, + Local: true, + OS: []define.OS{ + // {Type: define.Windows}, we don't support otel on Windows yet + {Type: define.Linux}, + {Type: define.Darwin}, + }, + Stack: &define.Stack{}, + }) + tmpDir := t.TempDir() + + // Create the otel configuration file + type otelConfigOptions struct { + HomeDir string + ESEndpoint string + ESApiKey string + Index string + MinItems int + } + esEndpoint, err := integration.GetESHost() + require.NoError(t, err, "error getting elasticsearch endpoint") + esApiKey := createESApiKey(t, info.ESClient) + + index := "logs-integration-default" + otelConfigTemplate := `receivers: + metricbeatreceiver: + metricbeat: + modules: + - module: system + enabled: true + period: 1s + processes: + - '.*' + metricsets: + - cpu + output: + otelconsumer: + logging: + level: info + selectors: + - '*' + path.home: {{.HomeDir}} + queue.mem.flush.timeout: 0s +exporters: + elasticsearch/log: + endpoints: + - {{.ESEndpoint}} + api_key: {{.ESApiKey}} + logs_index: {{.Index}} + sending_queue: + wait_for_result: true # Avoid losing data on shutdown + block_on_overflow: true + batch: + flush_timeout: 1s + min_size: {{.MinItems}} + mapping: + mode: bodymap +service: + pipelines: + logs: + receivers: + - metricbeatreceiver + exporters: + - elasticsearch/log +` + otelConfigPath := filepath.Join(tmpDir, "otel.yml") + var otelConfigBuffer bytes.Buffer + require.NoError(t, + template.Must(template.New("otelConfig").Parse(otelConfigTemplate)).Execute(&otelConfigBuffer, + otelConfigOptions{ + HomeDir: tmpDir, + ESEndpoint: esEndpoint, + ESApiKey: esApiKey.Encoded, + Index: index, + MinItems: 1, + })) + require.NoError(t, os.WriteFile(otelConfigPath, otelConfigBuffer.Bytes(), 0o600)) + t.Cleanup(func() { + if t.Failed() { + contents, err := os.ReadFile(otelConfigPath) + if err != nil { + t.Logf("No otel configuration file at %s", otelConfigPath) + return + } + t.Logf("Contents of otel config file:\n%s\n", string(contents)) + } + }) + // Now we can actually create the fixture and run it + fixture, err := define.NewFixtureFromLocalBuild(t, define.Version(), aTesting.WithAdditionalArgs([]string{"--config", otelConfigPath})) + require.NoError(t, err) + + ctx, cancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(5*time.Minute)) + defer cancel() + err = fixture.Prepare(ctx, fakeComponent) + require.NoError(t, err) + + var fixtureWg sync.WaitGroup + fixtureWg.Add(1) + go func() { + defer fixtureWg.Done() + err = fixture.RunOtelWithClient(ctx) + }() + + // Make sure find the logs + actualHits := &struct{ Hits int }{} + require.Eventually(t, + func() bool { + findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer findCancel() + + docs, err := estools.GetLogsForIndexWithContext(findCtx, info.ESClient, ".ds-"+index+"*", map[string]interface{}{ + "metricset.name": "cpu", + }) + require.NoError(t, err) + + actualHits.Hits = docs.Hits.Total.Value + return actualHits.Hits >= 1 + }, + 2*time.Minute, 1*time.Second, + "Expected at least %d logs, got %v", 1, actualHits) + + cancel() + fixtureWg.Wait() + require.True(t, err == nil || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded), "Retrieved unexpected error: %s", err.Error()) +} + func TestHybridAgentE2E(t *testing.T) { // This test is a hybrid agent test that ingests a single log with // filebeat and fbreceiver. It then compares the final documents in @@ -1167,9 +1556,7 @@ func TestHybridAgentE2E(t *testing.T) { } esEndpoint, err := integration.GetESHost() require.NoError(t, err, "error getting elasticsearch endpoint") - esApiKey, err := createESApiKey(info.ESClient) - require.NoError(t, err, "error creating API key") - require.True(t, len(esApiKey.Encoded) > 1, "api key is invalid %q", esApiKey) + esApiKey := createESApiKey(t, info.ESClient) configTemplate := `agent.logging.level: info agent.logging.to_stderr: true @@ -1432,9 +1819,8 @@ func TestFBOtelRestartE2E(t *testing.T) { } esEndpoint, err := integration.GetESHost() require.NoError(t, err, "error getting elasticsearch endpoint") - esApiKey, err := createESApiKey(info.ESClient) - require.NoError(t, err, "error creating API key") - require.True(t, len(esApiKey.Encoded) > 1, "api key is invalid %q", esApiKey) + esApiKey := createESApiKey(t, info.ESClient) + // Use a unique index to avoid conflicts with other parallel runners index := strings.ToLower("logs-generic-default-" + randStr(8)) otelConfigTemplate := `receivers: @@ -1640,9 +2026,7 @@ func TestOtelBeatsAuthExtension(t *testing.T) { } esEndpoint, err := integration.GetESHost() require.NoError(t, err, "error getting elasticsearch endpoint") - esApiKey, err := createESApiKey(info.ESClient) - require.NoError(t, err, "error creating API key") - require.True(t, len(esApiKey.Encoded) > 1, "api key is invalid %q", esApiKey) + esApiKey := createESApiKey(t, info.ESClient) index := "logs-integration-" + info.Namespace fixture, err := define.NewFixtureFromLocalBuild(t, define.Version()) @@ -1796,9 +2180,7 @@ func TestOtelBeatsAuthExtensionInvalidCertificates(t *testing.T) { } esEndpoint, err := integration.GetESHost() require.NoError(t, err, "error getting elasticsearch endpoint") - esApiKey, err := createESApiKey(info.ESClient) - require.NoError(t, err, "error creating API key") - require.True(t, len(esApiKey.Encoded) > 1, "api key is invalid %q", esApiKey) + esApiKey := createESApiKey(t, info.ESClient) index := "logs-integration-" + info.Namespace fixture, err := define.NewFixtureFromLocalBuild(t, define.Version())