From a6d6391cf1f5519b766008044ed2755aa18de919 Mon Sep 17 00:00:00 2001 From: Nikolay Sivko Date: Thu, 1 Jun 2023 11:30:55 +0300 Subject: [PATCH] add support for containers with multiple network namespaces --- containers/container.go | 134 +++++++++++++++++++++++++--------------- 1 file changed, 85 insertions(+), 49 deletions(-) diff --git a/containers/container.go b/containers/container.go index f9e091f..56c125b 100644 --- a/containers/container.go +++ b/containers/container.go @@ -80,12 +80,27 @@ type L7Stats struct { Latency prometheus.Histogram } +type ListenDetails struct { + ClosedAt time.Time + NsIPs []netaddr.IP +} + +type Process struct { + Pid uint32 + StartedAt time.Time + NetNsId string +} + +func (p *Process) isHostNs() bool { + return p.NetNsId == hostNetNsId +} + type Container struct { id ContainerID cgroup *cgroup.Cgroup metadata *ContainerMetadata - pids map[uint32]time.Time // pid -> start time + processes map[uint32]*Process startedAt time.Time zombieAt time.Time @@ -95,7 +110,7 @@ type Container struct { delaysByPid map[uint32]Delays delaysLock sync.Mutex - listens map[netaddr.IPPort]map[uint32]time.Time // listen addr -> pid -> close time + listens map[netaddr.IPPort]map[uint32]*ListenDetails connectsSuccessful map[AddrPair]int64 // dst:actual_dst -> count connectsFailed map[netaddr.IPPort]int64 // dst -> count @@ -109,11 +124,8 @@ type Container struct { mounts map[string]proc.MountInfo - nsIPs []netaddr.IP - logParsers map[string]*LogParser - isHostNs bool hostConntrack *Conntrack nsConntrack *Conntrack lbConntracks []*Conntrack @@ -134,11 +146,11 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, host cgroup: cg, metadata: md, - pids: map[uint32]time.Time{}, + processes: map[uint32]*Process{}, delaysByPid: map[uint32]Delays{}, - listens: map[netaddr.IPPort]map[uint32]time.Time{}, + listens: map[netaddr.IPPort]map[uint32]*ListenDetails{}, connectsSuccessful: map[AddrPair]int64{}, connectsFailed: map[netaddr.IPPort]int64{}, @@ -151,7 +163,6 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, host logParsers: map[string]*LogParser{}, - isHostNs: hostNetNsId == netNs.UniqueId(), hostConntrack: hostConntrack, done: make(chan struct{}), @@ -304,7 +315,7 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) { appTypes := map[string]struct{}{} seenJvms := map[string]bool{} - for pid := range c.pids { + for pid := range c.processes { cmdline := proc.GetCmdline(pid) if len(cmdline) == 0 { continue @@ -352,15 +363,21 @@ func (c *Container) onProcessStart(pid uint32) { if err != nil { return } + ns, err := proc.GetNetNs(pid) + if err != nil { + return + } + defer ns.Close() c.zombieAt = time.Time{} - c.pids[pid] = stats.BeginTime + c.processes[pid] = &Process{Pid: pid, StartedAt: stats.BeginTime, NetNsId: ns.UniqueId()} + if c.startedAt.IsZero() { c.startedAt = stats.BeginTime } else { min := stats.BeginTime - for _, t := range c.pids { - if t.Before(min) { - min = t + for _, p := range c.processes { + if p.StartedAt.Before(min) { + min = p.StartedAt } } if min.After(c.startedAt) { @@ -373,8 +390,8 @@ func (c *Container) onProcessStart(pid uint32) { func (c *Container) onProcessExit(pid uint32, oomKill bool) { c.lock.Lock() defer c.lock.Unlock() - delete(c.pids, pid) - if len(c.pids) == 0 { + delete(c.processes, pid) + if len(c.processes) == 0 { c.zombieAt = time.Now() } delete(c.delaysByPid, pid) @@ -418,9 +435,10 @@ func (c *Container) onListenOpen(pid uint32, addr netaddr.IPPort, safe bool) { defer c.lock.Unlock() } if _, ok := c.listens[addr]; !ok { - c.listens[addr] = map[uint32]time.Time{} + c.listens[addr] = map[uint32]*ListenDetails{} } - c.listens[addr][pid] = time.Time{} + details := &ListenDetails{} + c.listens[addr][pid] = details if addr.IP().IsUnspecified() { ns, err := proc.GetNetNs(pid) @@ -434,7 +452,7 @@ func (c *Container) onListenOpen(pid uint32, addr netaddr.IPPort, safe bool) { if ips, err := proc.GetNsIps(ns); err != nil { klog.Warningln(err) } else { - c.nsIPs = ips + details.NsIPs = ips } } } @@ -444,13 +462,19 @@ func (c *Container) onListenClose(pid uint32, addr netaddr.IPPort) { defer c.lock.Unlock() if _, byAddr := c.listens[addr]; byAddr { if _, byPid := c.listens[addr][pid]; byPid { - c.listens[addr][pid] = time.Now() + if details := c.listens[addr][pid]; details != nil { + details.ClosedAt = time.Now() + } } } } func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool) { - if dst.IP().IsLoopback() && !c.isHostNs { + p := c.processes[pid] + if p == nil { + return + } + if dst.IP().IsLoopback() && !p.isHostNs() { return } whitelisted := false @@ -468,7 +492,7 @@ func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPP if failed { c.connectsFailed[dst]++ } else { - actualDst, err := c.getActualDestination(pid, src, dst) + actualDst, err := c.getActualDestination(p, src, dst) if err != nil { if !common.IsNotExist(err) { klog.Warningf("cannot open NetNs for pid %d: %s", pid, err) @@ -478,7 +502,7 @@ func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPP switch { case actualDst == nil: actualDst = &dst - case actualDst.IP().IsLoopback() && !c.isHostNs: + case actualDst.IP().IsLoopback() && !p.isHostNs(): return } c.connectsSuccessful[AddrPair{src: dst, dst: *actualDst}]++ @@ -493,7 +517,7 @@ func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPP c.connectLastAttempt[dst] = time.Now() } -func (c *Container) getActualDestination(pid uint32, src, dst netaddr.IPPort) (*netaddr.IPPort, error) { +func (c *Container) getActualDestination(p *Process, src, dst netaddr.IPPort) (*netaddr.IPPort, error) { if actualDst := lookupCiliumConntrackTable(src, dst); actualDst != nil { return actualDst, nil } @@ -506,9 +530,9 @@ func (c *Container) getActualDestination(pid uint32, src, dst netaddr.IPPort) (* if actualDst != nil { return actualDst, nil } - if !c.isHostNs { + if !p.isHostNs() { if c.nsConntrack == nil { - netNs, err := proc.GetNetNs(pid) + netNs, err := proc.GetNetNs(p.Pid) if err != nil { return nil, err } @@ -608,7 +632,7 @@ func (c *Container) onRetransmit(srcDst AddrPair) bool { func (c *Container) updateDelays() { c.delaysLock.Lock() defer c.delaysLock.Unlock() - for pid := range c.pids { + for pid := range c.processes { stats, err := TaskstatsTGID(pid) if err != nil { continue @@ -629,7 +653,7 @@ func (c *Container) getMounts() map[string]map[string]*proc.FSStat { res := map[string]map[string]*proc.FSStat{} for _, mi := range c.mounts { var stat *proc.FSStat - for pid := range c.pids { + for pid := range c.processes { s, err := proc.StatFS(proc.Path(pid, "root", mi.MountPoint)) if err == nil { stat = &s @@ -651,20 +675,28 @@ func (c *Container) getListens() map[netaddr.IPPort]int { res := map[netaddr.IPPort]int{} for addr, byPid := range c.listens { open := 0 - for _, closedAt := range byPid { - if closedAt.IsZero() { + isHostNs := false + ips := map[netaddr.IP]bool{} + for pid, details := range byPid { + p := c.processes[pid] + if p == nil { + continue + } + if p.isHostNs() { + isHostNs = true + } + if details.ClosedAt.IsZero() { open = 1 - break + } + for _, ip := range details.NsIPs { + ips[ip] = true } } - var ips []netaddr.IP - if addr.IP().IsUnspecified() { - ips = c.nsIPs - } else { - ips = []netaddr.IP{addr.IP()} + if !addr.IP().IsUnspecified() { + ips = map[netaddr.IP]bool{addr.IP(): true} } - for _, ip := range ips { - if ip.IsLoopback() && !c.isHostNs { + for ip := range ips { + if ip.IsLoopback() && !isHostNs { continue } res[netaddr.IPPortFrom(ip, addr.Port())] = open @@ -723,7 +755,7 @@ func (c *Container) getProxiedListens() map[string]map[netaddr.IPPort]struct{} { func (c *Container) ping() map[netaddr.IP]float64 { netNs := netns.None() - for pid := range c.pids { + for pid := range c.processes { if pid == agentPid { netNs = selfNetNs break @@ -829,8 +861,12 @@ func (c *Container) gc(now time.Time) { established := map[AddrPair]struct{}{} establishedDst := map[netaddr.IPPort]struct{}{} listens := map[netaddr.IPPort]string{} - for pid := range c.pids { - sockets, err := proc.GetSockets(pid) + seenNamespaces := map[string]bool{} + for _, p := range c.processes { + if seenNamespaces[p.NetNsId] { + continue + } + sockets, err := proc.GetSockets(p.Pid) if err != nil { continue } @@ -842,7 +878,7 @@ func (c *Container) gc(now time.Time) { establishedDst[s.DAddr] = struct{}{} } } - break + seenNamespaces[p.NetNsId] = true } c.revalidateListens(now, listens) @@ -888,9 +924,9 @@ func (c *Container) revalidateListens(now time.Time, actualListens map[netaddr.I continue } klog.Warningln("deleting the outdated listen:", addr) - for pid, closedAt := range byPid { - if closedAt.IsZero() { - byPid[pid] = now + for _, details := range byPid { + if details.ClosedAt.IsZero() { + details.ClosedAt = now } } } @@ -903,8 +939,8 @@ func (c *Container) revalidateListens(now time.Time, actualListens map[netaddr.I continue } open := false - for _, closedAt := range byPids { - if closedAt.IsZero() { + for _, details := range byPids { + if details.ClosedAt.IsZero() { open = true break } @@ -916,7 +952,7 @@ func (c *Container) revalidateListens(now time.Time, actualListens map[netaddr.I if len(missingListens) > 0 { inodeToPid := map[string]uint32{} - for pid := range c.pids { + for pid := range c.processes { fds, err := proc.ReadFds(pid) if err != nil { continue @@ -938,8 +974,8 @@ func (c *Container) revalidateListens(now time.Time, actualListens map[netaddr.I } for addr, pids := range c.listens { - for pid, closedAt := range pids { - if !closedAt.IsZero() && now.Sub(closedAt) > gcInterval { + for pid, details := range pids { + if !details.ClosedAt.IsZero() && now.Sub(details.ClosedAt) > gcInterval { delete(c.listens[addr], pid) } }