Skip to content

Commit

Permalink
Merge pull request #20 from coroot/containers_with_multiple_net_names…
Browse files Browse the repository at this point in the history
…paces

add support for containers with multiple network namespaces
  • Loading branch information
def authored Jun 1, 2023
2 parents 0fb14e3 + a6d6391 commit 02a3aeb
Showing 1 changed file with 85 additions and 49 deletions.
134 changes: 85 additions & 49 deletions containers/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,27 @@ type L7Stats struct {
Latency prometheus.Histogram
}

type ListenDetails struct {
ClosedAt time.Time
NsIPs []netaddr.IP
}

type Process struct {
Pid uint32
StartedAt time.Time
NetNsId string
}

func (p *Process) isHostNs() bool {
return p.NetNsId == hostNetNsId
}

type Container struct {
id ContainerID
cgroup *cgroup.Cgroup
metadata *ContainerMetadata

pids map[uint32]time.Time // pid -> start time
processes map[uint32]*Process

startedAt time.Time
zombieAt time.Time
Expand All @@ -95,7 +110,7 @@ type Container struct {
delaysByPid map[uint32]Delays
delaysLock sync.Mutex

listens map[netaddr.IPPort]map[uint32]time.Time // listen addr -> pid -> close time
listens map[netaddr.IPPort]map[uint32]*ListenDetails

connectsSuccessful map[AddrPair]int64 // dst:actual_dst -> count
connectsFailed map[netaddr.IPPort]int64 // dst -> count
Expand All @@ -109,11 +124,8 @@ type Container struct {

mounts map[string]proc.MountInfo

nsIPs []netaddr.IP

logParsers map[string]*LogParser

isHostNs bool
hostConntrack *Conntrack
nsConntrack *Conntrack
lbConntracks []*Conntrack
Expand All @@ -134,11 +146,11 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, host
cgroup: cg,
metadata: md,

pids: map[uint32]time.Time{},
processes: map[uint32]*Process{},

delaysByPid: map[uint32]Delays{},

listens: map[netaddr.IPPort]map[uint32]time.Time{},
listens: map[netaddr.IPPort]map[uint32]*ListenDetails{},

connectsSuccessful: map[AddrPair]int64{},
connectsFailed: map[netaddr.IPPort]int64{},
Expand All @@ -151,7 +163,6 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, host

logParsers: map[string]*LogParser{},

isHostNs: hostNetNsId == netNs.UniqueId(),
hostConntrack: hostConntrack,

done: make(chan struct{}),
Expand Down Expand Up @@ -304,7 +315,7 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {

appTypes := map[string]struct{}{}
seenJvms := map[string]bool{}
for pid := range c.pids {
for pid := range c.processes {
cmdline := proc.GetCmdline(pid)
if len(cmdline) == 0 {
continue
Expand Down Expand Up @@ -352,15 +363,21 @@ func (c *Container) onProcessStart(pid uint32) {
if err != nil {
return
}
ns, err := proc.GetNetNs(pid)
if err != nil {
return
}
defer ns.Close()
c.zombieAt = time.Time{}
c.pids[pid] = stats.BeginTime
c.processes[pid] = &Process{Pid: pid, StartedAt: stats.BeginTime, NetNsId: ns.UniqueId()}

if c.startedAt.IsZero() {
c.startedAt = stats.BeginTime
} else {
min := stats.BeginTime
for _, t := range c.pids {
if t.Before(min) {
min = t
for _, p := range c.processes {
if p.StartedAt.Before(min) {
min = p.StartedAt
}
}
if min.After(c.startedAt) {
Expand All @@ -373,8 +390,8 @@ func (c *Container) onProcessStart(pid uint32) {
func (c *Container) onProcessExit(pid uint32, oomKill bool) {
c.lock.Lock()
defer c.lock.Unlock()
delete(c.pids, pid)
if len(c.pids) == 0 {
delete(c.processes, pid)
if len(c.processes) == 0 {
c.zombieAt = time.Now()
}
delete(c.delaysByPid, pid)
Expand Down Expand Up @@ -418,9 +435,10 @@ func (c *Container) onListenOpen(pid uint32, addr netaddr.IPPort, safe bool) {
defer c.lock.Unlock()
}
if _, ok := c.listens[addr]; !ok {
c.listens[addr] = map[uint32]time.Time{}
c.listens[addr] = map[uint32]*ListenDetails{}
}
c.listens[addr][pid] = time.Time{}
details := &ListenDetails{}
c.listens[addr][pid] = details

if addr.IP().IsUnspecified() {
ns, err := proc.GetNetNs(pid)
Expand All @@ -434,7 +452,7 @@ func (c *Container) onListenOpen(pid uint32, addr netaddr.IPPort, safe bool) {
if ips, err := proc.GetNsIps(ns); err != nil {
klog.Warningln(err)
} else {
c.nsIPs = ips
details.NsIPs = ips
}
}
}
Expand All @@ -444,13 +462,19 @@ func (c *Container) onListenClose(pid uint32, addr netaddr.IPPort) {
defer c.lock.Unlock()
if _, byAddr := c.listens[addr]; byAddr {
if _, byPid := c.listens[addr][pid]; byPid {
c.listens[addr][pid] = time.Now()
if details := c.listens[addr][pid]; details != nil {
details.ClosedAt = time.Now()
}
}
}
}

func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool) {
if dst.IP().IsLoopback() && !c.isHostNs {
p := c.processes[pid]
if p == nil {
return
}
if dst.IP().IsLoopback() && !p.isHostNs() {
return
}
whitelisted := false
Expand All @@ -468,7 +492,7 @@ func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPP
if failed {
c.connectsFailed[dst]++
} else {
actualDst, err := c.getActualDestination(pid, src, dst)
actualDst, err := c.getActualDestination(p, src, dst)
if err != nil {
if !common.IsNotExist(err) {
klog.Warningf("cannot open NetNs for pid %d: %s", pid, err)
Expand All @@ -478,7 +502,7 @@ func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPP
switch {
case actualDst == nil:
actualDst = &dst
case actualDst.IP().IsLoopback() && !c.isHostNs:
case actualDst.IP().IsLoopback() && !p.isHostNs():
return
}
c.connectsSuccessful[AddrPair{src: dst, dst: *actualDst}]++
Expand All @@ -493,7 +517,7 @@ func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPP
c.connectLastAttempt[dst] = time.Now()
}

func (c *Container) getActualDestination(pid uint32, src, dst netaddr.IPPort) (*netaddr.IPPort, error) {
func (c *Container) getActualDestination(p *Process, src, dst netaddr.IPPort) (*netaddr.IPPort, error) {
if actualDst := lookupCiliumConntrackTable(src, dst); actualDst != nil {
return actualDst, nil
}
Expand All @@ -506,9 +530,9 @@ func (c *Container) getActualDestination(pid uint32, src, dst netaddr.IPPort) (*
if actualDst != nil {
return actualDst, nil
}
if !c.isHostNs {
if !p.isHostNs() {
if c.nsConntrack == nil {
netNs, err := proc.GetNetNs(pid)
netNs, err := proc.GetNetNs(p.Pid)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -608,7 +632,7 @@ func (c *Container) onRetransmit(srcDst AddrPair) bool {
func (c *Container) updateDelays() {
c.delaysLock.Lock()
defer c.delaysLock.Unlock()
for pid := range c.pids {
for pid := range c.processes {
stats, err := TaskstatsTGID(pid)
if err != nil {
continue
Expand All @@ -629,7 +653,7 @@ func (c *Container) getMounts() map[string]map[string]*proc.FSStat {
res := map[string]map[string]*proc.FSStat{}
for _, mi := range c.mounts {
var stat *proc.FSStat
for pid := range c.pids {
for pid := range c.processes {
s, err := proc.StatFS(proc.Path(pid, "root", mi.MountPoint))
if err == nil {
stat = &s
Expand All @@ -651,20 +675,28 @@ func (c *Container) getListens() map[netaddr.IPPort]int {
res := map[netaddr.IPPort]int{}
for addr, byPid := range c.listens {
open := 0
for _, closedAt := range byPid {
if closedAt.IsZero() {
isHostNs := false
ips := map[netaddr.IP]bool{}
for pid, details := range byPid {
p := c.processes[pid]
if p == nil {
continue
}
if p.isHostNs() {
isHostNs = true
}
if details.ClosedAt.IsZero() {
open = 1
break
}
for _, ip := range details.NsIPs {
ips[ip] = true
}
}
var ips []netaddr.IP
if addr.IP().IsUnspecified() {
ips = c.nsIPs
} else {
ips = []netaddr.IP{addr.IP()}
if !addr.IP().IsUnspecified() {
ips = map[netaddr.IP]bool{addr.IP(): true}
}
for _, ip := range ips {
if ip.IsLoopback() && !c.isHostNs {
for ip := range ips {
if ip.IsLoopback() && !isHostNs {
continue
}
res[netaddr.IPPortFrom(ip, addr.Port())] = open
Expand Down Expand Up @@ -723,7 +755,7 @@ func (c *Container) getProxiedListens() map[string]map[netaddr.IPPort]struct{} {

func (c *Container) ping() map[netaddr.IP]float64 {
netNs := netns.None()
for pid := range c.pids {
for pid := range c.processes {
if pid == agentPid {
netNs = selfNetNs
break
Expand Down Expand Up @@ -829,8 +861,12 @@ func (c *Container) gc(now time.Time) {
established := map[AddrPair]struct{}{}
establishedDst := map[netaddr.IPPort]struct{}{}
listens := map[netaddr.IPPort]string{}
for pid := range c.pids {
sockets, err := proc.GetSockets(pid)
seenNamespaces := map[string]bool{}
for _, p := range c.processes {
if seenNamespaces[p.NetNsId] {
continue
}
sockets, err := proc.GetSockets(p.Pid)
if err != nil {
continue
}
Expand All @@ -842,7 +878,7 @@ func (c *Container) gc(now time.Time) {
establishedDst[s.DAddr] = struct{}{}
}
}
break
seenNamespaces[p.NetNsId] = true
}

c.revalidateListens(now, listens)
Expand Down Expand Up @@ -888,9 +924,9 @@ func (c *Container) revalidateListens(now time.Time, actualListens map[netaddr.I
continue
}
klog.Warningln("deleting the outdated listen:", addr)
for pid, closedAt := range byPid {
if closedAt.IsZero() {
byPid[pid] = now
for _, details := range byPid {
if details.ClosedAt.IsZero() {
details.ClosedAt = now
}
}
}
Expand All @@ -903,8 +939,8 @@ func (c *Container) revalidateListens(now time.Time, actualListens map[netaddr.I
continue
}
open := false
for _, closedAt := range byPids {
if closedAt.IsZero() {
for _, details := range byPids {
if details.ClosedAt.IsZero() {
open = true
break
}
Expand All @@ -916,7 +952,7 @@ func (c *Container) revalidateListens(now time.Time, actualListens map[netaddr.I

if len(missingListens) > 0 {
inodeToPid := map[string]uint32{}
for pid := range c.pids {
for pid := range c.processes {
fds, err := proc.ReadFds(pid)
if err != nil {
continue
Expand All @@ -938,8 +974,8 @@ func (c *Container) revalidateListens(now time.Time, actualListens map[netaddr.I
}

for addr, pids := range c.listens {
for pid, closedAt := range pids {
if !closedAt.IsZero() && now.Sub(closedAt) > gcInterval {
for pid, details := range pids {
if !details.ClosedAt.IsZero() && now.Sub(details.ClosedAt) > gcInterval {
delete(c.listens[addr], pid)
}
}
Expand Down

0 comments on commit 02a3aeb

Please sign in to comment.