Skip to content

Commit 2756c09

Browse files
authored
Prepare OTel Elasticsearch exporter with the credentials of the output (#5469)
When an OTel exporter is found in the configuration, and it is an Elasticsearch exporter, it looks for a prepared output with the same name, and copies its credentials.
1 parent b74487c commit 2756c09

File tree

4 files changed

+135
-31
lines changed

4 files changed

+135
-31
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Kind can be one of:
2+
# - breaking-change: a change to previously-documented behavior
3+
# - deprecation: functionality that is being removed in a later release
4+
# - bug-fix: fixes a problem in a previous version
5+
# - enhancement: extends functionality but does not break or fix existing behavior
6+
# - feature: new functionality
7+
# - known-issue: problems that we are aware of in a given version
8+
# - security: impacts on the security of a product or a user’s deployment.
9+
# - upgrade: important information for someone upgrading from a prior version
10+
# - other: does not fit into any of the other categories
11+
kind: enhancement
12+
13+
# Change summary; a 80ish characters long description of the change.
14+
summary: Add credentials to OTel Elasticsearch exporters
15+
16+
# Long description; in case the summary is not enough to describe the change
17+
# this field accommodate a description without length limits.
18+
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
19+
description: When a policy includes OTel configuration with Elasticsearch exporters, it configures their credentials using the credentials in the Elasticsearch output.
20+
21+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
22+
component: fleet-server
23+
24+
# PR URL; optional; the PR number that added the changeset.
25+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
26+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
27+
# Please provide it if you are adding a fragment for a different PR.
28+
pr: https://github.com/elastic/fleet-server/pull/5469
29+
30+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
31+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
32+
#issue: https://github.com/owner/repo/1234

internal/pkg/api/handleCheckin.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"compress/flate"
1010
"compress/gzip"
1111
"context"
12+
"encoding/base64"
1213
"encoding/json"
1314
"errors"
1415
"fmt"
@@ -17,6 +18,7 @@ import (
1718
"net/http"
1819
"reflect"
1920
"slices"
21+
"strings"
2022
"sync"
2123
"time"
2224

@@ -876,6 +878,11 @@ func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a
876878
policyOutput.Name, err)
877879
}
878880
}
881+
// Prepare OTel exporters from the information in outputs.
882+
if err := prepareOTelExporters(data.Outputs, data.Exporters); err != nil {
883+
return nil, fmt.Errorf("failed to prepare OTel exporters: %w", err)
884+
}
885+
879886
// Add replace inputs with agent prepared version.
880887
data.Inputs = pp.Inputs
881888

@@ -911,6 +918,49 @@ func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a
911918
return &resp, nil
912919
}
913920

