Skip to content

Commit

Permalink
Fix flow ingest (#647)
Browse files Browse the repository at this point in the history
* upgrade to 2.1.0 flow

* Fixing sampling bug, adding auto flow detection
  • Loading branch information
i3149 authored Jan 4, 2024
1 parent d86b1ed commit d29731c
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 51 deletions.
13 changes: 12 additions & 1 deletion cmd/ktranslate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func init() {
flag.StringVar(&sslKeyFile, "ssl_key_file", "", "SSL Key file to use for serving HTTPS traffic")
flag.StringVar(&tagMapType, "tag_map_type", "", "type of mapping to use for tag values. file|null")
flag.StringVar(&vpcSource, "vpc", kt.LookupEnvString("KENTIK_VPC", ""), "Run VPC Flow Ingest")
flag.StringVar(&flowSource, "nf.source", "", "Run NetFlow Ingest Directly. Valid values here are netflow5|netflow9|ipfix|sflow|nbar|asa|pan")
flag.StringVar(&flowSource, "nf.source", "", "Run NetFlow Ingest Directly. Valid values here are netflow5|netflow9|ipfix|sflow|nbar|asa|pan|auto")
flag.BoolVar(&teeLog, "tee_logs", false, "Tee log messages to sink")
flag.StringVar(&appMap, "application_map", "", "File containing custom application mappings")
flag.StringVar(&syslog, "syslog.source", "", "Run Syslog Server at this IP:Port or unix socket.")
Expand Down Expand Up @@ -209,6 +209,10 @@ func applyMode(cfg *ktranslate.Config, mode string) error {
cfg.Rollup.Formats = append(cfg.Rollup.Formats, "s_sum,pkts.rcv,in_pkts+out_pkts,device_name,dst_addr,custom_str.dst_as_name,dst_geo,l4_dst_port,protocol")
case "nr1.flow":
cfg.SNMPInput.FlowOnly = true
cfg.FlowInput.Enable = true
if cfg.FlowInput.Protocol == "" {
cfg.FlowInput.Protocol = "auto"
}
setNr()
case "nr1.discovery":
cfg.EnableSNMPDiscovery = true
Expand Down Expand Up @@ -732,6 +736,13 @@ func applyFlags(cfg *ktranslate.Config) error {
return
}
cfg.FlowInput.Workers = v
case "nf.queuesize":
v, err := strconv.Atoi(val)
if err != nil {
errCh <- err
return
}
cfg.FlowInput.QueueSize = v
case "nf.message.fields":
cfg.FlowInput.MessageFields = val
case "nf.prom.listen":
Expand Down
6 changes: 4 additions & 2 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ type FlowInputConfig struct {
ListenPort int
EnableReusePort bool
Workers int
QueueSize int
MessageFields string
PrometheusListenAddr string
MappingFile string
Expand Down Expand Up @@ -479,11 +480,12 @@ func DefaultConfig() *Config {
},
FlowInput: &FlowInputConfig{
Enable: false,
Protocol: "netflow5",
Protocol: "",
ListenIP: "0.0.0.0",
ListenPort: 9995,
EnableReusePort: false,
Workers: 1,
Workers: 2,
QueueSize: 10000,
MessageFields: FlowDefaultFields,
PrometheusListenAddr: "",
MappingFile: "",
Expand Down
24 changes: 12 additions & 12 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ require (
github.com/liamg/furious v0.0.0-20191231090757-c295c872d6c1
github.com/linkedin/goavro/v2 v2.10.1
github.com/montanaflynn/stats v0.7.0
github.com/netsampler/goflow2/v2 v2.0.0
github.com/oschwald/geoip2-golang v1.8.0
github.com/netsampler/goflow2/v2 v2.1.0
github.com/oschwald/geoip2-golang v1.9.0
github.com/pkg/errors v0.9.1
github.com/prometheus-community/pro-bing v0.3.0
github.com/prometheus/client_golang v1.16.0
github.com/prometheus/client_golang v1.18.0
github.com/prometheus/prometheus v0.46.0
github.com/sasha-s/go-hll v0.0.0-20180522065212-c6eb27aee351
github.com/segmentio/kafka-go v0.4.23
Expand All @@ -46,7 +46,7 @@ require (
go.starlark.net v0.0.0-20220926145019-14b050677505
google.golang.org/genproto v0.0.0-20230717213848-3f92550aa753
google.golang.org/grpc v1.56.3
google.golang.org/protobuf v1.31.0
google.golang.org/protobuf v1.32.0
gopkg.in/mcuadros/go-syslog.v2 v2.3.0
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
Expand Down Expand Up @@ -95,9 +95,9 @@ require (
github.com/josharian/intern v1.0.0 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/libp2p/go-reuseport v0.2.0 // indirect
github.com/libp2p/go-reuseport v0.4.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
Expand All @@ -106,15 +106,15 @@ require (
github.com/onsi/ginkgo v1.16.2 // indirect
github.com/onsi/gomega v1.13.0 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/oschwald/maxminddb-golang v1.10.0 // indirect
github.com/oschwald/maxminddb-golang v1.11.0 // indirect
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2 // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.0 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.45.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/tinylib/msgp v1.1.6 // indirect
github.com/vaughan0/go-ini v0.0.0-20130923145212-a98ad7ee00ec // indirect
github.com/vmihailenco/msgpack/v5 v5.3.4 // indirect
Expand All @@ -126,7 +126,7 @@ require (
go.opentelemetry.io/otel/trace v1.17.0 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/oauth2 v0.10.0 // indirect
golang.org/x/oauth2 v0.12.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
Expand Down
48 changes: 24 additions & 24 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,8 @@ github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/liamg/furious v0.0.0-20191231090757-c295c872d6c1 h1:lUcVBp3HQU0lr5V4RAD+B8WimUE13uYxYfKvqBLHFFo=
github.com/liamg/furious v0.0.0-20191231090757-c295c872d6c1/go.mod h1:C3GdQ5NP2EDGxKNM6+ht7DnpsOu8qTNLGftjp5GOt2o=
github.com/libp2p/go-reuseport v0.2.0 h1:18PRvIMlpY6ZK85nIAicSBuXXvrYoSw3dsBAR7zc560=
github.com/libp2p/go-reuseport v0.2.0/go.mod h1:bvVho6eLMm6Bz5hmU0LYN3ixd3nPPvtIlaURZZgOY4k=
github.com/libp2p/go-reuseport v0.4.0 h1:nR5KU7hD0WxXCJbmw7r2rhRYruNRl2koHw8fQscQm2s=
github.com/libp2p/go-reuseport v0.4.0/go.mod h1:ZtI03j/wO5hZVDFo2jKywN6bYKWLOy8Se6DrI2E1cLU=
github.com/linkedin/goavro/v2 v2.10.1 h1:ExVurHDnf0eyUocILs48kiZ4pGvaEbDvBOQcfLruA/0=
github.com/linkedin/goavro/v2 v2.10.1/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
Expand All @@ -309,8 +309,8 @@ github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/markbates/oncer v0.0.0-20181203154359-bf2de49a0be2/go.mod h1:Ld9puTsIW75CHf65OeIOkyKbteujpZVXDpWK6YGZbxE=
github.com/markbates/safe v1.0.1/go.mod h1:nAqgmRi7cY2nqMc92/bSEeQA+R4OheNU2T1kNSCBdG0=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.3.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
Expand All @@ -327,8 +327,8 @@ github.com/montanaflynn/stats v0.7.0 h1:r3y12KyNxj/Sb/iOE46ws+3mS1+MZca1wlHQFPsY
github.com/montanaflynn/stats v0.7.0/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow=
github.com/mostlygeek/arp v0.0.0-20170424181311-541a2129847a h1:AfneHvfmYgUIcgdUrrDFklLdEzQAvG9AKRTe1x1mx/0=
github.com/mostlygeek/arp v0.0.0-20170424181311-541a2129847a/go.mod h1:jZxafo9CAqaKFQE4zitrg5QNlA6CXUsjwXPlIppF3tk=
github.com/netsampler/goflow2/v2 v2.0.0 h1:Eued7ORLFI2q2+b8cMBcNeEkKOXHtly6KNO8L492aoM=
github.com/netsampler/goflow2/v2 v2.0.0/go.mod h1:NsdiueJy8F5PXg0tHVYw9p0rvsryh8TFIxw7IR6Jjys=
github.com/netsampler/goflow2/v2 v2.1.0 h1:A22X31O5/NlUf9135tTsAjWHC6RxUauZ74ngeBUlLX4=
github.com/netsampler/goflow2/v2 v2.1.0/go.mod h1:mDkDLl+uSFLq7aRuQ113+ZAJN0HdzCx/Dgf0wCmr+Cc=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
Expand All @@ -347,10 +347,10 @@ github.com/onsi/gomega v1.13.0 h1:7lLHu94wT9Ij0o6EWWclhu0aOh32VxhkwEJvzuWPeak=
github.com/onsi/gomega v1.13.0/go.mod h1:lRk9szgn8TxENtWd0Tp4c3wjlRfMTMH27I+3Je41yGY=
github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs=
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
github.com/oschwald/geoip2-golang v1.8.0 h1:KfjYB8ojCEn/QLqsDU0AzrJ3R5Qa9vFlx3z6SLNcKTs=
github.com/oschwald/geoip2-golang v1.8.0/go.mod h1:R7bRvYjOeaoenAp9sKRS8GX5bJWcZ0laWO5+DauEktw=
github.com/oschwald/maxminddb-golang v1.10.0 h1:Xp1u0ZhqkSuopaKmk1WwHtjF0H9Hd9181uj2MQ5Vndg=
github.com/oschwald/maxminddb-golang v1.10.0/go.mod h1:Y2ELenReaLAZ0b400URyGwvYxHV1dLIxBuyOsyYjHK0=
github.com/oschwald/geoip2-golang v1.9.0 h1:uvD3O6fXAXs+usU+UGExshpdP13GAqp4GBrzN7IgKZc=
github.com/oschwald/geoip2-golang v1.9.0/go.mod h1:BHK6TvDyATVQhKNbQBdrj9eAvuwOMi2zSFXizL3K81Y=
github.com/oschwald/maxminddb-golang v1.11.0 h1:aSXMqYR/EPNjGE8epgqwDay+P30hCBZIveY0WZbAWh0=
github.com/oschwald/maxminddb-golang v1.11.0/go.mod h1:YmVI+H0zh3ySFR3w+oz8PCfglAFj3PuCmui13+P9zDg=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE=
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2 h1:JhzVVoYvbOACxoUmOs6V/G4D5nPVUW73rKvXxP4XUJc=
Expand All @@ -371,15 +371,15 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus-community/pro-bing v0.3.0 h1:SFT6gHqXwbItEDJhTkzPWVqU6CLEtqEfNAPp47RUON4=
github.com/prometheus-community/pro-bing v0.3.0/go.mod h1:p9dLb9zdmv+eLxWfCT6jESWuDrS+YzpPkQBgysQF8a0=
github.com/prometheus/client_golang v1.16.0 h1:yk/hx9hDbrGHovbci4BY+pRMfSuuat626eFsHb7tmT8=
github.com/prometheus/client_golang v1.16.0/go.mod h1:Zsulrv/L9oM40tJ7T815tM89lFEugiJ9HzIqaAx4LKc=
github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk=
github.com/prometheus/client_golang v1.18.0/go.mod h1:T+GXkCk5wSJyOqMIzVgvvjFDlkOQntgjkJWKrN5txjA=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.4.0 h1:5lQXD3cAg1OXBf4Wq03gTrXHeaV0TQvGfUooCfx1yqY=
github.com/prometheus/client_model v0.4.0/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU=
github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdOOfY=
github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY=
github.com/prometheus/procfs v0.11.0 h1:5EAgkfkMl659uZPbe9AS2N68a7Cc1TJbPEuGzFuRbyk=
github.com/prometheus/procfs v0.11.0/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM=
github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw=
github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI=
github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lneoxM=
github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY=
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/prometheus/prometheus v0.46.0 h1:9JSdXnsuT6YsbODEhSQMwxNkGwPExfmzqG73vCMk/Kw=
github.com/prometheus/prometheus v0.46.0/go.mod h1:10L5IJE5CEsjee1FnOcVswYXlPIscDWWt3IJ2UDYrz4=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
Expand All @@ -397,8 +397,8 @@ github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeV
github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
Expand Down Expand Up @@ -511,8 +511,8 @@ golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.10.0 h1:zHCpF2Khkwy4mMB4bv0U37YtJdTGW8jI0glAApi0Kh8=
golang.org/x/oauth2 v0.10.0/go.mod h1:kTpgurOux7LqtuxjuyZa4Gj2gdezIt/jQtGnNFfypQI=
golang.org/x/oauth2 v0.12.0 h1:smVPGxink+n1ZI5pkQa8y6fZT0RW0MgCO5bFpepy4B4=
golang.org/x/oauth2 v0.12.0/go.mod h1:A74bZ3aGXgCY0qaIC9Ahg6Lglin4AMAco8cIv9baba4=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand Down Expand Up @@ -628,8 +628,8 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/alexcesaro/statsd.v2 v2.0.0 h1:FXkZSCZIH17vLCO5sO2UucTHsH9pc+17F6pl3JVCwMc=
gopkg.in/alexcesaro/statsd.v2 v2.0.0/go.mod h1:i0ubccKGzBVNBpdGV5MocxyA/XlLUJzA7SLonnE4drU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
28 changes: 24 additions & 4 deletions pkg/inputs/flow/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/netsampler/goflow2/v2/format"
"github.com/netsampler/goflow2/v2/metrics"
protoproducer "github.com/netsampler/goflow2/v2/producer/proto"
"github.com/netsampler/goflow2/v2/transport"
"github.com/netsampler/goflow2/v2/utils"
"github.com/prometheus/client_golang/prometheus/promhttp"
"gopkg.in/yaml.v2"
Expand All @@ -39,13 +40,15 @@ const (
PAN = "pan"
JFlow = "jflow"
CFlow = "cflow"
Auto = "auto"
)

var (
addr string
port int
reuse bool
workers int
queueSize int
fields string
promListen string
mappingFile string
Expand All @@ -55,7 +58,8 @@ func init() {
flag.StringVar(&addr, "nf.addr", "0.0.0.0", "Sflow/NetFlow/IPFIX listening address")
flag.IntVar(&port, "nf.port", 9995, "Sflow/NetFlow/IPFIX listening port")
flag.BoolVar(&reuse, "nf.reuserport", false, "Enable so_reuseport for Sflow/NetFlow/IPFIX")
flag.IntVar(&workers, "nf.workers", 1, "Number of workers per flow collector")
flag.IntVar(&workers, "nf.workers", 2, "Number of workers per flow collector")
flag.IntVar(&queueSize, "nf.queuesize", 10000, "How big of a queue to hold for incomming flow packets.")
flag.StringVar(&fields, "nf.message.fields", ktranslate.FlowDefaultFields, "The list of fields to include in flow messages. Can be any of "+ktranslate.FlowFields)
flag.StringVar(&promListen, "nf.prom.listen", "", "Run a promethues metrics collector here")
flag.StringVar(&mappingFile, "nf.mapping", "", "Configuration file for custom netflow mappings")
Expand Down Expand Up @@ -112,8 +116,9 @@ func NewFlowSource(ctx context.Context, proto FlowSource, maxBatchSize int, log

flowProducer = metrics.WrapPromProducer(flowProducer)
udpCfg := &utils.UDPReceiverConfig{
Sockets: cfg.Workers,
Workers: cfg.Workers,
Sockets: cfg.Workers,
Workers: cfg.Workers,
QueueSize: cfg.QueueSize,
}
recv, err := utils.NewUDPReceiver(udpCfg)
if err != nil {
Expand All @@ -126,9 +131,15 @@ func NewFlowSource(ctx context.Context, proto FlowSource, maxBatchSize int, log
return nil, err
}

transport.RegisterTransportDriver("chf", kt)
transporter, err := transport.FindTransport("chf")
if err != nil {
return nil, err
}

cfgPipe := &utils.PipeConfig{
Format: formatter,
Transport: nil,
Transport: transporter,
Producer: flowProducer,
NetFlowTemplater: metrics.NewDefaultPromTemplateSystem, // wrap template system to get Prometheus info
}
Expand All @@ -155,6 +166,9 @@ func NewFlowSource(ctx context.Context, proto FlowSource, maxBatchSize int, log
}
kt.pipe = utils.NewSFlowPipe(cfgPipe)
decodeFunc = metrics.PromDecoderWrapper(kt.pipe.DecodeFlow, string(proto))
case Auto:
kt.pipe = utils.NewFlowPipe(cfgPipe)
decodeFunc = metrics.PromDecoderWrapper(kt.pipe.DecodeFlow, string(proto))
default:
return nil, fmt.Errorf("Unknown flow format %v", proto)
}
Expand Down Expand Up @@ -195,3 +209,9 @@ func loadMapping(f io.Reader) (*protoproducer.ProducerConfig, error) {
err := dec.Decode(config)
return config, err
}

func doSample() protoproducer.SamplingRateSystem {
return &protoproducer.SingleSamplingRateSystem{
Sampling: 1,
}
}
Loading

0 comments on commit d29731c

Please sign in to comment.