Skip to content

Commit

Permalink
Update FLP, use provided enums in API (netobserv#594)
Browse files Browse the repository at this point in the history
  • Loading branch information
jotak authored Mar 22, 2024
1 parent 485e361 commit 229d8ac
Show file tree
Hide file tree
Showing 38 changed files with 558 additions and 472 deletions.
13 changes: 7 additions & 6 deletions controllers/consoleplugin/config/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package config

import (
"github.com/netobserv/flowlogs-pipeline/pkg/api"
flowslatest "github.com/netobserv/network-observability-operator/apis/flowcollector/v1beta2"
)

Expand Down Expand Up @@ -77,12 +78,12 @@ type Deduper struct {
}

type FrontendConfig struct {
RecordTypes []string `yaml:"recordTypes" json:"recordTypes"`
Columns []ColumnConfig `yaml:"columns" json:"columns"`
Sampling int `yaml:"sampling" json:"sampling"`
Features []string `yaml:"features" json:"features"`
Deduper Deduper `yaml:"deduper" json:"deduper"`
Fields []FieldConfig `yaml:"fields" json:"fields"`
RecordTypes []api.ConnTrackOutputRecordTypeEnum `yaml:"recordTypes" json:"recordTypes"`
Columns []ColumnConfig `yaml:"columns" json:"columns"`
Sampling int `yaml:"sampling" json:"sampling"`
Features []string `yaml:"features" json:"features"`
Deduper Deduper `yaml:"deduper" json:"deduper"`
Fields []FieldConfig `yaml:"fields" json:"fields"`

PortNaming flowslatest.ConsolePluginPortConfig `yaml:"portNaming,omitempty" json:"portNaming,omitempty"`
Filters []FilterConfig `yaml:"filters,omitempty" json:"filters,omitempty"`
Expand Down
3 changes: 2 additions & 1 deletion controllers/consoleplugin/consoleplugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"k8s.io/utils/ptr"
"sigs.k8s.io/yaml"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
flowslatest "github.com/netobserv/network-observability-operator/apis/flowcollector/v1beta2"
config "github.com/netobserv/network-observability-operator/controllers/consoleplugin/config"
"github.com/netobserv/network-observability-operator/controllers/constants"
Expand Down Expand Up @@ -321,7 +322,7 @@ func TestConfigMapContent(t *testing.T) {
assert.Equal(config.Loki.StatusURL, "https://lokistack-query-frontend-http.ls-namespace.svc:3100/")

// frontend params
assert.Equal(config.Frontend.RecordTypes, []string{"flowLog"})
assert.Equal(config.Frontend.RecordTypes, []api.ConnTrackOutputRecordTypeEnum{api.ConnTrackFlowLog})
assert.Empty(config.Frontend.Features)
assert.NotEmpty(config.Frontend.Columns)
assert.NotEmpty(config.Frontend.Filters)
Expand Down
5 changes: 0 additions & 5 deletions controllers/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,6 @@ const (

TokensPath = "/var/run/secrets/tokens/"

FlowLogType = "flowLog"
NewConnectionType = "newConnection"
HeartbeatType = "heartbeat"
EndConnectionType = "endConnection"

ClusterNameLabelName = "K8S_ClusterName"

MonitoringNamespace = "openshift-monitoring"
Expand Down
2 changes: 1 addition & 1 deletion controllers/flp/flp_common_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (b *builder) NewKafkaPipeline() PipelineBuilder {
return b.initPipeline(config.NewKafkaPipeline("kafka-read", api.IngestKafka{
Brokers: []string{b.desired.Kafka.Address},
Topic: b.desired.Kafka.Topic,
GroupId: b.name(), // Without groupid, each message is delivered to each consumers
GroupID: b.name(), // Without groupid, each message is delivered to each consumers
Decoder: decoder,
TLS: getKafkaTLS(&b.desired.Kafka.TLS, "kafka-cert", &b.volumes),
SASL: getKafkaSASL(&b.desired.Kafka.SASL, "kafka-ingest", &b.volumes),
Expand Down
30 changes: 16 additions & 14 deletions controllers/flp/flp_pipeline_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,23 +59,23 @@ func (b *PipelineBuilder) AddProcessorStages() error {
// enrich stage (transform) configuration
enrichedStage := lastStage.TransformNetwork("enrich", api.TransformNetwork{
Rules: api.NetworkTransformRules{{
Type: api.AddKubernetesRuleType,
Type: api.NetworkAddKubernetes,
Kubernetes: &api.K8sRule{
Input: "SrcAddr",
Output: "SrcK8S",
AddZone: addZone,
},
}, {
Type: api.AddKubernetesRuleType,
Type: api.NetworkAddKubernetes,
Kubernetes: &api.K8sRule{
Input: "DstAddr",
Output: "DstK8S",
AddZone: addZone,
},
}, {
Type: api.ReinterpretDirectionRuleType,
Type: api.NetworkReinterpretDirection,
}, {
Type: api.OpAddKubernetesInfra,
Type: api.NetworkAddKubernetesInfra,
KubernetesInfra: &api.K8sInfraRule{
Inputs: []string{
"SrcAddr",
Expand Down Expand Up @@ -193,21 +193,21 @@ func (b *PipelineBuilder) AddProcessorStages() error {
func flowMetricToFLP(flowMetric *metricslatest.FlowMetricSpec) (*api.MetricsItem, error) {
m := &api.MetricsItem{
Name: flowMetric.MetricName,
Type: strings.ToLower(string(flowMetric.Type)),
Type: api.MetricEncodeOperationEnum(strings.ToLower(string(flowMetric.Type))),
Filters: []api.MetricsFilter{},
Labels: flowMetric.Labels,
ValueKey: flowMetric.ValueField,
}
for _, f := range flowMetric.Filters {
m.Filters = append(m.Filters, api.MetricsFilter{Key: f.Field, Value: f.Value, Type: conversion.PascalToLower(string(f.MatchType), '_')})
m.Filters = append(m.Filters, api.MetricsFilter{Key: f.Field, Value: f.Value, Type: api.MetricFilterEnum(conversion.PascalToLower(string(f.MatchType), '_'))})
}
if !flowMetric.IncludeDuplicates {
m.Filters = append(m.Filters, api.MetricsFilter{Key: "Duplicate", Value: "true", Type: api.PromFilterNotEqual})
m.Filters = append(m.Filters, api.MetricsFilter{Key: "Duplicate", Value: "true", Type: api.MetricFilterNotEqual})
}
if flowMetric.Direction == metricslatest.Egress {
m.Filters = append(m.Filters, api.MetricsFilter{Key: "FlowDirection", Value: "1|2", Type: api.PromFilterRegex})
m.Filters = append(m.Filters, api.MetricsFilter{Key: "FlowDirection", Value: "1|2", Type: api.MetricFilterRegex})
} else if flowMetric.Direction == metricslatest.Ingress {
m.Filters = append(m.Filters, api.MetricsFilter{Key: "FlowDirection", Value: "0|2", Type: api.PromFilterRegex})
m.Filters = append(m.Filters, api.MetricsFilter{Key: "FlowDirection", Value: "0|2", Type: api.MetricFilterRegex})
}
for _, b := range flowMetric.Buckets {
f, err := strconv.ParseFloat(b, 64)
Expand Down Expand Up @@ -377,9 +377,11 @@ func (b *PipelineBuilder) addTransformFilter(lastStage config.PipelineBuilderSta
if clusterName != "" {
transformFilterRules = []api.TransformFilterRule{
{
Input: constants.ClusterNameLabelName,
Type: "add_field_if_doesnt_exist",
Value: clusterName,
Type: api.AddFieldIfDoesntExist,
AddFieldIfDoesntExist: &api.TransformFilterGenericRule{
Input: constants.ClusterNameLabelName,
Value: clusterName,
},
},
}
}
Expand Down Expand Up @@ -452,9 +454,9 @@ func getKafkaSASL(sasl *flowslatest.SASLConfig, volumePrefix string, volumes *vo
if !helper.UseSASL(sasl) {
return nil
}
t := "plain"
t := api.SASLPlain
if sasl.Type == flowslatest.SASLScramSHA512 {
t = "scramSHA512"
t = api.SASLScramSHA512
}
idPath := volumes.AddVolume(&sasl.ClientIDReference, volumePrefix+"-sasl-id")
secretPath := volumes.AddVolume(&sasl.ClientSecretReference, volumePrefix+"-sasl-secret")
Expand Down
10 changes: 5 additions & 5 deletions controllers/flp/metrics_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ func TestFlowMetricToFLP(t *testing.T) {
Name: "m_1",
Type: "counter",
Filters: []api.MetricsFilter{
{Key: "f", Value: "v", Type: api.PromFilterEqual},
{Key: "Duplicate", Value: "true", Type: api.PromFilterNotEqual},
{Key: "f", Value: "v", Type: api.MetricFilterEqual},
{Key: "Duplicate", Value: "true", Type: api.MetricFilterNotEqual},
},
ValueKey: "val",
Labels: []string{"by_field"},
Expand All @@ -102,9 +102,9 @@ func TestFlowMetricToFLP(t *testing.T) {
Name: "m_2",
Type: "histogram",
Filters: []api.MetricsFilter{
{Key: "f", Value: "v", Type: api.PromFilterRegex},
{Key: "f2", Type: api.PromFilterAbsence},
{Key: "FlowDirection", Value: "1|2", Type: api.PromFilterRegex},
{Key: "f", Value: "v", Type: api.MetricFilterRegex},
{Key: "f2", Type: api.MetricFilterAbsence},
{Key: "FlowDirection", Value: "1|2", Type: api.MetricFilterRegex},
},
Labels: []string{"by_field"},
Buckets: []float64{1, 5, 10, 50, 100},
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ toolchain go1.21.7

require (
github.com/go-logr/logr v1.4.1
github.com/netobserv/flowlogs-pipeline v0.1.12-0.20240305083238-24bf8cec8807
github.com/netobserv/flowlogs-pipeline v0.1.12-0.20240322124726-d2b2352bfe0f
github.com/onsi/ginkgo/v2 v2.16.0
github.com/onsi/gomega v1.31.1
github.com/openshift/api v0.0.0-20220112145620-704957ce4980
Expand Down Expand Up @@ -65,10 +65,10 @@ require (
github.com/stretchr/objx v0.5.2 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/net v0.21.0 // indirect
golang.org/x/net v0.22.0 // indirect
golang.org/x/oauth2 v0.16.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/term v0.17.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/term v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.17.0 // indirect
Expand Down
Loading

0 comments on commit 229d8ac

Please sign in to comment.