@@ -80,12 +80,27 @@ type L7Stats struct {
80
80
Latency prometheus.Histogram
81
81
}
82
82
83
+ type ListenDetails struct {
84
+ ClosedAt time.Time
85
+ NsIPs []netaddr.IP
86
+ }
87
+
88
+ type Process struct {
89
+ Pid uint32
90
+ StartedAt time.Time
91
+ NetNsId string
92
+ }
93
+
94
+ func (p * Process ) isHostNs () bool {
95
+ return p .NetNsId == hostNetNsId
96
+ }
97
+
83
98
type Container struct {
84
99
id ContainerID
85
100
cgroup * cgroup.Cgroup
86
101
metadata * ContainerMetadata
87
102
88
- pids map [uint32 ]time. Time // pid -> start time
103
+ processes map [uint32 ]* Process
89
104
90
105
startedAt time.Time
91
106
zombieAt time.Time
@@ -95,7 +110,7 @@ type Container struct {
95
110
delaysByPid map [uint32 ]Delays
96
111
delaysLock sync.Mutex
97
112
98
- listens map [netaddr.IPPort ]map [uint32 ]time. Time // listen addr -> pid -> close time
113
+ listens map [netaddr.IPPort ]map [uint32 ]* ListenDetails
99
114
100
115
connectsSuccessful map [AddrPair ]int64 // dst:actual_dst -> count
101
116
connectsFailed map [netaddr.IPPort ]int64 // dst -> count
@@ -109,11 +124,8 @@ type Container struct {
109
124
110
125
mounts map [string ]proc.MountInfo
111
126
112
- nsIPs []netaddr.IP
113
-
114
127
logParsers map [string ]* LogParser
115
128
116
- isHostNs bool
117
129
hostConntrack * Conntrack
118
130
nsConntrack * Conntrack
119
131
lbConntracks []* Conntrack
@@ -134,11 +146,11 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, host
134
146
cgroup : cg ,
135
147
metadata : md ,
136
148
137
- pids : map [uint32 ]time. Time {},
149
+ processes : map [uint32 ]* Process {},
138
150
139
151
delaysByPid : map [uint32 ]Delays {},
140
152
141
- listens : map [netaddr.IPPort ]map [uint32 ]time. Time {},
153
+ listens : map [netaddr.IPPort ]map [uint32 ]* ListenDetails {},
142
154
143
155
connectsSuccessful : map [AddrPair ]int64 {},
144
156
connectsFailed : map [netaddr.IPPort ]int64 {},
@@ -151,7 +163,6 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, host
151
163
152
164
logParsers : map [string ]* LogParser {},
153
165
154
- isHostNs : hostNetNsId == netNs .UniqueId (),
155
166
hostConntrack : hostConntrack ,
156
167
157
168
done : make (chan struct {}),
@@ -304,7 +315,7 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
304
315
305
316
appTypes := map [string ]struct {}{}
306
317
seenJvms := map [string ]bool {}
307
- for pid := range c .pids {
318
+ for pid := range c .processes {
308
319
cmdline := proc .GetCmdline (pid )
309
320
if len (cmdline ) == 0 {
310
321
continue
@@ -352,15 +363,21 @@ func (c *Container) onProcessStart(pid uint32) {
352
363
if err != nil {
353
364
return
354
365
}
366
+ ns , err := proc .GetNetNs (pid )
367
+ if err != nil {
368
+ return
369
+ }
370
+ defer ns .Close ()
355
371
c .zombieAt = time.Time {}
356
- c .pids [pid ] = stats .BeginTime
372
+ c .processes [pid ] = & Process {Pid : pid , StartedAt : stats .BeginTime , NetNsId : ns .UniqueId ()}
373
+
357
374
if c .startedAt .IsZero () {
358
375
c .startedAt = stats .BeginTime
359
376
} else {
360
377
min := stats .BeginTime
361
- for _ , t := range c .pids {
362
- if t .Before (min ) {
363
- min = t
378
+ for _ , p := range c .processes {
379
+ if p . StartedAt .Before (min ) {
380
+ min = p . StartedAt
364
381
}
365
382
}
366
383
if min .After (c .startedAt ) {
@@ -373,8 +390,8 @@ func (c *Container) onProcessStart(pid uint32) {
373
390
func (c * Container ) onProcessExit (pid uint32 , oomKill bool ) {
374
391
c .lock .Lock ()
375
392
defer c .lock .Unlock ()
376
- delete (c .pids , pid )
377
- if len (c .pids ) == 0 {
393
+ delete (c .processes , pid )
394
+ if len (c .processes ) == 0 {
378
395
c .zombieAt = time .Now ()
379
396
}
380
397
delete (c .delaysByPid , pid )
@@ -418,9 +435,10 @@ func (c *Container) onListenOpen(pid uint32, addr netaddr.IPPort, safe bool) {
418
435
defer c .lock .Unlock ()
419
436
}
420
437
if _ , ok := c .listens [addr ]; ! ok {
421
- c .listens [addr ] = map [uint32 ]time. Time {}
438
+ c .listens [addr ] = map [uint32 ]* ListenDetails {}
422
439
}
423
- c.listens [addr ][pid ] = time.Time {}
440
+ details := & ListenDetails {}
441
+ c.listens [addr ][pid ] = details
424
442
425
443
if addr .IP ().IsUnspecified () {
426
444
ns , err := proc .GetNetNs (pid )
@@ -434,7 +452,7 @@ func (c *Container) onListenOpen(pid uint32, addr netaddr.IPPort, safe bool) {
434
452
if ips , err := proc .GetNsIps (ns ); err != nil {
435
453
klog .Warningln (err )
436
454
} else {
437
- c . nsIPs = ips
455
+ details . NsIPs = ips
438
456
}
439
457
}
440
458
}
@@ -444,13 +462,19 @@ func (c *Container) onListenClose(pid uint32, addr netaddr.IPPort) {
444
462
defer c .lock .Unlock ()
445
463
if _ , byAddr := c .listens [addr ]; byAddr {
446
464
if _ , byPid := c.listens [addr ][pid ]; byPid {
447
- c.listens [addr ][pid ] = time .Now ()
465
+ if details := c.listens [addr ][pid ]; details != nil {
466
+ details .ClosedAt = time .Now ()
467
+ }
448
468
}
449
469
}
450
470
}
451
471
452
472
func (c * Container ) onConnectionOpen (pid uint32 , fd uint64 , src , dst netaddr.IPPort , timestamp uint64 , failed bool ) {
453
- if dst .IP ().IsLoopback () && ! c .isHostNs {
473
+ p := c .processes [pid ]
474
+ if p == nil {
475
+ return
476
+ }
477
+ if dst .IP ().IsLoopback () && ! p .isHostNs () {
454
478
return
455
479
}
456
480
whitelisted := false
@@ -468,7 +492,7 @@ func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPP
468
492
if failed {
469
493
c .connectsFailed [dst ]++
470
494
} else {
471
- actualDst , err := c .getActualDestination (pid , src , dst )
495
+ actualDst , err := c .getActualDestination (p , src , dst )
472
496
if err != nil {
473
497
if ! common .IsNotExist (err ) {
474
498
klog .Warningf ("cannot open NetNs for pid %d: %s" , pid , err )
@@ -478,7 +502,7 @@ func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPP
478
502
switch {
479
503
case actualDst == nil :
480
504
actualDst = & dst
481
- case actualDst .IP ().IsLoopback () && ! c .isHostNs :
505
+ case actualDst .IP ().IsLoopback () && ! p .isHostNs () :
482
506
return
483
507
}
484
508
c .connectsSuccessful [AddrPair {src : dst , dst : * actualDst }]++
@@ -493,7 +517,7 @@ func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPP
493
517
c .connectLastAttempt [dst ] = time .Now ()
494
518
}
495
519
496
- func (c * Container ) getActualDestination (pid uint32 , src , dst netaddr.IPPort ) (* netaddr.IPPort , error ) {
520
+ func (c * Container ) getActualDestination (p * Process , src , dst netaddr.IPPort ) (* netaddr.IPPort , error ) {
497
521
if actualDst := lookupCiliumConntrackTable (src , dst ); actualDst != nil {
498
522
return actualDst , nil
499
523
}
@@ -506,9 +530,9 @@ func (c *Container) getActualDestination(pid uint32, src, dst netaddr.IPPort) (*
506
530
if actualDst != nil {
507
531
return actualDst , nil
508
532
}
509
- if ! c .isHostNs {
533
+ if ! p .isHostNs () {
510
534
if c .nsConntrack == nil {
511
- netNs , err := proc .GetNetNs (pid )
535
+ netNs , err := proc .GetNetNs (p . Pid )
512
536
if err != nil {
513
537
return nil , err
514
538
}
@@ -608,7 +632,7 @@ func (c *Container) onRetransmit(srcDst AddrPair) bool {
608
632
func (c * Container ) updateDelays () {
609
633
c .delaysLock .Lock ()
610
634
defer c .delaysLock .Unlock ()
611
- for pid := range c .pids {
635
+ for pid := range c .processes {
612
636
stats , err := TaskstatsTGID (pid )
613
637
if err != nil {
614
638
continue
@@ -629,7 +653,7 @@ func (c *Container) getMounts() map[string]map[string]*proc.FSStat {
629
653
res := map [string ]map [string ]* proc.FSStat {}
630
654
for _ , mi := range c .mounts {
631
655
var stat * proc.FSStat
632
- for pid := range c .pids {
656
+ for pid := range c .processes {
633
657
s , err := proc .StatFS (proc .Path (pid , "root" , mi .MountPoint ))
634
658
if err == nil {
635
659
stat = & s
@@ -651,20 +675,28 @@ func (c *Container) getListens() map[netaddr.IPPort]int {
651
675
res := map [netaddr.IPPort ]int {}
652
676
for addr , byPid := range c .listens {
653
677
open := 0
654
- for _ , closedAt := range byPid {
655
- if closedAt .IsZero () {
678
+ isHostNs := false
679
+ ips := map [netaddr.IP ]bool {}
680
+ for pid , details := range byPid {
681
+ p := c .processes [pid ]
682
+ if p == nil {
683
+ continue
684
+ }
685
+ if p .isHostNs () {
686
+ isHostNs = true
687
+ }
688
+ if details .ClosedAt .IsZero () {
656
689
open = 1
657
- break
690
+ }
691
+ for _ , ip := range details .NsIPs {
692
+ ips [ip ] = true
658
693
}
659
694
}
660
- var ips []netaddr.IP
661
- if addr .IP ().IsUnspecified () {
662
- ips = c .nsIPs
663
- } else {
664
- ips = []netaddr.IP {addr .IP ()}
695
+ if ! addr .IP ().IsUnspecified () {
696
+ ips = map [netaddr.IP ]bool {addr .IP (): true }
665
697
}
666
- for _ , ip := range ips {
667
- if ip .IsLoopback () && ! c . isHostNs {
698
+ for ip := range ips {
699
+ if ip .IsLoopback () && ! isHostNs {
668
700
continue
669
701
}
670
702
res [netaddr .IPPortFrom (ip , addr .Port ())] = open
@@ -723,7 +755,7 @@ func (c *Container) getProxiedListens() map[string]map[netaddr.IPPort]struct{} {
723
755
724
756
func (c * Container ) ping () map [netaddr.IP ]float64 {
725
757
netNs := netns .None ()
726
- for pid := range c .pids {
758
+ for pid := range c .processes {
727
759
if pid == agentPid {
728
760
netNs = selfNetNs
729
761
break
@@ -829,8 +861,12 @@ func (c *Container) gc(now time.Time) {
829
861
established := map [AddrPair ]struct {}{}
830
862
establishedDst := map [netaddr.IPPort ]struct {}{}
831
863
listens := map [netaddr.IPPort ]string {}
832
- for pid := range c .pids {
833
- sockets , err := proc .GetSockets (pid )
864
+ seenNamespaces := map [string ]bool {}
865
+ for _ , p := range c .processes {
866
+ if seenNamespaces [p .NetNsId ] {
867
+ continue
868
+ }
869
+ sockets , err := proc .GetSockets (p .Pid )
834
870
if err != nil {
835
871
continue
836
872
}
@@ -842,7 +878,7 @@ func (c *Container) gc(now time.Time) {
842
878
establishedDst [s .DAddr ] = struct {}{}
843
879
}
844
880
}
845
- break
881
+ seenNamespaces [ p . NetNsId ] = true
846
882
}
847
883
848
884
c .revalidateListens (now , listens )
@@ -888,9 +924,9 @@ func (c *Container) revalidateListens(now time.Time, actualListens map[netaddr.I
888
924
continue
889
925
}
890
926
klog .Warningln ("deleting the outdated listen:" , addr )
891
- for pid , closedAt := range byPid {
892
- if closedAt .IsZero () {
893
- byPid [ pid ] = now
927
+ for _ , details := range byPid {
928
+ if details . ClosedAt .IsZero () {
929
+ details . ClosedAt = now
894
930
}
895
931
}
896
932
}
@@ -903,8 +939,8 @@ func (c *Container) revalidateListens(now time.Time, actualListens map[netaddr.I
903
939
continue
904
940
}
905
941
open := false
906
- for _ , closedAt := range byPids {
907
- if closedAt .IsZero () {
942
+ for _ , details := range byPids {
943
+ if details . ClosedAt .IsZero () {
908
944
open = true
909
945
break
910
946
}
@@ -916,7 +952,7 @@ func (c *Container) revalidateListens(now time.Time, actualListens map[netaddr.I
916
952
917
953
if len (missingListens ) > 0 {
918
954
inodeToPid := map [string ]uint32 {}
919
- for pid := range c .pids {
955
+ for pid := range c .processes {
920
956
fds , err := proc .ReadFds (pid )
921
957
if err != nil {
922
958
continue
@@ -938,8 +974,8 @@ func (c *Container) revalidateListens(now time.Time, actualListens map[netaddr.I
938
974
}
939
975
940
976
for addr , pids := range c .listens {
941
- for pid , closedAt := range pids {
942
- if ! closedAt . IsZero () && now .Sub (closedAt ) > gcInterval {
977
+ for pid , details := range pids {
978
+ if ! details . ClosedAt . IsZero () && now .Sub (details . ClosedAt ) > gcInterval {
943
979
delete (c .listens [addr ], pid )
944
980
}
945
981
}
0 commit comments