Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a bug where TCP traffic detected from pods shortly before their deletion could be labeled as incoming traffic from the internet #245

Merged
merged 4 commits into from
Oct 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 29 additions & 8 deletions src/mapper/pkg/kubefinder/kubefinder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kubefinder
import (
"context"
"fmt"
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/otterize/intents-operator/src/shared/errors"
"github.com/otterize/intents-operator/src/shared/serviceidresolver"
"github.com/otterize/network-mapper/src/mapper/pkg/config"
Expand Down Expand Up @@ -31,26 +32,36 @@ const (
IstioCanonicalNameLabelKey = "service.istio.io/canonical-name"
apiServerName = "kubernetes"
apiServerNamespace = "default"
seenIPsCacheSize = 2000
seenIPsCacheTTL = time.Minute * 10
)

// KubeFinder groups the controller manager, the client obtained from it, a
// service identity resolver, and a size- and TTL-bounded cache of IPs that
// have been observed on pods.
type KubeFinder struct {
// mgr is the controller manager this finder was created with.
mgr manager.Manager
// client is the manager's client; used for List calls against indexed fields.
client client.Client
// serviceIdResolver is built on the same client as the finder itself.
serviceIdResolver *serviceidresolver.Resolver
// seenIPsTTLCache is used as a set (empty-struct values) keyed by IP string.
// Entries are added for every pod IP encountered and are queried by WasPodIP.
seenIPsTTLCache *expirable.LRU[string, struct{}]
}

var ErrNoPodFound = errors.NewSentinelError("no pod found")
var ErrFoundMoreThanOnePod = errors.NewSentinelError("ip belongs to more than one pod")
var ErrFoundMoreThanOneService = errors.NewSentinelError("ip belongs to more than one service")
var ErrServiceNotFound = errors.NewSentinelError("service not found")
// Sentinel errors for pod and service lookups. They are created with
// errors.NewSentinelError so callers can compare against them by identity.
var (
// ErrNoPodFound signals that no pod matched the lookup.
ErrNoPodFound = errors.NewSentinelError("no pod found")
// ErrFoundMoreThanOnePod signals that an IP matched more than one pod.
ErrFoundMoreThanOnePod = errors.NewSentinelError("ip belongs to more than one pod")
// ErrFoundMoreThanOneService signals that an IP matched more than one service.
ErrFoundMoreThanOneService = errors.NewSentinelError("ip belongs to more than one service")
// ErrServiceNotFound signals that the requested service does not exist.
ErrServiceNotFound = errors.NewSentinelError("service not found")
)

// NewKubeFinder builds a KubeFinder on top of mgr: it takes the manager's
// client, creates a service identity resolver from that same client, sets up
// the seen-IPs cache and then registers the field indexes via initIndexes.
// The cache is created before the indexes are registered because the index
// functions write into it.
//
// It returns the ready finder, or a nil finder and the wrapped error if index
// registration fails.
func NewKubeFinder(ctx context.Context, mgr manager.Manager) (*KubeFinder, error) {
indexer := &KubeFinder{client: mgr.GetClient(), mgr: mgr, serviceIdResolver: serviceidresolver.NewResolver(mgr.GetClient())}
err := indexer.initIndexes(ctx)
finder := &KubeFinder{client: mgr.GetClient(), mgr: mgr, serviceIdResolver: serviceidresolver.NewResolver(mgr.GetClient())}
finder.initSeenIPsCache()
err := finder.initIndexes(ctx)
if err != nil {
return nil, errors.Wrap(err)
}
return indexer, nil
return finder, nil
}

// initSeenIPsCache installs a brand-new, empty seen-IPs cache on k, discarding
// whatever cache was there before. The cache holds at most seenIPsCacheSize
// entries, each kept for seenIPsCacheTTL, and has no eviction callback.
func (k *KubeFinder) initSeenIPsCache() {
	freshCache := expirable.NewLRU[string, struct{}](seenIPsCacheSize, nil, seenIPsCacheTTL)
	k.seenIPsTTLCache = freshCache
}

func (k *KubeFinder) initIndexes(ctx context.Context) error {
Expand Down Expand Up @@ -80,6 +91,7 @@ func (k *KubeFinder) initIndexes(ctx context.Context) error {
}

for _, ip := range pod.Status.PodIPs {
k.seenIPsTTLCache.Add(ip.IP, struct{}{})
res = append(res, ip.IP)
}
return res
Expand Down Expand Up @@ -464,7 +476,12 @@ func (k *KubeFinder) ResolveOtterizeIdentityForService(ctx context.Context, svc
}

func (k *KubeFinder) IsSrcIpClusterInternal(ctx context.Context, ip string) (bool, error) {
// Known issue: this function is currently missing support for services/endpoints, node.PodCIDR, and pods that were deleted.
// Known issue: this function is currently missing support for services/endpoints, node.PodCIDR

wasPodIp := k.WasPodIP(ip)
if wasPodIp {
return true, nil
}

isNode, err := k.IsNodeIP(ctx, ip)
if err != nil {
Expand Down Expand Up @@ -502,6 +519,10 @@ func (k *KubeFinder) IsPodIp(ctx context.Context, ip string) (bool, error) {
return len(pods.Items) > 0, nil
}

// WasPodIP reports whether ip is currently held in the seen-IPs cache, i.e.
// whether it was recorded as a pod IP and has not yet been evicted or expired
// out of the cache. It performs no API or informer lookup.
func (k *KubeFinder) WasPodIP(ip string) bool {
	seen := k.seenIPsTTLCache.Contains(ip)
	return seen
}

func (k *KubeFinder) IsNodeIP(ctx context.Context, ip string) (bool, error) {
var nodes corev1.NodeList
err := k.client.List(ctx, &nodes, client.MatchingFields{nodeIPIndexField: ip})
Expand Down
37 changes: 37 additions & 0 deletions src/mapper/pkg/kubefinder/kubefinder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,43 @@ func (s *KubeFinderTestSuite) TestResolveServiceAddressToIps() {
s.Require().ElementsMatch(lo.Map(pods, func(p corev1.Pod, _ int) string { return p.Status.PodIP }), lo.Map(pods4444, func(p *corev1.Pod, _ int) string { return p.Status.PodIP }))
}

// TestIsSrcIpClusterInternal checks that IsSrcIpClusterInternal keeps treating
// a pod's IP as cluster-internal after the pod has been deleted, for as long as
// the IP is still held in the finder's seen-IPs cache, and stops doing so once
// that cache is replaced with an empty one.
func (s *KubeFinderTestSuite) TestIsSrcIpClusterInternal() {
// Presumably AddPod creates a pod named "test-pod" with IP 1.1.1.1 — confirm against the suite helper.
pod := s.AddPod("test-pod", "1.1.1.1", nil, nil)
s.Require().True(s.Mgr.GetCache().WaitForCacheSync(context.Background()))

// An IP that belongs to a live pod is reported as internal.
isInternal, err := s.kubeFinder.IsSrcIpClusterInternal(context.Background(), "1.1.1.1")
s.Require().NoError(err)
s.Require().True(isInternal)

// An IP that never belonged to any pod in this test is reported as not internal.
isInternal, err = s.kubeFinder.IsSrcIpClusterInternal(context.Background(), "8.8.8.8")
s.Require().NoError(err)
s.Require().False(isInternal)

// Delete the pod and wait on the manager's cache again.
err = s.Mgr.GetClient().Delete(context.Background(), pod)
s.Require().NoError(err)
s.Require().True(s.Mgr.GetCache().WaitForCacheSync(context.Background()))

// The pod can no longer be resolved by IP. Note that pod is reused here to
// hold the (nil) lookup result.
pod, err = s.kubeFinder.ResolveIPToPod(context.Background(), "1.1.1.1")
s.Require().Nil(pod)
s.Require().Error(err)

// The deleted pod's IP is still reported as internal: it remains in the
// seen-IPs cache even though the pod itself is gone.
isInternal, err = s.kubeFinder.IsSrcIpClusterInternal(context.Background(), "1.1.1.1")
s.Require().NoError(err)
s.Require().True(isInternal)

// Replace the seen-IPs cache with a fresh, empty one.
s.kubeFinder.initSeenIPsCache()

// With the cache emptied, the deleted pod's IP is no longer reported as internal.
isInternal, err = s.kubeFinder.IsSrcIpClusterInternal(context.Background(), "1.1.1.1")
s.Require().NoError(err)
s.Require().False(isInternal)
}

func TestKubeFinderTestSuite(t *testing.T) {
suite.Run(t, new(KubeFinderTestSuite))
}
Loading