Skip to content

Commit

Permalink
feat: Add metrics endpoint (#169)
Browse files Browse the repository at this point in the history
* add prom client and test metric

* lint issues

* add metrics. improve error handling

* use errors for kcp request

* gofmt

* add timeout to metrics endpoint

* lint issues

* lint issues

* add kcp req total inc

* add reason for kcp request failure
  • Loading branch information
lindnerby authored Dec 6, 2023
1 parent 7fca647 commit fd5d1f4
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 76 deletions.
6 changes: 3 additions & 3 deletions runtime-watcher/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/kyma-project/runtime-watcher/listener v0.0.0-20231011102033-b8383d92883e
github.com/onsi/ginkgo/v2 v2.13.2
github.com/onsi/gomega v1.30.0
github.com/prometheus/client_golang v1.17.0
github.com/stretchr/testify v1.8.4
k8s.io/api v0.28.4
k8s.io/apimachinery v0.28.4
Expand Down Expand Up @@ -43,10 +44,9 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.16.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
github.com/prometheus/procfs v0.11.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.25.0 // indirect
Expand Down
12 changes: 6 additions & 6 deletions runtime-watcher/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,14 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.16.0 h1:yk/hx9hDbrGHovbci4BY+pRMfSuuat626eFsHb7tmT8=
github.com/prometheus/client_golang v1.16.0/go.mod h1:Zsulrv/L9oM40tJ7T815tM89lFEugiJ9HzIqaAx4LKc=
github.com/prometheus/client_model v0.4.0 h1:5lQXD3cAg1OXBf4Wq03gTrXHeaV0TQvGfUooCfx1yqY=
github.com/prometheus/client_model v0.4.0/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU=
github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q=
github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY=
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 h1:v7DLqVdK4VrYkVD5diGdl4sxJurKJEMnODWRJlxV9oM=
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU=
github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdOOfY=
github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY=
github.com/prometheus/procfs v0.10.1 h1:kYK1Va/YMlutzCGazswoHKo//tZVlFpKYh+PymziUAg=
github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM=
github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwaUuI=
github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
Expand Down
134 changes: 80 additions & 54 deletions runtime-watcher/internal/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ import (

"github.com/go-logr/logr"
listenerTypes "github.com/kyma-project/runtime-watcher/listener/pkg/types"
"github.com/kyma-project/runtime-watcher/skr/internal/parser"
"github.com/kyma-project/runtime-watcher/skr/internal/requestparser"
"github.com/kyma-project/runtime-watcher/skr/internal/serverconfig"
"github.com/kyma-project/runtime-watcher/skr/internal/watchermetrics"
)

const (
HTTPSClientTimeout = time.Minute * 3
HTTPTimeout = time.Minute * 3
eventEndpoint = "event"
admissionError = "admission error"
kcpReqFailedMsg = "kcp request failed"
Expand All @@ -42,31 +43,37 @@ const (
func NewHandler(client client.Client,
logger logr.Logger,
config serverconfig.ServerConfig,
parser parser.RequestParser,
parser requestparser.RequestParser,
metrics watchermetrics.WatcherMetrics,
) *Handler {
return &Handler{
client: client,
logger: logger,
config: config,
requestParser: parser,
metrics: metrics,
}
}

type Handler struct {
client client.Client
logger logr.Logger
config serverconfig.ServerConfig
requestParser parser.RequestParser
requestParser requestparser.RequestParser
metrics watchermetrics.WatcherMetrics
}

type responseInterface interface {
IsEmpty() bool
}

func (h *Handler) Handle(writer http.ResponseWriter, request *http.Request) {
h.metrics.UpdateAdmissionRequestsTotal()
start := time.Now()
admissionReview, err := h.requestParser.ParseAdmissionReview(request)
if err != nil {
h.logger.Error(errors.Join(errAdmission, err), "failed to parse AdmissionReview")
h.metrics.UpdateAdmissionRequestsErrorTotal()
return
}

Expand All @@ -79,8 +86,7 @@ func (h *Handler) Handle(writer http.ResponseWriter, request *http.Request) {
}

validationMsg := h.validateResources(admissionReview.Request, moduleName)

h.logger.Info(string(validationMsg))
h.logger.Info(validationMsg)

responseBytes := h.prepareResponse(admissionReview, validationMsg)
if responseBytes == nil {
Expand All @@ -91,6 +97,9 @@ func (h *Handler) Handle(writer http.ResponseWriter, request *http.Request) {
h.logger.Error(err, admissionError)
return
}

duration := time.Since(start)
h.metrics.UpdateRequestDuration(duration)
}

func getModuleName(urlPath string) (string, error) {
Expand All @@ -108,7 +117,7 @@ func getModuleName(urlPath string) (string, error) {
}

func (h *Handler) prepareResponse(admissionReview *admissionv1.AdmissionReview,
validationMessage admissionMessage,
validationMessage string,
) []byte {
h.logger.Info(fmt.Sprintf("Preparing response for AdmissionReview: %s %s %s",
admissionReview.Request.Kind.Kind,
Expand All @@ -124,7 +133,7 @@ func (h *Handler) prepareResponse(admissionReview *admissionv1.AdmissionReview,
UID: admissionReview.Request.UID,
Allowed: true,
Result: &metav1.Status{
Message: string(validationMessage),
Message: validationMessage,
Status: metav1.StatusSuccess,
},
},
Expand All @@ -138,10 +147,8 @@ func (h *Handler) prepareResponse(admissionReview *admissionv1.AdmissionReview,
return admissionReviewBytes
}

type admissionMessage string

func (h *Handler) validateResources(request *admissionv1.AdmissionRequest, moduleName string,
) admissionMessage {
) string {
object, oldObject := WatchedObject{}, WatchedObject{}

switch request.Operation {
Expand All @@ -152,24 +159,41 @@ func (h *Handler) validateResources(request *admissionv1.AdmissionRequest, modul
GroupVersionKind: request.Kind,
SubResource: request.SubResource,
}
msg := h.sendRequestToKcpOnUpdate(resource, oldObject, object, moduleName)
return admissionMessage(msg)
changed, err := h.checkForChange(resource, oldObject, object)
if err != nil {
h.metrics.UpdateFailedKCPTotal(watchermetrics.ReasonSubresource)
return err.Error()
}
if !changed {
return fmt.Sprintf("no change detected on watched resource %s/%s",
object.Namespace, object.Name)
}
err = h.sendRequestToKcp(moduleName, object)
if err != nil {
return err.Error()
}
case admissionv1.Delete:
h.unmarshalWatchedObject(request.OldObject.Raw, &oldObject)
msg := h.sendRequestToKcp(moduleName, oldObject)
return admissionMessage(msg)
err := h.sendRequestToKcp(moduleName, oldObject)
if err != nil {
return err.Error()
}
case admissionv1.Create:
h.unmarshalWatchedObject(request.Object.Raw, &object)
msg := h.sendRequestToKcp(moduleName, object)
return admissionMessage(msg)
err := h.sendRequestToKcp(moduleName, object)
if err != nil {
return err.Error()
}
case admissionv1.Connect:
msg := fmt.Sprintf("operation %s not supported for %s", admissionv1.Connect, request.Kind.String())
return admissionMessage(msg)
return fmt.Sprintf("operation %s not supported for %s", admissionv1.Connect, request.Kind.String())
}
return ""
return kcpReqSucceededMsg
}

var errAdmission = errors.New(admissionError)
var (
errAdmission = errors.New(admissionError)
errKcpRequest = errors.New(kcpReqFailedMsg)
)

func (h *Handler) unmarshalWatchedObject(rawBytes []byte, response responseInterface) {
if err := json.Unmarshal(rawBytes, response); err != nil {
Expand All @@ -180,9 +204,7 @@ func (h *Handler) unmarshalWatchedObject(rawBytes []byte, response responseInter
}
}

func (h *Handler) sendRequestToKcpOnUpdate(resource *Resource, oldObj, obj WatchedObject,
moduleName string,
) string {
func (h *Handler) checkForChange(resource *Resource, oldObj, obj WatchedObject) (bool, error) {
var registerChange bool
// e.g. slice or status subresource. Only status is supported.
watchedSubResource := strings.ToLower(resource.SubResource)
Expand All @@ -200,69 +222,73 @@ func (h *Handler) sendRequestToKcpOnUpdate(resource *Resource, oldObj, obj Watch
case statusSubResource:
registerChange = !reflect.DeepEqual(oldObj.Status, obj.Status)
default:
return fmt.Sprintf("invalid subresource for watched resource %s/%s",
obj.Namespace, obj.Name)
}

if !registerChange {
return fmt.Sprintf("no change detected on watched resource %s/%s",
return false, fmt.Errorf("invalid subresource for watched resource %s/%s",
obj.Namespace, obj.Name)
}

return h.sendRequestToKcp(moduleName, obj)
return registerChange, nil
}

func (h *Handler) sendRequestToKcp(moduleName string, watched WatchedObject) string {
func (h *Handler) sendRequestToKcp(moduleName string, watched WatchedObject) error {
h.metrics.UpdateKCPTotal()

owner, err := extractOwner(watched)
if err != nil {
h.logger.Error(err, "resource owner name could not be determined")
return kcpReqFailedMsg
return h.logAndReturnKCPErr(err, watchermetrics.ReasonOwner)
}

watcherEvent := &listenerTypes.WatchEvent{
Owner: client.ObjectKey{Namespace: owner.Namespace, Name: owner.Name},
Watched: client.ObjectKey{Namespace: watched.Namespace, Name: watched.Name},
WatchedGvk: metav1.GroupVersionKind(schema.FromAPIVersionAndKind(watched.APIVersion, watched.Kind)),
}
postBody, err := json.Marshal(watcherEvent)
if err != nil {
h.logger.Error(err, kcpReqFailedMsg)
return kcpReqFailedMsg
}

requestPayload := bytes.NewBuffer(postBody)

if h.config.KCPAddress == "" || h.config.KCPContract == "" {
return kcpReqFailedMsg
return h.logAndReturnKCPErr(errors.New("KCPAddress or KCPContract empty"), watchermetrics.ReasonKcpAddress)
}

url := fmt.Sprintf("https://%s/%s/%s/%s", h.config.KCPAddress, h.config.KCPContract, moduleName, eventEndpoint)
httpsClient, err := h.getHTTPSClient()
if err != nil {
h.logger.Error(err, kcpReqFailedMsg)
return err.Error()
return h.logAndReturnKCPErr(err, watchermetrics.ReasonRequest)
}

resp, err := httpsClient.Post(url, "application/json", requestPayload)
postBody, err := json.Marshal(watcherEvent)
if err != nil {
return h.logAndReturnKCPErr(err, watchermetrics.ReasonRequest)
}
resp, err := httpsClient.Post(url, "application/json", bytes.NewBuffer(postBody))
if err != nil {
h.logger.Error(err, kcpReqFailedMsg, "postBody", watcherEvent)
return kcpReqFailedMsg
err = errors.Join(errKcpRequest, err)
h.logger.Error(err, err.Error(), "postBody", watcherEvent)
h.metrics.UpdateFailedKCPTotal(watchermetrics.ReasonResponse)
return err
}
defer resp.Body.Close()
responseBody, err := io.ReadAll(resp.Body)
if err != nil {
h.logger.Error(err, kcpReqFailedMsg, "postBody", watcherEvent)
return kcpReqFailedMsg
err = errors.Join(errKcpRequest, err)
h.logger.Error(err, err.Error(), "postBody", watcherEvent)
h.metrics.UpdateFailedKCPTotal(watchermetrics.ReasonResponse)
return err
}
if resp.StatusCode != http.StatusOK {
h.logger.Error(fmt.Errorf("%w: responseBody: %s with StatusCode: %d", err, responseBody, resp.StatusCode),
kcpReqFailedMsg, "postBody", watcherEvent)
return kcpReqFailedMsg
err = fmt.Errorf("%w: responseBody: %s with StatusCode: %d", errKcpRequest, responseBody, resp.StatusCode)
h.logger.Error(err, err.Error(), "postBody", watcherEvent)
h.metrics.UpdateFailedKCPTotal(watchermetrics.ReasonResponse)
return err
}

h.logger.Info(fmt.Sprintf("sent request to KCP successfully for resource %s/%s",
watched.Namespace, watched.Name), "postBody", watcherEvent)
return kcpReqSucceededMsg
return nil
}

func (h *Handler) logAndReturnKCPErr(err error, reason watchermetrics.KcpErrReason) error {
err = errors.Join(errKcpRequest, err)
h.logger.Error(err, err.Error())
h.metrics.UpdateFailedKCPTotal(reason)
return err
}

func extractOwner(watched WatchedObject) (types.NamespacedName, error) {
Expand Down Expand Up @@ -302,7 +328,7 @@ func (h *Handler) getHTTPSClient() (*http.Client, error) {
rootCertPool := x509.NewCertPool()
rootCertPool.AddCert(rootPubCrt)

httpsClient.Timeout = HTTPSClientTimeout
httpsClient.Timeout = HTTPTimeout
//nolint:gosec
httpsClient.Transport = &http.Transport{
TLSClientConfig: &tls.Config{
Expand Down
10 changes: 7 additions & 3 deletions runtime-watcher/internal/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"net/http"
"net/http/httptest"

"github.com/kyma-project/runtime-watcher/skr/internal/parser"
"github.com/kyma-project/runtime-watcher/skr/internal/watchermetrics"

"github.com/kyma-project/runtime-watcher/skr/internal/requestparser"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
Expand Down Expand Up @@ -105,9 +107,11 @@ var _ = Describe("given watched resource", Ordered, func() {
testCase.params.moduleName, managedByLabel, ownedByAnnotation, testCase.params.changeObjType)
Expect(err).ShouldNot(HaveOccurred())

decoder := serializer.NewCodecFactory(runtime.NewScheme()).UniversalDeserializer()
requestParser := requestparser.NewRequestParser(decoder)
metrics := watchermetrics.NewMetrics()
handler := internal.NewHandler(k8sClient, logger, config, *requestParser, *metrics)
skrRecorder := httptest.NewRecorder()
requestParser := parser.NewRequestParser(serializer.NewCodecFactory(runtime.NewScheme()).UniversalDeserializer())
handler := internal.NewHandler(k8sClient, logger, config, *requestParser)
handler.Handle(skrRecorder, request)

bytes, err := io.ReadAll(skrRecorder.Body)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package parser
package requestparser

import (
"errors"
Expand Down
Loading

0 comments on commit fd5d1f4

Please sign in to comment.