Skip to content

Commit 98b9e46

Browse files
authored
Merge pull request #18 from coroot/docker_swarm
Initial support for Docker Swarm
2 parents c8e0d2c + 67135f8 commit 98b9e46

File tree

3 files changed

+65
-0
lines changed

3 files changed

+65
-0
lines changed

containers/container.go

+25
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ var (
2828

2929
type ContainerID string
3030

31+
type ContainerNetwork struct {
32+
NetworkID string
33+
}
34+
3135
type ContainerMetadata struct {
3236
name string
3337
labels map[string]string
@@ -36,6 +40,7 @@ type ContainerMetadata struct {
3640
image string
3741
logDecoder logparser.Decoder
3842
hostListens map[string][]netaddr.IPPort
43+
networks map[string]ContainerNetwork
3944
}
4045

4146
type Delays struct {
@@ -111,6 +116,7 @@ type Container struct {
111116
isHostNs bool
112117
hostConntrack *Conntrack
113118
nsConntrack *Conntrack
119+
lbConntracks []*Conntrack
114120

115121
lock sync.RWMutex
116122

@@ -151,6 +157,17 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, host
151157
done: make(chan struct{}),
152158
}
153159

160+
for _, n := range md.networks {
161+
if nsHandle := FindNetworkLoadBalancerNs(n.NetworkID); nsHandle.IsOpen() {
162+
if ct, err := NewConntrack(nsHandle); err != nil {
163+
klog.Warningln(err)
164+
} else {
165+
c.lbConntracks = append(c.lbConntracks, ct)
166+
}
167+
_ = nsHandle.Close()
168+
}
169+
}
170+
154171
c.runLogParser("")
155172

156173
go func() {
@@ -173,6 +190,9 @@ func (c *Container) Close() {
173190
for _, p := range c.logParsers {
174191
p.Stop()
175192
}
193+
for _, ct := range c.lbConntracks {
194+
_ = ct.Close()
195+
}
176196
if c.nsConntrack != nil {
177197
_ = c.nsConntrack.Close()
178198
}
@@ -477,6 +497,11 @@ func (c *Container) getActualDestination(pid uint32, src, dst netaddr.IPPort) (*
477497
if actualDst := lookupCiliumConntrackTable(src, dst); actualDst != nil {
478498
return actualDst, nil
479499
}
500+
for _, lb := range c.lbConntracks {
501+
if actualDst := lb.GetActualDestination(src, dst); actualDst != nil {
502+
return actualDst, nil
503+
}
504+
}
480505
actualDst := c.hostConntrack.GetActualDestination(src, dst)
481506
if actualDst != nil {
482507
return actualDst, nil

containers/dockerd.go

+28
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@ import (
77
"github.com/coroot/coroot-node-agent/proc"
88
"github.com/coroot/logparser"
99
"github.com/docker/docker/client"
10+
"github.com/vishvananda/netns"
1011
"inet.af/netaddr"
12+
"os"
13+
"path"
1114
"strings"
1215
"time"
1316
)
@@ -51,6 +54,7 @@ func DockerdInspect(containerID string) (*ContainerMetadata, error) {
5154
image: c.Config.Image,
5255
volumes: map[string]string{},
5356
hostListens: map[string][]netaddr.IPPort{},
57+
networks: map[string]ContainerNetwork{},
5458
}
5559
for _, m := range c.Mounts {
5660
res.volumes[m.Destination] = common.ParseKubernetesVolumeSource(m.Source)
@@ -78,6 +82,30 @@ func DockerdInspect(containerID string) (*ContainerMetadata, error) {
7882
}
7983
res.hostListens["dockerd"] = s
8084
}
85+
for name, network := range c.NetworkSettings.Networks {
86+
res.networks[name] = ContainerNetwork{
87+
NetworkID: network.NetworkID,
88+
}
89+
}
8190
}
8291
return res, nil
8392
}
93+
94+
func FindNetworkLoadBalancerNs(networkId string) netns.NsHandle {
95+
basePath := "/run/docker/netns"
96+
files, err := os.ReadDir(proc.HostPath(basePath))
97+
if err != nil {
98+
return -1
99+
}
100+
for _, f := range files {
101+
if !f.Type().IsRegular() || !strings.HasPrefix(f.Name(), "lb_") {
102+
continue
103+
}
104+
idPrefix := strings.Split(f.Name(), "_")[1]
105+
if strings.HasPrefix(networkId, idPrefix) {
106+
ns, _ := netns.GetFromPath(proc.HostPath(path.Join(basePath, f.Name())))
107+
return ns
108+
}
109+
}
110+
return -1
111+
}

containers/registry.go

+12
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"k8s.io/klog/v2"
1414
"os"
1515
"regexp"
16+
"strings"
1617
"time"
1718
)
1819

@@ -325,6 +326,17 @@ func calcId(cg *cgroup.Cgroup, md *ContainerMetadata) ContainerID {
325326
}
326327
return ContainerID(fmt.Sprintf("/k8s/%s/%s/%s", namespace, pod, name))
327328
}
329+
if taskNameParts := strings.SplitN(md.labels["com.docker.swarm.task.name"], ".", 3); len(taskNameParts) == 3 {
330+
namespace := md.labels["com.docker.stack.namespace"]
331+
service := md.labels["com.docker.swarm.service.name"]
332+
if namespace != "" {
333+
service = strings.TrimPrefix(service, namespace+"_")
334+
}
335+
if namespace == "" {
336+
namespace = "_"
337+
}
338+
return ContainerID(fmt.Sprintf("/swarm/%s/%s/%s", namespace, service, taskNameParts[1]))
339+
}
328340
if md.name == "" { // should be "pure" dockerd container here
329341
klog.Warningln("empty dockerd container name for:", cg.ContainerId)
330342
return ""

0 commit comments

Comments
 (0)