@@ -586,68 +586,87 @@ func getPodConditionFromList(conditions []v1.PodCondition, conditionType v1.PodC
586
586
return - 1 , nil
587
587
}
588
588
589
+ // nodesExternalTrafficPolicyTypeLocal filters nodes that have running pods belonging to the given NodePort service
590
+ // with externalTrafficPolicy=Local. Returns a prioritized slice of nodes, favoring those with ready, non-terminating pods.
591
+ func (sc * serviceSource ) nodesExternalTrafficPolicyTypeLocal (svc * v1.Service ) []* v1.Node {
592
+ var nodesReady []* v1.Node
593
+ var nodesRunning []* v1.Node
594
+ var nodes []* v1.Node
595
+ nodesMap := map [* v1.Node ]struct {}{}
596
+
597
+ pods := sc .pods (svc )
598
+
599
+ for _ , v := range pods {
600
+ if v .Status .Phase == v1 .PodRunning {
601
+ node , err := sc .nodeInformer .Lister ().Get (v .Spec .NodeName )
602
+ if err != nil {
603
+ log .Debugf ("Unable to find node where Pod %s is running" , v .Spec .Hostname )
604
+ continue
605
+ }
606
+
607
+ if _ , ok := nodesMap [node ]; ! ok {
608
+ nodesMap [node ] = * new (struct {})
609
+ nodesRunning = append (nodesRunning , node )
610
+
611
+ if isPodStatusReady (v .Status ) {
612
+ nodesReady = append (nodesReady , node )
613
+ // Check pod not terminating
614
+ if v .GetDeletionTimestamp () == nil {
615
+ nodes = append (nodes , node )
616
+ }
617
+ }
618
+ }
619
+ }
620
+ }
621
+
622
+ // Prioritize nodes with non-terminating ready pods
623
+ // If none available, fall back to nodes with ready pods
624
+ // If still none, use nodes with any running pods
625
+ if len (nodes ) > 0 {
626
+ // Works same as service endpoints
627
+ } else if len (nodesReady ) > 0 {
628
+ // 2 level of panic modes as safe guard, because old wrong behavior can be used by someone
629
+ // Publish all endpoints not always a bad thing
630
+ log .Debugf ("All pods in terminating state, use ready" )
631
+ nodes = nodesReady
632
+ } else {
633
+ log .Debugf ("All pods not ready, use all running" )
634
+ nodes = nodesRunning
635
+ }
636
+
637
+ return nodes
638
+ }
639
+
640
+ // pods retrieves a slice of pods associated with the given Service
641
+ func (sc * serviceSource ) pods (svc * v1.Service ) []* v1.Pod {
642
+ labelSelector , err := metav1 .ParseToLabelSelector (labels .Set (svc .Spec .Selector ).AsSelectorPreValidated ().String ())
643
+ if err != nil {
644
+ return nil
645
+ }
646
+ selector , err := metav1 .LabelSelectorAsSelector (labelSelector )
647
+ if err != nil {
648
+ return nil
649
+ }
650
+ pods , err := sc .podInformer .Lister ().Pods (svc .Namespace ).List (selector )
651
+ if err != nil {
652
+ return nil
653
+ }
654
+
655
+ return pods
656
+ }
657
+
589
658
func (sc * serviceSource ) extractNodePortTargets (svc * v1.Service ) (endpoint.Targets , error ) {
590
659
var (
591
660
internalIPs endpoint.Targets
592
661
externalIPs endpoint.Targets
593
662
ipv6IPs endpoint.Targets
594
663
nodes []* v1.Node
595
- err error
596
664
)
597
665
598
- switch svc .Spec .ExternalTrafficPolicy {
599
- case v1 .ServiceExternalTrafficPolicyTypeLocal :
600
- nodesMap := map [* v1.Node ]struct {}{}
601
- labelSelector , err := metav1 .ParseToLabelSelector (labels .Set (svc .Spec .Selector ).AsSelectorPreValidated ().String ())
602
- if err != nil {
603
- return nil , err
604
- }
605
- selector , err := metav1 .LabelSelectorAsSelector (labelSelector )
606
- if err != nil {
607
- return nil , err
608
- }
609
- pods , err := sc .podInformer .Lister ().Pods (svc .Namespace ).List (selector )
610
- if err != nil {
611
- return nil , err
612
- }
613
-
614
- var nodesReady []* v1.Node
615
- var nodesRunning []* v1.Node
616
- for _ , v := range pods {
617
- if v .Status .Phase == v1 .PodRunning {
618
- node , err := sc .nodeInformer .Lister ().Get (v .Spec .NodeName )
619
- if err != nil {
620
- log .Debugf ("Unable to find node where Pod %s is running" , v .Spec .Hostname )
621
- continue
622
- }
623
-
624
- if _ , ok := nodesMap [node ]; ! ok {
625
- nodesMap [node ] = * new (struct {})
626
- nodesRunning = append (nodesRunning , node )
627
-
628
- if isPodStatusReady (v .Status ) {
629
- nodesReady = append (nodesReady , node )
630
- // Check pod not terminating
631
- if v .GetDeletionTimestamp () == nil {
632
- nodes = append (nodes , node )
633
- }
634
- }
635
- }
636
- }
637
- }
638
-
639
- if len (nodes ) > 0 {
640
- // Works same as service endpoints
641
- } else if len (nodesReady ) > 0 {
642
- // 2 level of panic modes as safe guard, because old wrong behavior can be used by someone
643
- // Publish all endpoints not always a bad thing
644
- log .Debugf ("All pods in terminating state, use ready" )
645
- nodes = nodesReady
646
- } else {
647
- log .Debugf ("All pods not ready, use all running" )
648
- nodes = nodesRunning
649
- }
650
- default :
666
+ if svc .Spec .ExternalTrafficPolicy == v1 .ServiceExternalTrafficPolicyTypeLocal {
667
+ nodes = sc .nodesExternalTrafficPolicyTypeLocal (svc )
668
+ } else {
669
+ var err error
651
670
nodes , err = sc .nodeInformer .Lister ().List (labels .Everything ())
652
671
if err != nil {
653
672
return nil , err
@@ -669,15 +688,17 @@ func (sc *serviceSource) extractNodePortTargets(svc *v1.Service) (endpoint.Targe
669
688
}
670
689
671
690
access := getAccessFromAnnotations (svc .Annotations )
672
- if access == "public" {
691
+ switch access {
692
+ case "public" :
673
693
return append (externalIPs , ipv6IPs ... ), nil
674
- }
675
- if access == "private" {
694
+ case "private" :
676
695
return internalIPs , nil
677
696
}
697
+
678
698
if len (externalIPs ) > 0 {
679
699
return append (externalIPs , ipv6IPs ... ), nil
680
700
}
701
+
681
702
return internalIPs , nil
682
703
}
683
704
0 commit comments