diff --git a/src/mapper/pkg/kubefinder/kubefinder.go b/src/mapper/pkg/kubefinder/kubefinder.go index 6b95c7e1..34e91556 100644 --- a/src/mapper/pkg/kubefinder/kubefinder.go +++ b/src/mapper/pkg/kubefinder/kubefinder.go @@ -3,6 +3,7 @@ package kubefinder import ( "context" "fmt" + "github.com/hashicorp/golang-lru/v2/expirable" "github.com/otterize/intents-operator/src/shared/errors" "github.com/otterize/intents-operator/src/shared/serviceidresolver" "github.com/otterize/network-mapper/src/mapper/pkg/config" @@ -31,26 +32,36 @@ const ( IstioCanonicalNameLabelKey = "service.istio.io/canonical-name" apiServerName = "kubernetes" apiServerNamespace = "default" + seenIPsCacheSize = 2000 + seenIPsCacheTTL = time.Minute * 10 ) type KubeFinder struct { mgr manager.Manager client client.Client serviceIdResolver *serviceidresolver.Resolver + seenIPsTTLCache *expirable.LRU[string, struct{}] } -var ErrNoPodFound = errors.NewSentinelError("no pod found") -var ErrFoundMoreThanOnePod = errors.NewSentinelError("ip belongs to more than one pod") -var ErrFoundMoreThanOneService = errors.NewSentinelError("ip belongs to more than one service") -var ErrServiceNotFound = errors.NewSentinelError("service not found") +var ( + ErrNoPodFound = errors.NewSentinelError("no pod found") + ErrFoundMoreThanOnePod = errors.NewSentinelError("ip belongs to more than one pod") + ErrFoundMoreThanOneService = errors.NewSentinelError("ip belongs to more than one service") + ErrServiceNotFound = errors.NewSentinelError("service not found") +) func NewKubeFinder(ctx context.Context, mgr manager.Manager) (*KubeFinder, error) { - indexer := &KubeFinder{client: mgr.GetClient(), mgr: mgr, serviceIdResolver: serviceidresolver.NewResolver(mgr.GetClient())} - err := indexer.initIndexes(ctx) + finder := &KubeFinder{client: mgr.GetClient(), mgr: mgr, serviceIdResolver: serviceidresolver.NewResolver(mgr.GetClient())} + finder.initSeenIPsCache() + err := finder.initIndexes(ctx) if err != nil { return nil, errors.Wrap(err) } - return indexer, nil + return finder, nil +} + +func (k *KubeFinder) initSeenIPsCache() { + k.seenIPsTTLCache = expirable.NewLRU[string, struct{}](seenIPsCacheSize, nil, seenIPsCacheTTL) } func (k *KubeFinder) initIndexes(ctx context.Context) error { @@ -80,6 +91,7 @@ func (k *KubeFinder) initIndexes(ctx context.Context) error { } for _, ip := range pod.Status.PodIPs { + k.seenIPsTTLCache.Add(ip.IP, struct{}{}) res = append(res, ip.IP) } return res @@ -464,7 +476,12 @@ func (k *KubeFinder) ResolveOtterizeIdentityForService(ctx context.Context, svc } func (k *KubeFinder) IsSrcIpClusterInternal(ctx context.Context, ip string) (bool, error) { - // Known issue: this function is currently missing support for services/endpoints, node.PodCIDR, and pods that were deleted. + // Known issue: this function is currently missing support for services/endpoints, node.PodCIDR + + wasPodIp := k.WasPodIP(ip) + if wasPodIp { + return true, nil + } isNode, err := k.IsNodeIP(ctx, ip) if err != nil { @@ -502,6 +519,10 @@ func (k *KubeFinder) IsPodIp(ctx context.Context, ip string) (bool, error) { return len(pods.Items) > 0, nil } +func (k *KubeFinder) WasPodIP(ip string) bool { + return k.seenIPsTTLCache.Contains(ip) +} + func (k *KubeFinder) IsNodeIP(ctx context.Context, ip string) (bool, error) { var nodes corev1.NodeList err := k.client.List(ctx, &nodes, client.MatchingFields{nodeIPIndexField: ip}) diff --git a/src/mapper/pkg/kubefinder/kubefinder_test.go b/src/mapper/pkg/kubefinder/kubefinder_test.go index 1b7709d4..5b9f5bde 100644 --- a/src/mapper/pkg/kubefinder/kubefinder_test.go +++ b/src/mapper/pkg/kubefinder/kubefinder_test.go @@ -75,6 +75,43 @@ func (s *KubeFinderTestSuite) TestResolveServiceAddressToIps() { s.Require().ElementsMatch(lo.Map(pods, func(p corev1.Pod, _ int) string { return p.Status.PodIP }), lo.Map(pods4444, func(p *corev1.Pod, _ int) string { return p.Status.PodIP })) } +func (s *KubeFinderTestSuite) TestIsSrcIpClusterInternal() { + pod := s.AddPod("test-pod", "1.1.1.1", nil, nil) + s.Require().True(s.Mgr.GetCache().WaitForCacheSync(context.Background())) + + // Check with existing pod's ip + isInternal, err := s.kubeFinder.IsSrcIpClusterInternal(context.Background(), "1.1.1.1") + s.Require().NoError(err) + s.Require().True(isInternal) + + // Check with non-existing pod's ip + isInternal, err = s.kubeFinder.IsSrcIpClusterInternal(context.Background(), "8.8.8.8") + s.Require().NoError(err) + s.Require().False(isInternal) + + err = s.Mgr.GetClient().Delete(context.Background(), pod) + s.Require().NoError(err) + s.Require().True(s.Mgr.GetCache().WaitForCacheSync(context.Background())) + + // Check pod doesn't exist in the manager's cache + pod, err = s.kubeFinder.ResolveIPToPod(context.Background(), "1.1.1.1") + s.Require().Nil(pod) + s.Require().Error(err) + + // Check isInternal with the deleted pod's ip + isInternal, err = s.kubeFinder.IsSrcIpClusterInternal(context.Background(), "1.1.1.1") + s.Require().NoError(err) + s.Require().True(isInternal) + + // Reset the cache + s.kubeFinder.initSeenIPsCache() + + // Check isInternal with the deleted pod's ip after cache reset + isInternal, err = s.kubeFinder.IsSrcIpClusterInternal(context.Background(), "1.1.1.1") + s.Require().NoError(err) + s.Require().False(isInternal) +} + func TestKubeFinderTestSuite(t *testing.T) { suite.Run(t, new(KubeFinderTestSuite)) }