Skip to content

Commit

Permalink
Fix a bug where detected TCP traffic from pods short time before thei…
Browse files Browse the repository at this point in the history
…r deletion time could be labeled as incoming traffic from the internet
  • Loading branch information
omris94 committed Oct 10, 2024
1 parent 88d4933 commit a65146e
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 5 deletions.
29 changes: 24 additions & 5 deletions src/mapper/pkg/kubefinder/kubefinder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kubefinder
import (
"context"
"fmt"
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/otterize/intents-operator/src/shared/errors"
"github.com/otterize/intents-operator/src/shared/serviceidresolver"
"github.com/otterize/network-mapper/src/mapper/pkg/config"
Expand Down Expand Up @@ -37,22 +38,30 @@ type KubeFinder struct {
mgr manager.Manager
client client.Client
serviceIdResolver *serviceidresolver.Resolver
SeenIPsTTLCache *expirable.LRU[string, struct{}]
}

var ErrNoPodFound = errors.NewSentinelError("no pod found")
var ErrFoundMoreThanOnePod = errors.NewSentinelError("ip belongs to more than one pod")
var ErrFoundMoreThanOneService = errors.NewSentinelError("ip belongs to more than one service")
var ErrServiceNotFound = errors.NewSentinelError("service not found")
// Sentinel errors declared by the kubefinder package.
var (
	// ErrNoPodFound carries the message "no pod found".
	// NOTE(review): presumably returned when an IP lookup matches no pod — confirm against callers.
	ErrNoPodFound = errors.NewSentinelError("no pod found")
	// ErrFoundMoreThanOnePod carries the message "ip belongs to more than one pod".
	ErrFoundMoreThanOnePod = errors.NewSentinelError("ip belongs to more than one pod")
	// ErrFoundMoreThanOneService carries the message "ip belongs to more than one service".
	ErrFoundMoreThanOneService = errors.NewSentinelError("ip belongs to more than one service")
	// ErrServiceNotFound carries the message "service not found".
	ErrServiceNotFound = errors.NewSentinelError("service not found")
)

// NewKubeFinder constructs a KubeFinder backed by the given manager's client,
// creates its seen-IPs cache, and registers its field indexes.
//
// It returns a wrapped error (and a nil KubeFinder) if index registration fails.
func NewKubeFinder(ctx context.Context, mgr manager.Manager) (*KubeFinder, error) {
	finder := &KubeFinder{
		client:            mgr.GetClient(),
		mgr:               mgr,
		serviceIdResolver: serviceidresolver.NewResolver(mgr.GetClient()),
	}

	// The cache must exist before the indexes are registered: the pod IP
	// index function writes every pod IP it sees into SeenIPsTTLCache.
	finder.initCache()

	if err := finder.initIndexes(ctx); err != nil {
		return nil, errors.Wrap(err)
	}
	return finder, nil
}

// initCache replaces SeenIPsTTLCache with a fresh, empty expirable LRU.
// Any previously recorded IPs are discarded (the tests rely on this to reset state).
func (k *KubeFinder) initCache() {
	const (
		// seenIPsMaxEntries is the size argument passed to the LRU constructor.
		seenIPsMaxEntries = 2000
		// seenIPsTTL is the TTL argument passed to the LRU constructor.
		seenIPsTTL = 10 * time.Minute
	)

	// No eviction callback is needed, hence the nil second argument.
	k.SeenIPsTTLCache = expirable.NewLRU[string, struct{}](seenIPsMaxEntries, nil, seenIPsTTL)
}

func (k *KubeFinder) initIndexes(ctx context.Context) error {
err := k.mgr.GetCache().IndexField(ctx, &corev1.Pod{}, podIPIndexField, func(object client.Object) []string {
res := make([]string, 0)
Expand Down Expand Up @@ -80,6 +89,7 @@ func (k *KubeFinder) initIndexes(ctx context.Context) error {
}

for _, ip := range pod.Status.PodIPs {
k.SeenIPsTTLCache.Add(ip.IP, struct{}{})
res = append(res, ip.IP)
}
return res
Expand Down Expand Up @@ -464,7 +474,12 @@ func (k *KubeFinder) ResolveOtterizeIdentityForService(ctx context.Context, svc
}

func (k *KubeFinder) IsSrcIpClusterInternal(ctx context.Context, ip string) (bool, error) {
// Known issue: this function is currently missing support for services/endpoints, node.PodCIDR, and pods that were deleted.
// Known issue: this function is currently missing support for services/endpoints, node.PodCIDR

wasPodIp := k.WasPodIP(ip)
if wasPodIp {
return true, nil
}

isNode, err := k.IsNodeIP(ctx, ip)
if err != nil {
Expand Down Expand Up @@ -502,6 +517,10 @@ func (k *KubeFinder) IsPodIp(ctx context.Context, ip string) (bool, error) {
return len(pods.Items) > 0, nil
}

// WasPodIP reports whether ip is present in SeenIPsTTLCache and has not yet
// passed its TTL, i.e. whether it was recently recorded as a pod IP (possibly
// of a pod that has since been deleted).
func (k *KubeFinder) WasPodIP(ip string) bool {
	// Peek is used instead of Contains: Contains only checks key presence, so
	// an entry whose TTL has elapsed but which the background purge has not
	// removed yet would still be reported. Peek checks the expiry time and,
	// like Contains, does not refresh the entry's recency.
	_, seen := k.SeenIPsTTLCache.Peek(ip)
	return seen
}

func (k *KubeFinder) IsNodeIP(ctx context.Context, ip string) (bool, error) {
var nodes corev1.NodeList
err := k.client.List(ctx, &nodes, client.MatchingFields{nodeIPIndexField: ip})
Expand Down
37 changes: 37 additions & 0 deletions src/mapper/pkg/kubefinder/kubefinder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,43 @@ func (s *KubeFinderTestSuite) TestResolveServiceAddressToIps() {
s.Require().ElementsMatch(lo.Map(pods, func(p corev1.Pod, _ int) string { return p.Status.PodIP }), lo.Map(pods4444, func(p *corev1.Pod, _ int) string { return p.Status.PodIP }))
}

// TestIsSrcIpClusterInternal checks that IsSrcIpClusterInternal treats a pod's
// IP as cluster-internal while the pod exists, keeps treating it as internal
// after the pod is deleted (via the seen-IPs cache), and stops doing so once
// that cache is reset. An IP that never belonged to a pod is reported as external.
func (s *KubeFinderTestSuite) TestIsSrcIpClusterInternal() {
	const podIP = "1.1.1.1"
	ctx := context.Background()

	createdPod := s.AddPod("test-pod", podIP, nil, nil)
	s.Require().True(s.Mgr.GetCache().WaitForCacheSync(ctx))

	// A live pod's IP is internal.
	internal, err := s.kubeFinder.IsSrcIpClusterInternal(ctx, podIP)
	s.Require().NoError(err)
	s.Require().True(internal)

	// An IP that no pod ever had is not internal.
	internal, err = s.kubeFinder.IsSrcIpClusterInternal(ctx, "8.8.8.8")
	s.Require().NoError(err)
	s.Require().False(internal)

	// Delete the pod and wait for the manager's cache.
	err = s.Mgr.GetClient().Delete(ctx, createdPod)
	s.Require().NoError(err)
	s.Require().True(s.Mgr.GetCache().WaitForCacheSync(ctx))

	// The pod must no longer be resolvable through the manager's cache.
	resolvedPod, err := s.kubeFinder.ResolveIPToPod(ctx, podIP)
	s.Require().Nil(resolvedPod)
	s.Require().Error(err)

	// The deleted pod's IP is still considered internal thanks to the seen-IPs cache.
	internal, err = s.kubeFinder.IsSrcIpClusterInternal(ctx, podIP)
	s.Require().NoError(err)
	s.Require().True(internal)

	// Drop everything the seen-IPs cache remembered.
	s.kubeFinder.initCache()

	// With the cache reset, the deleted pod's IP is no longer internal.
	internal, err = s.kubeFinder.IsSrcIpClusterInternal(ctx, podIP)
	s.Require().NoError(err)
	s.Require().False(internal)
}

func TestKubeFinderTestSuite(t *testing.T) {
suite.Run(t, new(KubeFinderTestSuite))
}

0 comments on commit a65146e

Please sign in to comment.