921+
// prepareOTelExporters prepares OTel exporters by copying credentials and potentially other
922+
// settings from already prepared outputs.
923+
func prepareOTelExporters(outputs map[string]map[string]any, exporters map[string]any) error {
924+
for id, c := range exporters {
925+
var config map[string]any
926+
if c == nil {
927+
config = make(map[string]any)
928+
} else if cmap, ok := c.(map[string]any); ok {
929+
config = cmap
930+
} else {
931+
return fmt.Errorf("unexpected config type for %q, expected map, found %T", id, c)
932+
}
933+
934+
exporterType, name, found := strings.Cut(id, "/")
935+
if !found {
936+
return fmt.Errorf("unexpected exporter id format %q", id)
937+
}
938+
939+
output, found := outputs[name]
940+
if !found {
941+
return fmt.Errorf("output %q not found for exporter %q", name, id)
942+
}
943+
944+
switch exporterType {
945+
case policy.OTelExporterTypeElasticsearch:
946+
ot, ok := output["type"].(string)
947+
if !ok || (ot != policy.OutputTypeElasticsearch && ot != policy.OutputTypeRemoteElasticsearch) {
948+
return fmt.Errorf("unexpected output type %q found for exporter %q", name, id)
949+
}
950+
apiKey, ok := output["api_key"].(string)
951+
if !ok || apiKey == "" {
952+
return fmt.Errorf("api key not found in output %q for exporter %q", name, id)
953+
}
954+
config["api_key"] = base64.StdEncoding.EncodeToString([]byte(apiKey))
955+
default:
956+
return fmt.Errorf("OTel exporter %q not supported", exporterType)
957+
}
958+
959+
exporters[id] = config
960+
}
961+
return nil
962+
}
963+
914964
func getAgentAndVerifyAPIKeyID(ctx context.Context, bulker bulk.Bulk, agentID string, apiKeyID string) (*model.Agent, error) {
915965
span, ctx := apm.StartSpan(ctx, "getAgentAndVerifyAPIKeyID", "read")
916966
defer span.End()

internal/pkg/policy/policy_output.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ const (
2929
OutputTypeRemoteElasticsearch = "remote_elasticsearch"
3030
OutputTypeLogstash = "logstash"
3131
OutputTypeKafka = "kafka"
32+
33+
OTelExporterTypeElasticsearch = "elasticsearch"
3234
)
3335

3436
var (

internal/pkg/server/fleet_integration_test.go

Lines changed: 51 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ package server
1010
import (
1111
"bytes"
1212
"context"
13+
"encoding/base64"
1314
"encoding/json"
1415
"errors"
1516
"fmt"
@@ -26,6 +27,7 @@ import (
2627
"github.com/google/go-cmp/cmp"
2728
"github.com/hashicorp/go-cleanhttp"
2829
"github.com/rs/zerolog"
30+
"github.com/stretchr/testify/assert"
2931
"github.com/stretchr/testify/mock"
3032
"github.com/stretchr/testify/require"
3133
"golang.org/x/sync/errgroup"
@@ -1675,8 +1677,7 @@ func Test_SmokeTest_AuditUnenroll(t *testing.T) {
16751677
}
16761678

16771679
func TestCheckinOTelColPolicy(t *testing.T) {
1678-
ctx, cancel := context.WithCancel(t.Context())
1679-
defer cancel()
1680+
ctx := t.Context()
16801681

16811682
idSuffix := uuid.Must(uuid.NewV4()).String()
16821683
componentID := func(id string) string {
@@ -1697,21 +1698,21 @@ func TestCheckinOTelColPolicy(t *testing.T) {
16971698
componentID("someprocessor"): map[string]any{},
16981699
},
16991700
Connectors: map[string]any{
1700-
componentID("forward"): map[string]any{},
1701+
"forward": map[string]any{},
17011702
},
17021703
Exporters: map[string]any{
1703-
componentID("someexporter"): map[string]any{},
1704+
"elasticsearch/default": map[string]any{},
17041705
},
17051706
Service: &model.Service{
17061707
Pipelines: map[string]*model.PipelinesItem{
17071708
componentID("metrics"): &model.PipelinesItem{
17081709
Receivers: []string{componentID("somereceiver")},
17091710
Processors: []string{componentID("someprocessor")},
1710-
Exporters: []string{componentID("forward")},
1711+
Exporters: []string{"forward"},
17111712
},
17121713
"metrics": &model.PipelinesItem{
1713-
Receivers: []string{componentID("forward")},
1714-
Exporters: []string{componentID("someexporter")},
1714+
Receivers: []string{"forward"},
1715+
Exporters: []string{"elasticsearch/default"},
17151716
},
17161717
},
17171718
},
@@ -1737,18 +1738,16 @@ func TestCheckinOTelColPolicy(t *testing.T) {
17371738
t.Log("Agent enrollment successful")
17381739
p, _ := io.ReadAll(res.Body)
17391740
res.Body.Close()
1740-
var obj map[string]interface{}
1741-
err = json.Unmarshal(p, &obj)
1741+
var enrollResponse struct {
1742+
Item struct {
1743+
ID string `json:"id"`
1744+
AccessApiKey string `json:"access_api_key"`
1745+
} `json:"item"`
1746+
}
1747+
err = json.Unmarshal(p, &enrollResponse)
17421748
require.NoError(t, err)
1743-
1744-
item := obj["item"]
1745-
mm, ok := item.(map[string]interface{})
1746-
require.True(t, ok, "expected attribute item to be an object")
1747-
agentID, ok := mm["id"].(string)
1748-
require.True(t, ok, "expected attribute id to be a string")
1749-
1750-
apiKey, ok := mm["access_api_key"].(string)
1751-
require.True(t, ok, "expected attribute apiKey to be a string")
1749+
agentID := enrollResponse.Item.ID
1750+
apiKey := enrollResponse.Item.AccessApiKey
17521751

17531752
// checkin
17541753
t.Logf("Fake a checkin for agent %s", agentID)
@@ -1766,19 +1765,40 @@ func TestCheckinOTelColPolicy(t *testing.T) {
17661765
p, err = io.ReadAll(res.Body)
17671766
require.NoError(t, err)
17681767

1769-
err = json.Unmarshal(p, &obj)
1768+
var checkinResponse struct {
1769+
Actions []struct {
1770+
AgentID string `json:"agent_id"`
1771+
ID string `json:"id"`
1772+
Data struct {
1773+
Policy struct {
1774+
Exporters map[string]struct {
1775+
ApiKey string `json:"api_key"`
1776+
} `json:"exporters"`
1777+
Outputs map[string]struct {
1778+
ApiKey string `json:"api_key"`
1779+
Type string `json:"type"`
1780+
} `json:"outputs"`
1781+
} `json:"policy"`
1782+
} `json:"data"`
1783+
} `json:"actions"`
1784+
}
1785+
err = json.Unmarshal(p, &checkinResponse)
17701786
require.NoError(t, err)
17711787

1772-
actionsRaw, ok := obj["actions"]
1773-
require.True(t, ok, "expected actions is missing")
1774-
actions, ok := actionsRaw.([]interface{})
1775-
require.True(t, ok, "expected actions to be an array")
1776-
require.Greater(t, len(actions), 0, "expected at least 1 action")
1777-
action, ok := actions[0].(map[string]interface{})
1778-
require.True(t, ok, "expected action to be an object")
1779-
_, ok = action["id"].(string)
1780-
require.True(t, ok, "expected action id to be string")
1781-
aAgentID, ok := action["agent_id"].(string)
1782-
require.True(t, ok, "expected action agent_id to be string")
1783-
require.Equal(t, agentID, aAgentID)
1788+
require.Len(t, checkinResponse.Actions, 1, "expected 1 action")
1789+
1790+
action := checkinResponse.Actions[0]
1791+
assert.NotEmpty(t, action.ID)
1792+
assert.Equal(t, agentID, action.AgentID)
1793+
1794+
output, found := action.Data.Policy.Outputs["default"]
1795+
require.True(t, found, "default output not found")
1796+
require.Equal(t, "elasticsearch", output.Type)
1797+
require.NotEmpty(t, output.ApiKey)
1798+
1799+
exporter, found := action.Data.Policy.Exporters["elasticsearch/default"]
1800+
require.True(t, found, "default exporter not found")
1801+
encodedApiKey := base64.StdEncoding.EncodeToString([]byte(output.ApiKey))
1802+
1803+
assert.Equal(t, encodedApiKey, exporter.ApiKey)
17841804
}

0 commit comments

Comments
 (0)