Skip to content

Commit

Permalink
Adding in gigamon input (#633)
Browse files Browse the repository at this point in the history
* Adding in gigamon input

* Adding a http.remote_ip flag to allow override of dockers remote ip handling

* Adding in more pageload values
  • Loading branch information
i3149 authored Nov 18, 2023
1 parent c631ea8 commit 2422118
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 35 deletions.
2 changes: 2 additions & 0 deletions cmd/ktranslate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,8 @@ func applyFlags(cfg *ktranslate.Config) error {
return
}
cfg.EnableHTTPInput = v
case "http.remote_ip":
cfg.HttpRemoteIp = val
case "enricher":
if _, err := os.Stat(val); err == nil { // If this is a file on disk, run as a script.
cfg.EnricherScript = val
Expand Down
1 change: 1 addition & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ type Config struct {
TagMapType string
EnableTeeLogs bool
EnableHTTPInput bool
HttpRemoteIp string
EnricherURL string
EnricherSource string
EnricherScript string
Expand Down
82 changes: 47 additions & 35 deletions pkg/formats/nrm/nrm.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,40 +332,52 @@ func (f *NRMFormat) fromSnmpMetadata(in *kt.JCHF) []NRMetric {

var (
synthWLAttr = map[string]bool{
"agent_id": true,
"agent_name": true,
"dst_addr": true,
"dst_cdn_int": true,
"dst_geo": true,
"provider": true,
"src_addr": true,
"src_cdn_int": true,
"src_as_name": true,
"src_geo": true,
"test_id": true,
"test_name": true,
"test_type": true,
"test_url": true,
"src_host": true,
"dst_host": true,
"src_cloud_region": true,
"src_cloud_provider": true,
"src_site": true,
"dst_cloud_region": true,
"dst_cloud_provider": true,
"dst_site": true,
"statusMessage": true,
"statusEncoding": true,
"https_validity": true,
"https_expiry_timestamp": true,
"dest_ip": true,
}

synthAttrKeys = []string{
"statusMessage",
"statusEncoding",
"https_validity",
"https_expiry_timestamp",
"agent_id": true,
"agent_name": true,
"dst_addr": true,
"dst_cdn_int": true,
"dst_geo": true,
"provider": true,
"src_addr": true,
"src_cdn_int": true,
"src_as_name": true,
"src_geo": true,
"test_id": true,
"test_name": true,
"test_type": true,
"test_url": true,
"src_host": true,
"dst_host": true,
"src_cloud_region": true,
"src_cloud_provider": true,
"src_site": true,
"dst_cloud_region": true,
"dst_cloud_provider": true,
"dst_site": true,
"https_validity": true,
"https_expiry_timestamp": true,
"dest_ip": true,
"statusMessage": true,
"statusEncoding": true,
"connectEnd": true,
"connectStart": true,
"decodedBodySize": true,
"domComplete": true,
"domContentLoadedEventEnd": true,
"domContentLoadedEventStart": true,
"domInteractive": true,
"domainLookupEnd": true,
"domainLookupStart": true,
"duration": true,
"fetchStart": true,
"loadEventEnd": true,
"loadEventStart": true,
"redirectCount": true,
"requestStart": true,
"responseEnd": true,
"responseStart": true,
"secureConnectionStart": true,
"tlsProtocol": true,
}
)

Expand Down Expand Up @@ -436,7 +448,7 @@ func (f *NRMFormat) fromKSynth(in *kt.JCHF) []NRMetric {
if len(strData) > 0 {
switch sd := strData[0].(type) {
case map[string]interface{}:
for _, key := range synthAttrKeys {
for key, _ := range synthWLAttr {
if val, ok := sd[key]; ok {
attr[key] = val
}
Expand Down
113 changes: 113 additions & 0 deletions pkg/inputs/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package http
import (
"compress/gzip"
"context"
"flag"
"fmt"
"net"
"net/http"
"strings"
"time"
Expand All @@ -14,6 +16,7 @@ import (
"github.com/kentik/ktranslate/pkg/eggs/kmux"
"github.com/kentik/ktranslate/pkg/eggs/logger"
"github.com/kentik/ktranslate/pkg/kt"
"github.com/kentik/ktranslate/pkg/util/ic"
)

type KentikHttpListener struct {
Expand All @@ -35,6 +38,14 @@ const (
Listen = "/input"
)

var (
useAsRemoteIP string
)

func init() {
flag.StringVar(&useAsRemoteIP, "http.remote_ip", "", "If set, ignore actual remote IP and use this for device mapping.")
}

func NewHttpListener(ctx context.Context, host string, log logger.Underlying, registry go_metrics.Registry, jchfChan chan []*kt.JCHF, apic *api.KentikApi) (*KentikHttpListener, error) {
ks := KentikHttpListener{
ContextL: logger.NewContextLFromUnderlying(logger.SContext{S: "Http"}, log),
Expand All @@ -55,6 +66,105 @@ func (ks *KentikHttpListener) RegisterRoutes(r *kmux.Router) {
r.HandleFunc(Listen+"/telegraf/standard", ks.wrap(ks.readStandard))
r.HandleFunc(Listen+"/telegraf/batch", ks.wrap(ks.readBatch))
r.HandleFunc(Listen+"/ktranslate/jchf", ks.wrap(ks.readJCHF))
r.HandleFunc(Listen+"/gigamon/batch", ks.wrap(ks.readGigaBatch))
}

type gigaEvent struct {
Timestamp string `json:"ts"`
Vendor string `json:"vendor"`
Version string `json:"version"`
Generator string `json:"generator"`
SrcIP net.IP `json:"src_ip"`
DstIP net.IP `json:"dst_ip"`
Protocol int `json:"protocol,string"`
SrcPort int `json:"src_port,string"`
DstPort int `json:"dst_port,string"`
SslCommonName string `json:"ssl_common_name"`
SslIssuer string `json:"ssl_issuer"`
SslValidityNotBefore string `json:"ssl_validity_not_before"`
SslValidityNotAdter string `json:"ssl_validity_not_after"`
SslExtSigAlgorithumScheme int `json:"ssl_ext_sig_algorithm_scheme,string"`
SslExtSigAlgorithumHash int `json:"ssl_ext_sig_algorithm_hash,string"`
SslExtSigAlgorithumSig int `json:"ssl_ext_sig_algorithm_sig,string"`
AppId int `json:"app_id,string"`
SrcPackets int `json:"src_packets,string"`
DstPackets int `json:"dst_packets,string"`
AppName string `json:"app_name"`
SrcBytes int `json:"src_bytes,string"`
DstBytes int `json:"dst_bytes,string"`
Id string `json:"id"`
SeqNum int `json:"seq_num,string"`
}

func (ks *KentikHttpListener) readGigaBatch(w http.ResponseWriter, r *http.Request) {
var wrap []gigaEvent

// Decode body in gzip format if the request header is set this way.
body := r.Body
if r.Header.Get("Content-Encoding") == "gzip" {
z, err := gzip.NewReader(r.Body)
if err != nil {
panic(http.StatusInternalServerError)
}
body = z
}
defer body.Close()

if err := json.NewDecoder(body).Decode(&wrap); err != nil {
ks.Infof("Cannot decode input: %v", err)
panic(http.StatusInternalServerError)
}
w.WriteHeader(http.StatusOK)

remoteIP := getIP(r)
out := make([]*kt.JCHF, len(wrap))
for i, m := range wrap {
out[i] = ks.getGigaJCHF(&m, remoteIP)
}

ks.metrics.Messages.Mark(int64(len(out)))
ks.jchfChan <- out
}

func (ks *KentikHttpListener) getGigaJCHF(event *gigaEvent, remoteIP string) *kt.JCHF {
in := kt.NewJCHF()
in.Timestamp = time.Now().Unix()
in.CustomStr = map[string]string{
"timestamp": event.Timestamp,
"vendor": event.Vendor,
"version": event.Version,
"generator": event.Generator,
"app_name": event.AppName,
"id": event.Id,
"ssl_common_name": event.SslCommonName,
}
in.CustomInt = map[string]int32{
"app_id": int32(event.AppId),
}
in.CustomBigInt = map[string]int64{
"seq_num": int64(event.SeqNum),
}
in.EventType = event.Vendor
in.Provider = kt.ProviderHttpDevice
in.SrcAddr = event.SrcIP.String()
in.DstAddr = event.DstIP.String()
in.Protocol = ic.PROTO_NAMES[uint16(event.Protocol)]
in.L4SrcPort = uint32(event.SrcPort)
in.L4DstPort = uint32(event.DstPort)
in.InPkts = uint64(event.SrcPackets)
in.OutPkts = uint64(event.DstPackets)
in.InBytes = uint64(event.SrcBytes)
in.OutBytes = uint64(event.DstBytes)

if dev, ok := ks.devices[remoteIP]; ok {
in.DeviceName = dev.Name // Copy in any of these info we get
in.DeviceId = dev.ID
in.CompanyId = dev.CompanyID
in.SampleRate = dev.SampleRate
dev.SetUserTags(in.CustomStr)
}

return in
}

type basic struct {
Expand Down Expand Up @@ -219,6 +329,9 @@ type handler func(http.ResponseWriter, *http.Request)
func getIP(r *http.Request) string {
res := r.Header.Get("X-FORWARDED-FOR")
if res == "" {
if useAsRemoteIP != "" {
return useAsRemoteIP
}
res = r.RemoteAddr
}
pts := strings.SplitN(res, ":", 2)
Expand Down

0 comments on commit 2422118

Please sign in to comment.