Skip to content

Commit

Permalink
Improve the way we resolve services DNS to identities by going direct…
Browse files Browse the repository at this point in the history
…ly from the `targetRef` of `endpoints` instead of going through IP addresses (#229)
  • Loading branch information
omris94 authored Jul 27, 2024
1 parent d63474e commit 710ea4b
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 43 deletions.
19 changes: 15 additions & 4 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ permissions:
# pull-requests: read

jobs:
golangci:
name: golangci-lint
runs-on: ubuntu-20.04
vet:
# run vet in a separate job to avoid conflicts with golangci-lint pkg-cache
name: vet
runs-on: ubuntu-latest
steps:
- uses: actions/setup-go@v3
with:
Expand All @@ -31,6 +32,16 @@ jobs:
working-directory: src/
- name: check git diff
run: git diff --exit-code
golangci:
name: golangci-lint
runs-on: ubuntu-20.04
steps:
- uses: actions/setup-go@v3
with:
go-version: '1.21.3'
- uses: actions/checkout@v3
- name: Install dependencies
run: sudo apt update && sudo apt install libpcap-dev # required for the linter to be able to lint github.com/google/gopacket
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
with:
Expand All @@ -41,7 +52,7 @@ jobs:
working-directory: src

# Optional: golangci-lint command line arguments.
args: --timeout 5m
args: --timeout 5m --out-format github-actions

# Optional: show only new issues if it's a pull request. The default value is `false`.
# only-new-issues: true
Expand Down
26 changes: 16 additions & 10 deletions src/mapper/pkg/kubefinder/kubefinder.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func (k *KubeFinder) ResolveIstioWorkloadToPod(ctx context.Context, workload str
return &podList.Items[0], nil
}

func (k *KubeFinder) ResolveServiceAddressToIps(ctx context.Context, fqdn string) ([]string, types.NamespacedName, error) {
func (k *KubeFinder) ResolveServiceAddressToPods(ctx context.Context, fqdn string) ([]corev1.Pod, types.NamespacedName, error) {
clusterDomain := viper.GetString(config.ClusterDomainKey)
if !strings.HasSuffix(fqdn, clusterDomain) {
return nil, types.NamespacedName{}, errors.Errorf("address %s is not in the cluster", fqdn)
Expand All @@ -366,22 +366,28 @@ func (k *KubeFinder) ResolveServiceAddressToIps(ctx context.Context, fqdn string
}
namespace := fqdnWithoutClusterDomainParts[len(fqdnWithoutClusterDomainParts)-2]
serviceName := fqdnWithoutClusterDomainParts[len(fqdnWithoutClusterDomainParts)-3]
endpoints := &corev1.Endpoints{}
service := &corev1.Service{}
serviceNamespacedName := types.NamespacedName{Name: serviceName, Namespace: namespace}
err := k.client.Get(ctx, serviceNamespacedName, endpoints)
err := k.client.Get(ctx, serviceNamespacedName, service)
if err != nil {
return nil, types.NamespacedName{}, errors.Wrap(err)
}
ips := make([]string, 0)
for _, subset := range endpoints.Subsets {
for _, address := range subset.Addresses {
ips = append(ips, address.IP)
}
pods, err := k.ResolveServiceToPods(ctx, service)
if err != nil {
return nil, types.NamespacedName{}, errors.Wrap(err)
}
return ips, serviceNamespacedName, nil

return pods, serviceNamespacedName, nil
case "pod":
// for address format of pods: 172-17-0-3.default.pod.cluster.local
return []string{strings.ReplaceAll(fqdnWithoutClusterDomainParts[0], "-", ".")}, types.NamespacedName{}, nil
ip := strings.ReplaceAll(fqdnWithoutClusterDomainParts[0], "-", ".")
pod, err := k.ResolveIPToPod(ctx, ip)
if err != nil {
return make([]corev1.Pod, 0), types.NamespacedName{}, errors.Wrap(err)
}

return []corev1.Pod{*pod}, types.NamespacedName{}, nil

default:
return nil, types.NamespacedName{}, errors.Errorf("cannot resolve k8s address %s, type %s not supported", fqdn, fqdnWithoutClusterDomainParts[len(fqdnWithoutClusterDomainParts)-1])
}
Expand Down
24 changes: 15 additions & 9 deletions src/mapper/pkg/kubefinder/kubefinder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/otterize/network-mapper/src/shared/testbase"
"github.com/samber/lo"
"github.com/stretchr/testify/suite"
corev1 "k8s.io/api/core/v1"
"testing"
Expand Down Expand Up @@ -38,35 +39,40 @@ func (s *KubeFinderTestSuite) TestResolveIpToPod() {
}

func (s *KubeFinderTestSuite) TestResolveServiceAddressToIps() {
_, _, err := s.kubeFinder.ResolveServiceAddressToIps(context.Background(), "www.google.com")
_, _, err := s.kubeFinder.ResolveServiceAddressToPods(context.Background(), "www.google.com")
s.Require().Error(err)

_, _, err = s.kubeFinder.ResolveServiceAddressToIps(context.Background(), fmt.Sprintf("svc-service1.%s.svc.cluster.local", s.TestNamespace))
_, _, err = s.kubeFinder.ResolveServiceAddressToPods(context.Background(), fmt.Sprintf("svc-service1.%s.svc.cluster.local", s.TestNamespace))
s.Require().Error(err)

podIp0 := "1.1.1.1"
podIp1 := "1.1.1.2"
podIp2 := "1.1.1.3"
s.Require().NoError(s.Mgr.GetClient().List(context.Background(), &corev1.EndpointsList{})) // Workaround: make then client start caching Endpoints, so when we do "WaitForCacheSync" it will actually sync cache"
s.AddDeploymentWithService("service0", []string{podIp0}, map[string]string{"app": "service0"}, "10.0.0.10")
s.AddDeploymentWithService("service1", []string{podIp1, podIp2}, map[string]string{"app": "service1"}, "10.0.0.11")
_, _, retPods := s.AddDeploymentWithService("service1", []string{podIp1, podIp2}, map[string]string{"app": "service1"}, "10.0.0.11")
s.Require().True(s.Mgr.GetCache().WaitForCacheSync(context.Background()))

ips, service, err := s.kubeFinder.ResolveServiceAddressToIps(context.Background(), fmt.Sprintf("svc-service1.%s.svc.cluster.local", s.TestNamespace))
pods, service, err := s.kubeFinder.ResolveServiceAddressToPods(context.Background(), fmt.Sprintf("svc-service1.%s.svc.cluster.local", s.TestNamespace))
s.Require().NoError(err)
s.Require().Equal("svc-service1", service.Name)
s.Require().ElementsMatch(ips, []string{podIp1, podIp2})
s.Require().ElementsMatch(lo.Map(pods, func(p corev1.Pod, _ int) string { return p.Status.PodIP }), lo.Map(retPods, func(p *corev1.Pod, _ int) string { return p.Status.PodIP }))

// make sure we don't fail on the longer forms of k8s service addresses, listed on this page: https://kubernetes.io/docs/concepts/services-networking/dns-pod-service
ips, service, err = s.kubeFinder.ResolveServiceAddressToIps(context.Background(), fmt.Sprintf("4-4-4-4.svc-service1.%s.svc.cluster.local", s.TestNamespace))
pods, service, err = s.kubeFinder.ResolveServiceAddressToPods(context.Background(), fmt.Sprintf("4-4-4-4.svc-service1.%s.svc.cluster.local", s.TestNamespace))
s.Require().Equal("svc-service1", service.Name)
s.Require().NoError(err)
s.Require().ElementsMatch(ips, []string{podIp1, podIp2})
s.Require().ElementsMatch(lo.Map(pods, func(p corev1.Pod, _ int) string { return p.Status.PodIP }), lo.Map(retPods, func(p *corev1.Pod, _ int) string { return p.Status.PodIP }))

ips, service, err = s.kubeFinder.ResolveServiceAddressToIps(context.Background(), fmt.Sprintf("4-4-4-4.%s.pod.cluster.local", s.TestNamespace))
_, _, err = s.kubeFinder.ResolveServiceAddressToPods(context.Background(), fmt.Sprintf("4-4-4-4.%s.pod.cluster.local", s.TestNamespace))
s.Require().Error(err)
s.Require().True(s.Mgr.GetCache().WaitForCacheSync(context.Background()))

_, pods4444 := s.AddDeployment("depl", []string{"4.4.4.4"}, map[string]string{"app": "4444"})
pods, service, err = s.kubeFinder.ResolveServiceAddressToPods(context.Background(), fmt.Sprintf("4-4-4-4.%s.pod.cluster.local", s.TestNamespace))
s.Require().NoError(err)
s.Require().Empty(service)
s.Require().ElementsMatch(ips, []string{"4.4.4.4"})
s.Require().ElementsMatch(lo.Map(pods, func(p corev1.Pod, _ int) string { return p.Status.PodIP }), lo.Map(pods4444, func(p *corev1.Pod, _ int) string { return p.Status.PodIP }))
}

func TestKubeFinderTestSuite(t *testing.T) {
Expand Down
31 changes: 11 additions & 20 deletions src/mapper/pkg/resolvers/schema.helpers.resolvers.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (r *Resolver) handleDNSCaptureResultsAsKubernetesPods(ctx context.Context,

func (r *Resolver) resolveOtterizeIdentityForDestinationAddress(ctx context.Context, dest model.Destination) (*model.OtterizeServiceIdentity, bool, error) {
destAddress := dest.Destination
ips, serviceName, err := r.kubeFinder.ResolveServiceAddressToIps(ctx, destAddress)
pods, serviceName, err := r.kubeFinder.ResolveServiceAddressToPods(ctx, destAddress)
if err != nil {
logrus.WithError(err).Warningf("Could not resolve service address %s", destAddress)
// Intentionally no error return
Expand All @@ -282,30 +282,21 @@ func (r *Resolver) resolveOtterizeIdentityForDestinationAddress(ctx context.Cont
}, true, nil
}

if len(ips) == 0 {
logrus.Debugf("Service address %s is currently not backed by any pod, ignoring", destAddress)
return nil, false, nil
}
// Resolving the IP of the service's endpoints!
destPod, err := r.kubeFinder.ResolveIPToPod(ctx, ips[0])
if err != nil {
if errors.Is(err, kubefinder.ErrFoundMoreThanOnePod) {
logrus.WithError(err).Debugf("Ip %s belongs to more than one pod, ignoring", ips[0])
} else {
logrus.WithError(err).Debugf("Could not resolve %s to pod", ips[0])
filteredPods := lo.Filter(pods, func(pod corev1.Pod, _ int) bool {
lastCreationTimeForUsToTrustIt := dest.LastSeen
if lo.IsEmpty(serviceName) {
// In this case the DNS was a "pod" DNS - which contains IP - and therefore less reliable.
lastCreationTimeForUsToTrustIt = lastCreationTimeForUsToTrustIt.Add(viper.GetDuration(config.TimeServerHasToLiveBeforeWeTrustItKey))
}
return nil, false, nil
}
return lastCreationTimeForUsToTrustIt.After(pod.CreationTimestamp.Time) && pod.DeletionTimestamp == nil
})

if destPod.CreationTimestamp.After(dest.LastSeen) {
logrus.Debugf("Pod %s was created after capture time %s, ignoring", destPod.Name, dest.LastSeen)
if len(filteredPods) == 0 {
logrus.Debugf("Service address %s is currently not backed by any valid pod, ignoring", destAddress)
return nil, false, nil
}

if destPod.DeletionTimestamp != nil {
logrus.Debugf("Pod %s is being deleted, ignoring", destPod.Name)
return nil, false, nil
}
destPod := &filteredPods[0]

dstService, err := r.serviceIdResolver.ResolvePodToServiceIdentity(ctx, destPod)
if err != nil {
Expand Down

0 comments on commit 710ea4b

Please sign in to comment.