diff --git a/controllers/consoleplugin/consoleplugin_objects.go b/controllers/consoleplugin/consoleplugin_objects.go index 63b4596d5..528653e59 100644 --- a/controllers/consoleplugin/consoleplugin_objects.go +++ b/controllers/consoleplugin/consoleplugin_objects.go @@ -22,6 +22,7 @@ import ( "github.com/netobserv/network-observability-operator/controllers/constants" "github.com/netobserv/network-observability-operator/controllers/ebpf" "github.com/netobserv/network-observability-operator/pkg/helper" + "github.com/netobserv/network-observability-operator/pkg/loki" "github.com/netobserv/network-observability-operator/pkg/volumes" ) @@ -310,12 +311,7 @@ func (b *builder) setLokiConfig(lconf *config.LokiConfig) { if lconf.URL != statusURL { lconf.StatusURL = statusURL } - // check for connection traking to list indexes - indexFields := constants.LokiIndexFields - if b.desired.Processor.LogTypes != nil && *b.desired.Processor.LogTypes != flowslatest.LogTypeFlows { - indexFields = append(indexFields, constants.LokiConnectionIndexFields...) - } - lconf.Labels = indexFields + lconf.Labels = loki.GetLokiLabels(b.desired) lconf.TenantID = b.loki.TenantID lconf.ForwardUserToken = b.loki.UseForwardToken() if b.loki.TLS.Enable { @@ -347,8 +343,16 @@ func (b *builder) setLokiConfig(lconf *config.LokiConfig) { } } -func (b *builder) setFrontendConfig(fconf *config.FrontendConfig) { - var dedupJustMark, dedupMerge bool +func (b *builder) setFrontendConfig(fconf *config.FrontendConfig) error { + var err error + dedupJustMark, err := strconv.ParseBool(ebpf.DedupeJustMarkDefault) + if err != nil { + return err + } + dedupMerge, err := strconv.ParseBool(ebpf.DedupeMergeDefault) + if err != nil { + return err + } if helper.UseEBPF(b.desired) { if helper.IsPktDropEnabled(&b.desired.Agent.EBPF) { fconf.Features = append(fconf.Features, "pktDrop") @@ -364,15 +368,17 @@ func (b *builder) setFrontendConfig(fconf *config.FrontendConfig) { if b.desired.Agent.EBPF.Advanced != nil { if v, ok := b.desired.Agent.EBPF.Advanced.Env[ebpf.EnvDedupeJustMark]; ok { - dedupJustMark, _ = strconv.ParseBool(v) - } else { - dedupJustMark, _ = strconv.ParseBool(ebpf.DedupeJustMarkDefault) + dedupJustMark, err = strconv.ParseBool(v) + if err != nil { + return err + } } if v, ok := b.desired.Agent.EBPF.Advanced.Env[ebpf.EnvDedupeMerge]; ok { - dedupMerge, _ = strconv.ParseBool(v) - } else { - dedupMerge, _ = strconv.ParseBool(ebpf.DedupeMergeDefault) + dedupMerge, err = strconv.ParseBool(v) + if err != nil { + return err + } } } } @@ -385,6 +391,7 @@ func (b *builder) setFrontendConfig(fconf *config.FrontendConfig) { Mark: dedupJustMark, Merge: dedupMerge, } + return nil } //go:embed config/static-frontend-config.yaml @@ -407,7 +414,10 @@ func (b *builder) configMap() (*corev1.ConfigMap, string, error) { if err != nil { return nil, "", err } - b.setFrontendConfig(&config.Frontend) + err = b.setFrontendConfig(&config.Frontend) + if err != nil { + return nil, "", err + } var configStr string bs, err := yaml.Marshal(config) diff --git a/controllers/consoleplugin/consoleplugin_test.go b/controllers/consoleplugin/consoleplugin_test.go index 8d18a5a70..b70a373f5 100644 --- a/controllers/consoleplugin/consoleplugin_test.go +++ b/controllers/consoleplugin/consoleplugin_test.go @@ -11,8 +11,10 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/utils/ptr" + "sigs.k8s.io/yaml" flowslatest "github.com/netobserv/network-observability-operator/apis/flowcollector/v1beta2" + config "github.com/netobserv/network-observability-operator/controllers/consoleplugin/config" "github.com/netobserv/network-observability-operator/controllers/constants" "github.com/netobserv/network-observability-operator/pkg/helper" ) @@ -285,6 +287,100 @@ func TestConfigMapUpdateWithLokistackMode(t *testing.T) { assert.NotEqual(old.Data, nEw.Data) } +func TestConfigMapContent(t *testing.T) { + assert := assert.New(t) + + agentSpec := flowslatest.FlowCollectorAgent{ + Type: "eBPF", + EBPF: flowslatest.FlowCollectorEBPF{ + Sampling: ptr.To(int32(1)), + }, + } + lokiSpec := flowslatest.FlowCollectorLoki{ + Mode: flowslatest.LokiModeLokiStack, + LokiStack: flowslatest.LokiStackRef{Name: "lokistack", Namespace: "ls-namespace"}, + } + loki := helper.NewLokiConfig(&lokiSpec, "any") + spec := flowslatest.FlowCollectorSpec{ + Agent: agentSpec, + ConsolePlugin: getPluginConfig(), + Loki: lokiSpec, + } + builder := newBuilder(testNamespace, testImage, &spec, &loki) + cm, _, err := builder.configMap() + assert.NotNil(cm) + assert.Nil(err) + + // parse output config and check expected values + var config config.PluginConfig + err = yaml.Unmarshal([]byte(cm.Data["config.yaml"]), &config) + assert.Nil(err) + + // loki config + assert.Equal(config.Loki.URL, "https://lokistack-gateway-http.ls-namespace.svc:8080/api/logs/v1/network/") + assert.Equal(config.Loki.StatusURL, "https://lokistack-query-frontend-http.ls-namespace.svc:3100/") + + // frontend params + assert.Equal(config.Frontend.RecordTypes, []string{"flowLog"}) + assert.Empty(config.Frontend.Features) + assert.NotEmpty(config.Frontend.Columns) + assert.NotEmpty(config.Frontend.Filters) + assert.Equal(config.Frontend.Sampling, 1) + assert.Equal(config.Frontend.Deduper.Mark, true) + assert.Equal(config.Frontend.Deduper.Merge, false) +} + +func TestConfigMapError(t *testing.T) { + assert := assert.New(t) + + agentSpec := flowslatest.FlowCollectorAgent{ + Type: "eBPF", + EBPF: flowslatest.FlowCollectorEBPF{ + Sampling: ptr.To(int32(1)), + Advanced: &flowslatest.AdvancedAgentConfig{ + Env: map[string]string{ + "DEDUPER_JUST_MARK": "invalid", + }, + }, + }, + } + lokiSpec := flowslatest.FlowCollectorLoki{} + loki := helper.NewLokiConfig(&lokiSpec, "any") + + // spec with invalid flag + spec := flowslatest.FlowCollectorSpec{ + Agent: agentSpec, + ConsolePlugin: getPluginConfig(), + Loki: lokiSpec, + } + builder := newBuilder(testNamespace, testImage, &spec, &loki) + cm, _, err := builder.configMap() + assert.Nil(cm) + assert.NotNil(err) + + // update to valid flags + agentSpec.EBPF.Advanced.Env = map[string]string{ + "DEDUPER_JUST_MARK": "false", + "DEDUPER_MERGE": "true", + } + spec = flowslatest.FlowCollectorSpec{ + Agent: agentSpec, + ConsolePlugin: getPluginConfig(), + Loki: lokiSpec, + } + builder = newBuilder(testNamespace, testImage, &spec, &loki) + cm, _, err = builder.configMap() + assert.NotNil(cm) + assert.Nil(err) + + // parse output config and check expected values + var config config.PluginConfig + err = yaml.Unmarshal([]byte(cm.Data["config.yaml"]), &config) + assert.Nil(err) + assert.Equal(config.Frontend.Deduper.Mark, false) + assert.Equal(config.Frontend.Deduper.Merge, true) +} + func TestServiceUpdateCheck(t *testing.T) { assert := assert.New(t) old := getServiceSpecs() diff --git a/controllers/constants/constants.go b/controllers/constants/constants.go index 438aa6ff7..021942d09 100644 --- a/controllers/constants/constants.go +++ b/controllers/constants/constants.go @@ -36,6 +36,8 @@ const ( HeartbeatType = "heartbeat" EndConnectionType = "endConnection" + ClusterNameLabelName = "K8S_ClusterName" + MonitoringNamespace = "openshift-monitoring" MonitoringServiceAccount = "prometheus-k8s" @@ -45,8 +47,9 @@ const ( LokiCRReader = "netobserv-reader" ) -var LokiIndexFields = []string{"SrcK8S_Namespace", "SrcK8S_OwnerName", "SrcK8S_Type", "DstK8S_Namespace", "DstK8S_OwnerName", "DstK8S_Type", "FlowDirection", "Duplicate"} +var LokiIndexFields = []string{"SrcK8S_Namespace", "SrcK8S_OwnerName", "SrcK8S_Type", "DstK8S_Namespace", "DstK8S_OwnerName", "DstK8S_Type"} var LokiConnectionIndexFields = []string{"_RecordType"} +var LokiDeduperMarkIndexFields = []string{"FlowDirection", "Duplicate"} var FlowCollectorName = types.NamespacedName{Name: "cluster"} var EnvNoHTTP2 = corev1.EnvVar{ Name: "GODEBUG", diff --git a/controllers/flowcollector_controller_certificates_test.go b/controllers/flowcollector_controller_certificates_test.go index d1b7ded16..570c760a8 100644 --- a/controllers/flowcollector_controller_certificates_test.go +++ b/controllers/flowcollector_controller_certificates_test.go @@ -146,6 +146,13 @@ func flowCollectorCertificatesSpecs() { DeploymentModel: flowslatest.DeploymentModelKafka, Agent: flowslatest.FlowCollectorAgent{ Type: "eBPF", + EBPF: flowslatest.FlowCollectorEBPF{ + Advanced: &flowslatest.AdvancedAgentConfig{ + Env: map[string]string{ + "DEDUPER_JUST_MARK": "true", + }, + }, + }, }, Loki: flowslatest.FlowCollectorLoki{ Enable: ptr.To(true), diff --git a/controllers/flp/flp_pipeline_builder.go b/controllers/flp/flp_pipeline_builder.go index b0f25eabf..6e6262468 100644 --- a/controllers/flp/flp_pipeline_builder.go +++ b/controllers/flp/flp_pipeline_builder.go @@ -15,14 +15,11 @@ import ( "github.com/netobserv/network-observability-operator/controllers/constants" "github.com/netobserv/network-observability-operator/pkg/filters" "github.com/netobserv/network-observability-operator/pkg/helper" + "github.com/netobserv/network-observability-operator/pkg/loki" "github.com/netobserv/network-observability-operator/pkg/metrics" "github.com/netobserv/network-observability-operator/pkg/volumes" ) -const ( - clusterNameLabelName = "K8S_ClusterName" -) - type PipelineBuilder struct { *config.PipelineBuilderStage desired *flowslatest.FlowCollectorSpec @@ -52,15 +49,8 @@ func newPipelineBuilder( func (b *PipelineBuilder) AddProcessorStages() error { lastStage := *b.PipelineBuilderStage - indexFields := constants.LokiIndexFields - lastStage = b.addTransformFilter(lastStage) - - indexFields, lastStage = b.addConnectionTracking(indexFields, lastStage) - - if b.desired.Processor.MultiClusterDeployment != nil && *b.desired.Processor.MultiClusterDeployment { - indexFields = append(indexFields, clusterNameLabelName) - } + lastStage = b.addConnectionTracking(lastStage) // enrich stage (transform) configuration enrichedStage := lastStage.TransformNetwork("enrich", api.TransformNetwork{ @@ -88,7 +78,7 @@ func (b *PipelineBuilder) AddProcessorStages() error { debugConfig := helper.GetAdvancedLokiConfig(b.desired.Loki.Advanced) if helper.UseLoki(b.desired) { lokiWrite := api.WriteLoki{ - Labels: indexFields, + Labels: loki.GetLokiLabels(b.desired), BatchSize: int(b.desired.Loki.WriteBatchSize), BatchWait: helper.UnstructuredDuration(b.desired.Loki.WriteBatchWait), MaxBackoff: helper.UnstructuredDuration(debugConfig.WriteMaxBackoff), @@ -200,7 +190,7 @@ func flowMetricToFLP(flowMetric *metricslatest.FlowMetricSpec) (*api.PromMetrics return m, nil } -func (b *PipelineBuilder) addConnectionTracking(indexFields []string, lastStage config.PipelineBuilderStage) ([]string, config.PipelineBuilderStage) { +func (b *PipelineBuilder) addConnectionTracking(lastStage config.PipelineBuilderStage) config.PipelineBuilderStage { outputFields := []api.OutputField{ { Name: "Bytes", @@ -307,7 +297,6 @@ func (b *PipelineBuilder) addConnectionTracking(indexFields []string, lastStage // Connection tracking stage (only if LogTypes is not FLOWS) if b.desired.Processor.LogTypes != nil && *b.desired.Processor.LogTypes != flowslatest.LogTypeFlows { - indexFields = append(indexFields, constants.LokiConnectionIndexFields...) outputRecordTypes := helper.GetRecordTypes(&b.desired.Processor) debugConfig := helper.GetAdvancedProcessorConfig(b.desired.Processor.Advanced) lastStage = lastStage.ConnTrack("extract_conntrack", api.ConnTrack{ @@ -342,7 +331,7 @@ func (b *PipelineBuilder) addConnectionTracking(indexFields []string, lastStage }, }) } - return indexFields, lastStage + return lastStage } func (b *PipelineBuilder) addTransformFilter(lastStage config.PipelineBuilderStage) config.PipelineBuilderStage { @@ -359,7 +348,7 @@ func (b *PipelineBuilder) addTransformFilter(lastStage config.PipelineBuilderSta if clusterName != "" { transformFilterRules = []api.TransformFilterRule{ { - Input: clusterNameLabelName, + Input: constants.ClusterNameLabelName, Type: "add_field_if_doesnt_exist", Value: clusterName, }, diff --git a/controllers/flp/flp_test.go b/controllers/flp/flp_test.go index e45f62b2d..08b037bfe 100644 --- a/controllers/flp/flp_test.go +++ b/controllers/flp/flp_test.go @@ -655,8 +655,6 @@ func TestConfigMapShouldDeserializeAsJSONWithLokiManual(t *testing.T) { "DstK8S_Namespace", "DstK8S_OwnerName", "DstK8S_Type", - "FlowDirection", - "Duplicate", "_RecordType", }, lokiCfg.Labels) assert.Equal(`{app="netobserv-flowcollector"}`, fmt.Sprintf("%v", lokiCfg.StaticLabels)) @@ -670,7 +668,7 @@ func TestConfigMapShouldDeserializeAsJSONWithLokiStack(t *testing.T) { ns := "namespace" cfg := getConfig() useLokiStack(&cfg) - cfg.Agent.Type = flowslatest.AgentIPFIX + cfg.Agent.Type = flowslatest.AgentEBPF b := monoBuilder(ns, &cfg) cm, digest, err := b.configMap() assert.NoError(err) @@ -689,7 +687,6 @@ func TestConfigMapShouldDeserializeAsJSONWithLokiStack(t *testing.T) { params := decoded.Parameters assert.Len(params, 6) - assert.Equal(*cfg.Processor.Advanced.Port, int32(params[0].Ingest.Collector.Port)) lokiCfg := params[3].Write.Loki assert.Equal("https://lokistack-gateway-http.ls-namespace.svc:8080/api/logs/v1/network/", lokiCfg.URL) @@ -705,7 +702,7 @@ func TestConfigMapShouldDeserializeAsJSONWithLokiStack(t *testing.T) { assert.Equal(cfg.Loki.Advanced.WriteMinBackoff.Duration.String(), lokiCfg.MinBackoff) assert.Equal(cfg.Loki.Advanced.WriteMaxBackoff.Duration.String(), lokiCfg.MaxBackoff) assert.EqualValues(*cfg.Loki.Advanced.WriteMaxRetries, lokiCfg.MaxRetries) - assert.EqualValues([]string{"SrcK8S_Namespace", "SrcK8S_OwnerName", "SrcK8S_Type", "DstK8S_Namespace", "DstK8S_OwnerName", "DstK8S_Type", "FlowDirection", "Duplicate", "_RecordType"}, lokiCfg.Labels) + assert.EqualValues([]string{"SrcK8S_Namespace", "SrcK8S_OwnerName", "SrcK8S_Type", "DstK8S_Namespace", "DstK8S_OwnerName", "DstK8S_Type", "_RecordType", "FlowDirection", "Duplicate"}, lokiCfg.Labels) assert.Equal(`{app="netobserv-flowcollector"}`, fmt.Sprintf("%v", lokiCfg.StaticLabels)) assert.Equal(cfg.Processor.Metrics.Server.Port, int32(decoded.MetricsSettings.Port)) diff --git a/pkg/loki/labels.go b/pkg/loki/labels.go new file mode 100644 index 000000000..ce3fb4356 --- /dev/null +++ b/pkg/loki/labels.go @@ -0,0 +1,36 @@ +package loki + +import ( + "strconv" + + flowslatest "github.com/netobserv/network-observability-operator/apis/flowcollector/v1beta2" + "github.com/netobserv/network-observability-operator/controllers/constants" + "github.com/netobserv/network-observability-operator/controllers/ebpf" + "github.com/netobserv/network-observability-operator/pkg/helper" +) + +func GetLokiLabels(desired *flowslatest.FlowCollectorSpec) []string { + indexFields := constants.LokiIndexFields + + if desired.Processor.LogTypes != nil && *desired.Processor.LogTypes != flowslatest.LogTypeFlows { + indexFields = append(indexFields, constants.LokiConnectionIndexFields...) + } + + if desired.Processor.MultiClusterDeployment != nil && *desired.Processor.MultiClusterDeployment { + indexFields = append(indexFields, constants.ClusterNameLabelName) + } + + if helper.UseEBPF(desired) { + dedupJustMark, _ := strconv.ParseBool(ebpf.DedupeJustMarkDefault) + if desired.Agent.EBPF.Advanced != nil { + if v, ok := desired.Agent.EBPF.Advanced.Env[ebpf.EnvDedupeJustMark]; ok { + dedupJustMark, _ = strconv.ParseBool(v) + } + } + if dedupJustMark { + indexFields = append(indexFields, constants.LokiDeduperMarkIndexFields...) + } + } + + return indexFields +}