Skip to content
This repository has been archived by the owner on Jan 19, 2024. It is now read-only.

Commit

Permalink
feat: Use cloudevent for alerting endpoint (#270)
Browse files Browse the repository at this point in the history
* feat: Use cloudevent for alerting endpoint

Signed-off-by: TannerGabriel <[email protected]>

* Use NewHTTPReceiveHandler to handle cloudevents and Prometheus alerts

Signed-off-by: TannerGabriel <[email protected]>
  • Loading branch information
TannerGabriel authored Feb 21, 2022
1 parent 827a4cc commit 4d93045
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 74 deletions.
2 changes: 2 additions & 0 deletions chart/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ spec:
value: 'sh.keptn.event.monitoring.configure,sh.keptn.event.configure-monitoring.triggered,sh.keptn.event.get-sli.triggered'
- name: PUBSUB_RECIPIENT
value: '127.0.0.1'
- name: PUBSUB_RECIPIENT_PATH
value: '/events'
- name: STAGE_FILTER
value: "{{ .Values.distributor.stageFilter }}"
- name: PROJECT_FILTER
Expand Down
121 changes: 49 additions & 72 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package main

import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/google/uuid"
"github.com/keptn-contrib/prometheus-service/eventhandling"
"github.com/keptn-contrib/prometheus-service/utils"
keptn "github.com/keptn/go-utils/pkg/lib"
Expand All @@ -14,27 +14,16 @@ import (
"os"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/google/uuid"
"github.com/kelseyhightower/envconfig"
keptncommon "github.com/keptn/go-utils/pkg/lib/keptn"
keptnv2 "github.com/keptn/go-utils/pkg/lib/v0_2_0"
)

type ceTest struct {
Specversion string `json:"specversion" yaml:"specversion"`
}

var (
env utils.EnvConfig
)

func main() {
/**
Note that prometheus-service requires to open multiple ports:
* 8080 (default port; exposed) - acts as the ingest for prometheus alerts, and also proxies CloudEvents to port 8082
* 8081 (Keptn distributor) - Port that keptn/distributor is listening too (default port for Keptn distributor)
* 8082 (CloudEvents; env.Port) - Port that the CloudEvents sdk is listening to; this port is not exposed, but will be used for internal communication
*/
logger := keptncommon.NewLogger("", "", utils.ServiceName)

env = utils.EnvConfig{}
Expand All @@ -46,12 +35,6 @@ func main() {
logger.Debug(fmt.Sprintf("Configuration service: %s", env.ConfigurationServiceURL))
logger.Debug(fmt.Sprintf("Port: %d, Path: %s", env.Port, env.Path))

// listen on port 8080 for any HTTP request (cloudevents are also handled, but forwarded to env.Port internally)
logger.Debug("Starting server on port 8080...")
http.HandleFunc("/", Handler)
http.HandleFunc("/health", HealthHandler)
go http.ListenAndServe(":8080", nil)

// start internal CloudEvents handler (on port env.Port)
os.Exit(_main(env))
}
Expand All @@ -60,17 +43,19 @@ func _main(env utils.EnvConfig) int {
ctx := context.Background()
ctx = cloudevents.WithEncodingStructured(ctx)

p, err := cloudevents.NewHTTP(cloudevents.WithPath(env.Path), cloudevents.WithPort(env.Port))
p, err := cloudevents.NewHTTP()
if err != nil {
log.Fatalf("failed to create client, %v", err)
log.Fatalf("failed to create protocol: %s", err.Error())
}
// Create CloudEvents client
c, err := cloudevents.NewClient(p)

ceHandler, err := cloudevents.NewHTTPReceiveHandler(ctx, p, gotEvent)
if err != nil {
log.Fatalf("failed to create client, %v", err)
log.Fatalf("failed to create handler: %s", err.Error())
}
// Start CloudEvents receiver
log.Fatal(c.StartReceiver(ctx, gotEvent))

http.HandleFunc("/", HTTPGetHandler)
http.Handle(env.Path, ceHandler)
http.ListenAndServe(":8080", nil)

return 0
}
Expand All @@ -80,6 +65,8 @@ func gotEvent(event cloudevents.Event) error {
var shkeptncontext string
_ = event.Context.ExtensionAs("shkeptncontext", &shkeptncontext)

logger := keptncommon.NewLogger(shkeptncontext, "", utils.ServiceName)

// convert v0.1.4 spec monitoring.configure CloudEvent into a v0.2.0 spec configure-monitoring.triggered CloudEvent
if event.Type() == keptn.ConfigureMonitoringEventType {
event.SetType(keptnv2.GetTriggeredEventType(keptnv2.ConfigureMonitoringTaskName))
Expand All @@ -91,13 +78,34 @@ func gotEvent(event cloudevents.Event) error {
return fmt.Errorf("could not create Keptn handler: %v", err)
}

logger := keptncommon.NewLogger(shkeptncontext, event.Context.GetID(), utils.ServiceName)

return eventhandling.NewEventHandler(event, logger, keptnHandler).HandleEvent()
}

// HealthHandler provides a basic health check
func HealthHandler(w http.ResponseWriter, r *http.Request) {
// HTTPGetHandler will handle all requests for '/health' and '/ready'
func HTTPGetHandler(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/":
shkeptncontext := uuid.New().String()
logger := keptncommon.NewLogger(shkeptncontext, "", utils.ServiceName)

body, err := ioutil.ReadAll(r.Body)
if err != nil {
logger.Error(fmt.Sprintf("Failed to read body from requst: %s", err))
return
}

eventhandling.ProcessAndForwardAlertEvent(w, body, logger, shkeptncontext)
case "/health":
healthEndpointHandler(w, r)
case "/ready":
healthEndpointHandler(w, r)
default:
endpointNotFoundHandler(w, r)
}
}

// HealthHandler rerts a basic health check back
func healthEndpointHandler(w http.ResponseWriter, r *http.Request) {
type StatusBody struct {
Status string `json:"status"`
}
Expand All @@ -114,51 +122,20 @@ func HealthHandler(w http.ResponseWriter, r *http.Request) {
}
}

// Handler takes all http request and forwards it to the corresponding event handler (e.g., prometheus alert);
// Note: cloudevents are also forwarded
func Handler(rw http.ResponseWriter, req *http.Request) {
shkeptncontext := uuid.New().String()
logger := keptncommon.NewLogger(shkeptncontext, "", utils.ServiceName)
logger.Debug(fmt.Sprintf("%s %s", req.Method, req.URL))
logger.Debug("Receiving event which will be dispatched")

body, err := ioutil.ReadAll(req.Body)
if err != nil {
logger.Error(fmt.Sprintf("Failed to read body from requst: %s", err))
return
// endpointNotFoundHandler will return 404 for requests
func endpointNotFoundHandler(w http.ResponseWriter, r *http.Request) {
type StatusBody struct {
Status string `json:"status"`
}

// try to deserialize the event to check if it contains specversion
event := ceTest{}
if err = json.Unmarshal(body, &event); err != nil {
logger.Debug("Failed to read body: " + err.Error() + "; content=" + string(body))
return
}
status := StatusBody{Status: "NOT FOUND"}

// check event whether event contains specversion to forward it to 8081; otherwise process it as prometheus alert
if event.Specversion == "" {
// this is a prometheus alert
eventhandling.ProcessAndForwardAlertEvent(rw, body, logger, shkeptncontext)
} else {
// this is a CloudEvent retrieved on port 8080 that needs to be forwarded to 8082 (env.Port)
forwardPath := fmt.Sprintf("http://localhost:%d%s", env.Port, env.Path)
logger.Debug("Forwarding cloudevent to " + forwardPath)
// forward cloudevent to cloudevents lister on env.Port (see main())
proxyReq, err := http.NewRequest(req.Method, forwardPath, bytes.NewReader(body))
proxyReq.Header.Set("Content-Type", "application/cloudevents+json")
resp, err := http.DefaultClient.Do(proxyReq)
if err != nil {
logger.Error("Could not send cloud event: " + err.Error())
return
}
defer resp.Body.Close()

if resp.StatusCode < 200 || resp.StatusCode > 299 {
logger.Error(fmt.Sprintf("Could not process cloud event: Handler returned status %s", resp.Status))
rw.WriteHeader(500)
} else {
logger.Debug("event successfully sent to port 8081")
rw.WriteHeader(201)
}
body, _ := json.Marshal(status)

w.Header().Set("content-type", "application/json")

_, err := w.Write(body)
if err != nil {
log.Println(err)
}
}
4 changes: 2 additions & 2 deletions utils/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ const SliResourceURI = "prometheus/sli.yaml"
// EnvConfig holds the configuration of environment variables that this service uses
type EnvConfig struct {
// Port on which to listen for cloudevents
Port int `envconfig:"RCV_PORT" default:"8082"` // Note: must not be 8080 and not 8081
Path string `envconfig:"RCV_PATH" default:"/"`
Port int `envconfig:"RCV_PORT" default:"8080"`
Path string `envconfig:"RCV_PATH" default:"/events"`
ConfigurationServiceURL string `envconfig:"CONFIGURATION_SERVICE" default:""`
PrometheusNamespace string `envconfig:"PROMETHEUS_NS" default:""`
PrometheusConfigMap string `envconfig:"PROMETHEUS_CM" default:""`
Expand Down

0 comments on commit 4d93045

Please sign in to comment.