From 13b4a22a23b161a310d3f1f215042e3a722b9901 Mon Sep 17 00:00:00 2001 From: Dave McCormick Date: Thu, 5 Sep 2024 14:39:05 +0100 Subject: [PATCH] Update kubernetai plugin to support transfers via the transfers plugin. Abstract the list of kubernetes plugins to an interface so that we can affectively mock them and control per plugin behaviour and responses. Signed-off-by: Dave McCormick --- plugin/kubernetai/axfr_test.go | 145 +++++++++++++++++++++++++++ plugin/kubernetai/kubernetai.go | 73 ++++++++++++-- plugin/kubernetai/kubernetai_test.go | 69 ++++++++++--- plugin/kubernetai/podhandler.go | 21 ---- plugin/kubernetai/setup.go | 11 +- plugin/kubernetai/setup_test.go | 8 +- 6 files changed, 273 insertions(+), 54 deletions(-) create mode 100644 plugin/kubernetai/axfr_test.go delete mode 100644 plugin/kubernetai/podhandler.go diff --git a/plugin/kubernetai/axfr_test.go b/plugin/kubernetai/axfr_test.go new file mode 100644 index 0000000..2daa864 --- /dev/null +++ b/plugin/kubernetai/axfr_test.go @@ -0,0 +1,145 @@ +package kubernetai + +import ( + "strings" + "testing" + + "github.com/coredns/coredns/plugin/transfer" + + "github.com/miekg/dns" +) + +func TestKubernetesTransferNonAuthZone(t *testing.T) { + type fields struct { + name string + kubernetes []*mockK8sPlugin + zone string + serial uint32 + expectedZone string + expectedError error + } + tests := []fields{ + { + name: "TestSingleKubernetesTransferNonAuthZone", + kubernetes: []*mockK8sPlugin{ + { + zones: []string{"cluster.local"}, + transferErr: transfer.ErrNotAuthoritative, + }, + }, + zone: "example.com", + expectedError: transfer.ErrNotAuthoritative, + }, + { + name: "TestSingleKubernetesTransferAuthZone", + kubernetes: []*mockK8sPlugin{ + { + zones: []string{"cluster.local"}, + transfer: ` +cluster.local. 5 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 3 7200 1800 86400 5 +cluster.local. 5 IN NS ns.dns.cluster.local. +ns.dns.cluster.local. 5 IN A 10.0.0.10 +cluster.local. 5 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 3 7200 1800 86400 5 +`, + transferErr: nil, + }, + }, + zone: "cluster.local", + expectedZone: ` +cluster.local. 5 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 3 7200 1800 86400 5 +cluster.local. 5 IN NS ns.dns.cluster.local. +ns.dns.cluster.local. 5 IN A 10.0.0.10 +cluster.local. 5 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 3 7200 1800 86400 5 +`, + expectedError: nil, + }, + { + name: "TestMultipleNonAuthorititativeSingleAuthoritative", + kubernetes: []*mockK8sPlugin{ + { + zones: []string{"fluster.local"}, + transfer: ` +fluster.local. 5 IN SOA ns.dns.fluster.local. hostmaster.fluster.local. 3 7200 1800 86400 5 +fluster.local. 5 IN NS ns.dns.fluster.local. +ns.dns.fluster.local. 5 IN A 10.0.0.10 +fluster.local. 5 IN SOA ns.dns.fluster.local. hostmaster.fluster.local. 3 7200 1800 86400 5 +`, + transferErr: transfer.ErrNotAuthoritative, + }, + { + zones: []string{"bluster.local"}, + transferErr: transfer.ErrNotAuthoritative, + }, + { + zones: []string{"cluster.local"}, + transfer: ` +cluster.local. 5 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 3 7200 1800 86400 5 +cluster.local. 5 IN NS ns.dns.cluster.local. +ns.dns.cluster.local. 5 IN A 10.0.0.10 +cluster.local. 5 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 3 7200 1800 86400 5 +`, + transferErr: nil, + }, + { + zones: []string{"muster.local"}, + transferErr: transfer.ErrNotAuthoritative, + }, + }, + zone: "cluster.local", + expectedZone: ` +cluster.local. 5 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 3 7200 1800 86400 5 +cluster.local. 5 IN NS ns.dns.cluster.local. +ns.dns.cluster.local. 5 IN A 10.0.0.10 +cluster.local. 5 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 3 7200 1800 86400 5 +`, + expectedError: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // create kubernetai with mock kubernetes plugins + kai := Kubernetai{} + for _, plug := range tt.kubernetes { + kai.Kubernetes = append(kai.Kubernetes, plug) + } + + // create a axfr test message with test zone + dnsmsg := &dns.Msg{} + dnsmsg.SetAxfr(tt.zone) + + // perform AXFR + ch, err := kai.Transfer(tt.zone, tt.serial) + if err != nil { + if err != tt.expectedError { + t.Errorf("expected error %+v but received %+v", tt.expectedError, err) + } + return + } + validateAXFR(t, ch, tt.expectedZone) + }) + } +} + +func validateAXFR(t *testing.T, ch <-chan []dns.RR, expectedZone string) { + xfr := []dns.RR{} + for rrs := range ch { + xfr = append(xfr, rrs...) + } + if xfr[0].Header().Rrtype != dns.TypeSOA { + t.Error("Invalid transfer response, does not start with SOA record") + } + + zp := dns.NewZoneParser(strings.NewReader(expectedZone), "", "") + i := 0 + for rr, ok := zp.Next(); ok; rr, ok = zp.Next() { + if !dns.IsDuplicate(rr, xfr[i]) { + t.Fatalf("Record %d, expected\n%v\n, got\n%v", i, rr, xfr[i]) + } + i++ + } + + if err := zp.Err(); err != nil { + t.Fatal(err) + } +} diff --git a/plugin/kubernetai/kubernetai.go b/plugin/kubernetai/kubernetai.go index 61a34f4..144b101 100644 --- a/plugin/kubernetai/kubernetai.go +++ b/plugin/kubernetai/kubernetai.go @@ -1,3 +1,4 @@ +// Package kubernetai implements a plugin which can embed a number of kubernetes plugins in the same dns server. package kubernetai import ( @@ -5,29 +6,71 @@ import ( "github.com/coredns/coredns/plugin" "github.com/coredns/coredns/plugin/kubernetes" + "github.com/coredns/coredns/plugin/kubernetes/object" clog "github.com/coredns/coredns/plugin/pkg/log" + "github.com/coredns/coredns/plugin/transfer" "github.com/coredns/coredns/request" "github.com/miekg/dns" ) var log = clog.NewWithPlugin("kubernetai") +// embeddedKubernetesPluginInterface describes the kubernetes plugin interface that kubernetai requires/uses. +type embeddedKubernetesPluginInterface interface { + plugin.Handler + transfer.Transferer + PodWithIP(ip string) (pod *object.Pod) + Zones() (zones plugin.Zones) +} + +// embeddedKubernetes wraps a real kubernetes plugin +type embeddedKubernetes struct { + *kubernetes.Kubernetes +} + +var _ embeddedKubernetesPluginInterface = &embeddedKubernetes{} + +func newEmbeddedKubernetes(k *kubernetes.Kubernetes) *embeddedKubernetes { + return &embeddedKubernetes{ + Kubernetes: k, + } +} + +// PodWithIP satisfies the embeddedKubernetesPluginInterface by adding this additional method not exported from the kubernetes plugin. +func (ek embeddedKubernetes) PodWithIP(ip string) *object.Pod { + if ek.Kubernetes == nil { + return nil + } + ps := ek.Kubernetes.APIConn.PodIndex(ip) + if len(ps) == 0 { + return nil + } + return ps[0] +} + +// Zones satisfies the embeddedKubernetesPluginInterface by providing access to the kubernetes plugin Zones. +func (ek embeddedKubernetes) Zones() plugin.Zones { + if ek.Kubernetes == nil { + return nil + } + return plugin.Zones(ek.Kubernetes.Zones) +} + // Kubernetai handles multiple Kubernetes type Kubernetai struct { Zones []string - Kubernetes []*kubernetes.Kubernetes + Kubernetes []embeddedKubernetesPluginInterface autoPathSearch []string // Local search path from /etc/resolv.conf. Needed for autopath. - p podHandlerItf } // New creates a Kubernetai containing one Kubernetes with zones func New(zones []string) (Kubernetai, *kubernetes.Kubernetes) { h := Kubernetai{ autoPathSearch: searchFromResolvConf(), - p: &podHandler{}, } k := kubernetes.New(zones) - h.Kubernetes = append(h.Kubernetes, k) + ek := newEmbeddedKubernetes(k) + h.Kubernetes = append(h.Kubernetes, ek) return h, k } @@ -43,7 +86,7 @@ func (k8i Kubernetai) AutoPath(state request.Request) []string { // Abort if zone is not in kubernetai stanza. var zMatch bool for _, k8s := range k8i.Kubernetes { - zone := plugin.Zones(k8s.Zones).Matches(state.Name()) + zone := k8s.Zones().Matches(state.Name()) if zone != "" { zMatch = true break @@ -55,13 +98,13 @@ func (k8i Kubernetai) AutoPath(state request.Request) []string { // Add autopath result for the handled zones for _, k := range k8i.Kubernetes { - pod := k8i.p.PodWithIP(*k, state.IP()) + pod := k.PodWithIP(state.IP()) if pod == nil { return nil } search := make([]string, 3) - for _, z := range k.Zones { + for _, z := range k.Zones() { if z == "." { search[0] = pod.Namespace + ".svc." search[1] = "svc." @@ -80,6 +123,20 @@ func (k8i Kubernetai) AutoPath(state request.Request) []string { return searchPath } +// Transfer supports the transfer plugin, implementing the Transferer interface, by calling Transfer on each of the embedded plugins. +// It will return a channel to the FIRST kubernetai stanza that reports that it is authoritative for the requested zone. +func (k8i Kubernetai) Transfer(zone string, serial uint32) (retCh <-chan []dns.RR, err error) { + for _, k := range k8i.Kubernetes { + retCh, err = k.Transfer(zone, serial) + if err == transfer.ErrNotAuthoritative { + continue + } + return + } + // none of the embedded plugins were authoritative + return nil, transfer.ErrNotAuthoritative +} + func searchFromResolvConf() []string { rc, err := dns.ClientConfigFromFile("/etc/resolv.conf") if err != nil { @@ -93,7 +150,7 @@ func searchFromResolvConf() []string { func (k8i Kubernetai) Health() bool { healthy := true for _, k := range k8i.Kubernetes { - healthy = healthy && k.APIConn.HasSynced() + healthy = healthy && k.(*embeddedKubernetes).APIConn.HasSynced() if !healthy { break } diff --git a/plugin/kubernetai/kubernetai_test.go b/plugin/kubernetai/kubernetai_test.go index f713336..72abd55 100644 --- a/plugin/kubernetai/kubernetai_test.go +++ b/plugin/kubernetai/kubernetai_test.go @@ -1,22 +1,33 @@ package kubernetai import ( + "context" "net" "reflect" + "strings" "testing" "github.com/coredns/coredns/plugin" - "github.com/coredns/coredns/plugin/kubernetes" "github.com/coredns/coredns/plugin/kubernetes/object" "github.com/coredns/coredns/request" "github.com/miekg/dns" ) -type k8iPodHandlerTester struct{} +var ( + podip string +) + +// mockK8sPlugin satisfies the embeddedKubernetesPluginInterface interface and provides a mock kubernetes plugin that can be used to test kubernetai behaviour. +type mockK8sPlugin struct { + zones []string + transfer string + transferErr error +} -var podip string +var _ embeddedKubernetesPluginInterface = &mockK8sPlugin{} -func (k8i *k8iPodHandlerTester) PodWithIP(k kubernetes.Kubernetes, ip string) *object.Pod { +// PodWithIP always returns a pod with the given ip address in the namespace 'test-1'. +func (mkp *mockK8sPlugin) PodWithIP(ip string) *object.Pod { if ip == "" { return nil } @@ -27,7 +38,39 @@ func (k8i *k8iPodHandlerTester) PodWithIP(k kubernetes.Kubernetes, ip string) *o return pod } -var k8iPodHandlerTest k8iPodHandlerTester +// Name satisfies the plugin.Handler interface but is not used for tests. +func (mkp *mockK8sPlugin) Name() string { + return "" +} + +// ServeDNS satisfies the plugin.Handler interface but is not used for tests. +func (mkp *mockK8sPlugin) ServeDNS(_ context.Context, _ dns.ResponseWriter, _ *dns.Msg) (rcode int, err error) { + return 0, nil +} + +// Transfer satisfies the transfer.Transferer interface by playing back canned transfer responses. +// The canned transfer response is stored in a textual representation. +func (mkp *mockK8sPlugin) Transfer(_ string, _ uint32) (<-chan []dns.RR, error) { + if mkp.transferErr != nil { + return nil, mkp.transferErr + } + + ch := make(chan []dns.RR) + go func() { + zp := dns.NewZoneParser(strings.NewReader(mkp.transfer), "", "") + for rr, ok := zp.Next(); ok; rr, ok = zp.Next() { + ch <- []dns.RR{rr} + } + close(ch) + }() + + return ch, nil +} + +// Zones satisfies the embeddedKubernetesPluginInterface interface by returning pre-configured zones. +func (mkp *mockK8sPlugin) Zones() plugin.Zones { + return plugin.Zones(mkp.zones) +} type responseWriterTest struct { dns.ResponseWriter @@ -44,10 +87,8 @@ func (res *responseWriterTest) RemoteAddr() net.Addr { func TestKubernetai_AutoPath(t *testing.T) { type fields struct { Zones []string - Next plugin.Handler - Kubernetes []*kubernetes.Kubernetes + Kubernetes []embeddedKubernetesPluginInterface autoPathSearch []string - p *k8iPodHandlerTester } type args struct { state request.Request @@ -55,22 +96,21 @@ func TestKubernetai_AutoPath(t *testing.T) { w := &responseWriterTest{} - k8sClusterLocal := &kubernetes.Kubernetes{ - Zones: []string{ + k8sClusterLocal := &mockK8sPlugin{ + zones: []string{ "cluster.local.", }, } - k8sFlusterLocal := &kubernetes.Kubernetes{ - Zones: []string{ + k8sFlusterLocal := &mockK8sPlugin{ + zones: []string{ "fluster.local.", }, } defaultK8iConfig := fields{ - Kubernetes: []*kubernetes.Kubernetes{ + Kubernetes: []embeddedKubernetesPluginInterface{ k8sFlusterLocal, k8sClusterLocal, }, - p: &k8iPodHandlerTest, } tests := []struct { @@ -168,7 +208,6 @@ func TestKubernetai_AutoPath(t *testing.T) { Zones: tt.fields.Zones, Kubernetes: tt.fields.Kubernetes, autoPathSearch: tt.fields.autoPathSearch, - p: tt.fields.p, } podip = tt.ip if got := k8i.AutoPath(tt.args.state); !reflect.DeepEqual(got, tt.want) { diff --git a/plugin/kubernetai/podhandler.go b/plugin/kubernetai/podhandler.go deleted file mode 100644 index dcc060f..0000000 --- a/plugin/kubernetai/podhandler.go +++ /dev/null @@ -1,21 +0,0 @@ -package kubernetai - -import ( - "github.com/coredns/coredns/plugin/kubernetes" - "github.com/coredns/coredns/plugin/kubernetes/object" -) - -type podHandlerItf interface { - PodWithIP(k kubernetes.Kubernetes, ip string) *object.Pod -} - -type podHandler struct{} - -// podWithIP return the api.Pod for source IP ip. It returns nil if nothing can be found. -func (p *podHandler) PodWithIP(k kubernetes.Kubernetes, ip string) *object.Pod { - ps := k.APIConn.PodIndex(ip) - if len(ps) == 0 { - return nil - } - return ps[0] -} diff --git a/plugin/kubernetai/setup.go b/plugin/kubernetai/setup.go index c287754..5c82c9f 100644 --- a/plugin/kubernetai/setup.go +++ b/plugin/kubernetai/setup.go @@ -25,7 +25,7 @@ func setup(c *caddy.Controller) error { prev := &kubernetes.Kubernetes{} for _, k := range k8i.Kubernetes { - onStart, onShut, err := k.InitKubeCache(context.Background()) + onStart, onShut, err := k.(*embeddedKubernetes).InitKubeCache(context.Background()) if err != nil { return plugin.Error(Name(), err) } @@ -36,13 +36,13 @@ func setup(c *caddy.Controller) error { c.OnStartup(onStart) } // set Next of the previous kubernetes instance to the current instance - prev.Next = k - prev = k + prev.Next = k.(*embeddedKubernetes).Kubernetes + prev = k.(*embeddedKubernetes).Kubernetes } dnsserver.GetConfig(c).AddPlugin(func(next plugin.Handler) plugin.Handler { // set Next of the last kubernetes instance to the next plugin - k8i.Kubernetes[len(k8i.Kubernetes)-1].Next = next + k8i.Kubernetes[len(k8i.Kubernetes)-1].(*embeddedKubernetes).Next = next return k8i }) @@ -53,7 +53,6 @@ func setup(c *caddy.Controller) error { func Parse(c *caddy.Controller) (*Kubernetai, error) { var k8i = &Kubernetai{ autoPathSearch: searchFromResolvConf(), - p: &podHandler{}, } for c.Next() { @@ -61,7 +60,7 @@ func Parse(c *caddy.Controller) (*Kubernetai, error) { if err != nil { return nil, err } - k8i.Kubernetes = append(k8i.Kubernetes, k8s) + k8i.Kubernetes = append(k8i.Kubernetes, newEmbeddedKubernetes(k8s)) } if len(k8i.Kubernetes) == 0 { diff --git a/plugin/kubernetai/setup_test.go b/plugin/kubernetai/setup_test.go index e0b3bdf..a0fa861 100644 --- a/plugin/kubernetai/setup_test.go +++ b/plugin/kubernetai/setup_test.go @@ -123,14 +123,14 @@ func TestSetup(t *testing.T) { } prev := &kubernetes.Kubernetes{ - Next: k8i.Kubernetes[0], + Next: k8i.Kubernetes[0].(*embeddedKubernetes).Kubernetes, } for j, k := range k8i.Kubernetes { - if prev.Next != k { - t.Fatalf("Test %d: Expected kubernetes instance %d to be referencing kubernetes instance %d as next, got %v", i, j-1, j, prev.Next) + if prev.Next != k.(*embeddedKubernetes).Kubernetes { + t.Fatalf("Test %d: Expected kubernetes instance %d to be referencing kubernetes instance %d as next, got %+v", i, j-1, j, prev.Next) } - prev = k + prev = k.(*embeddedKubernetes).Kubernetes } if prev.Next != nextHandler {