Skip to content

Commit

Permalink
Update targeted pods on resolution history change
Browse files Browse the repository at this point in the history
  • Loading branch information
corest authored and mertyildiran committed Jan 24, 2024
1 parent 679ef2d commit ecd04e8
Showing 1 changed file with 36 additions and 55 deletions.
91 changes: 36 additions & 55 deletions pkg/kubernetes/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,35 +10,41 @@ import (
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
)

const (
kubesharkConfigmapName = "kubeshark-config-map"
resolverHistoryAnnotation = "resolver.kubeshark.io/history"
)

type Watcher struct {
clientConfig *restclient.Config
clientSet *kubernetes.Clientset
regex *regexp.Regexp
namespaces []string
isStarted bool
errOut chan error
callback func(pods []v1.Pod) error
clientConfig *restclient.Config
clientSet *kubernetes.Clientset
regex *regexp.Regexp
namespaces []string
isStarted bool
lastUpdatedAt string
errOut chan error
callback func(pods []v1.Pod) error
}

func (watcher *Watcher) Start(ctx context.Context, clusterMode bool) {
if !watcher.isStarted {
watcher.isStarted = true

if clusterMode {
go watcher.infiniteErrorHandleRetryFunc(ctx, watcher.watchConfigMap)
go watcher.infiniteErrorHandleRetryFunc(ctx, watcher.watchPods)
go watcher.infiniteErrorHandleRetryFunc(ctx, watcher.watchKubesharkConfigMap)
}
}
}

func (watcher *Watcher) watchConfigMap(ctx context.Context) error {
func (watcher *Watcher) watchKubesharkConfigMap(ctx context.Context) error {
w, err := watcher.clientSet.CoreV1().ConfigMaps(GetSelfNamespace()).Watch(ctx, metav1.ListOptions{
Watch: true,
FieldSelector: fields.OneTermEqualSelector(metav1.ObjectNameField, "kubeshark-config-map").String(),
FieldSelector: fields.OneTermEqualSelector(metav1.ObjectNameField, kubesharkConfigmapName).String(),
})
if err != nil {
return err
Expand All @@ -48,52 +54,16 @@ func (watcher *Watcher) watchConfigMap(ctx context.Context) error {
select {
case event := <-w.ResultChan():
if event.Object == nil {
return errors.New("error in kubectl endpoint watch")
}
watcher.regex, watcher.namespaces = SyncConfig(event.Object.(*v1.ConfigMap))

err = updateCurrentlyTargetedPods(ctx, watcher.clientSet, watcher.regex, watcher.namespaces, watcher.callback)
if err != nil {
log.Error().Err(err).Send()
}
case <-ctx.Done():
w.Stop()
return nil
}
}
}

func (watcher *Watcher) watchPods(ctx context.Context) error {
// empty namespace makes the client watch all namespaces
kubesharkLabels := map[string]string{"app.kubernetes.io/name": "kubeshark"}
w, err := watcher.clientSet.CoreV1().Pods(allNamespaces).Watch(ctx, metav1.ListOptions{Watch: true})
if err != nil {
w, err = watcher.clientSet.CoreV1().Pods(GetSelfNamespace()).Watch(ctx, metav1.ListOptions{Watch: true})
if err != nil {
return err
}
}
if err != nil {
return err
}
for {
select {
case event := <-w.ResultChan():
if event.Object == nil {
return errors.New("error in kubectl pod watch")
}

pod := event.Object.(*v1.Pod)
if mapsContain(pod.ObjectMeta.Labels, kubesharkLabels) {
// ignore kubeshark pods
continue
return errors.New("error in kubeshark configmap watch")
}
if watcher.isHistoryUpdated(&event) {
watcher.regex, watcher.namespaces = SyncConfig(event.Object.(*v1.ConfigMap))

log.Debug().Str("event", string(event.Type)).Str("pod-name", pod.Name).Str("namespace", pod.Namespace).Send()

err = updateCurrentlyTargetedPods(ctx, watcher.clientSet, watcher.regex, watcher.namespaces, watcher.callback)
if err != nil {
log.Error().Err(err).Send()
err = updateCurrentlyTargetedPods(ctx, watcher.clientSet, watcher.regex, watcher.namespaces, watcher.callback)
if err != nil {
log.Error().Err(err).Send()
}
watcher.lastUpdatedAt = event.Object.(*v1.ConfigMap).ObjectMeta.Annotations[resolverHistoryAnnotation]
}
case <-ctx.Done():
w.Stop()
Expand Down Expand Up @@ -121,3 +91,14 @@ func (watcher *Watcher) infiniteErrorHandleRetryFunc(ctx context.Context, fun fu
}
}
}

func (watcher *Watcher) isHistoryUpdated(event *watch.Event) bool {
cm := event.Object.(*v1.ConfigMap)
currentUpdatedAt := cm.ObjectMeta.Annotations[resolverHistoryAnnotation]

if watcher.lastUpdatedAt != currentUpdatedAt || currentUpdatedAt == "" {
return true
}

return false
}

0 comments on commit ecd04e8

Please sign in to comment.