99 "sync"
1010
1111 "go.uber.org/zap"
12- v1 "k8s.io/api/core /v1"
12+ discoveryv1 "k8s.io/api/discovery /v1"
1313 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1414 "k8s.io/apimachinery/pkg/runtime"
1515 "k8s.io/apimachinery/pkg/watch"
@@ -133,8 +133,7 @@ func newEpClient(clientSet kubernetes.Interface, logger *zap.Logger, options ...
133133
134134 c .store = NewObjStore (transformFuncEndpoint , logger )
135135 lw := c .createEndpointListWatch (clientSet , metav1 .NamespaceAll )
136- //nolint:staticcheck // SA1019 TODO: resolve as part of https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/43891
137- reflector := cache .NewReflector (lw , & v1.Endpoints {}, c .store , 0 )
136+ reflector := cache .NewReflector (lw , & discoveryv1.EndpointSlice {}, c .store , 0 )
138137
139138 go reflector .Run (c .stopChan )
140139
@@ -152,28 +151,34 @@ func (c *epClient) shutdown() {
152151}
153152
154153func transformFuncEndpoint (obj any ) (any , error ) {
155- //nolint:staticcheck // SA1019 TODO: resolve as part of https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/43891
156- endpoint , ok := obj .(* v1.Endpoints )
154+ endpointSlice , ok := obj .(* discoveryv1.EndpointSlice )
157155 if ! ok {
158- return nil , fmt .Errorf ("input obj %v is not Endpoint type" , obj )
156+ return nil , fmt .Errorf ("input obj %v is not EndpointSlice type" , obj )
159157 }
160158 info := new (endpointInfo )
161- info .name = endpoint .Name
162- info .namespace = endpoint .Namespace
159+ // EndpointSlice uses a label to reference the service
160+ if serviceName , ok := endpointSlice .Labels [discoveryv1 .LabelServiceName ]; ok {
161+ info .name = serviceName
162+ } else {
163+ // Fallback to the EndpointSlice name if label is not present
164+ info .name = endpointSlice .Name
165+ }
166+ info .namespace = endpointSlice .Namespace
163167 info .podKeyList = []string {}
164- if subsets := endpoint . Subsets ; subsets != nil {
165- for _ , subset := range subsets {
166- if addresses := subset . Addresses ; addresses != nil {
167- for _ , address := range addresses {
168- if targetRef := address . TargetRef ; targetRef != nil && targetRef . Kind == typePod {
169- podKey := k8sutil . CreatePodKey ( targetRef . Namespace , targetRef . Name )
170- if podKey == "" {
171- continue
172- }
173- info . podKeyList = append ( info . podKeyList , podKey )
174- }
175- }
168+
169+ // EndpointSlice has Endpoints field (not Subsets like old Endpoints)
170+ for _ , endpoint := range endpointSlice . Endpoints {
171+ // Check if endpoint is ready
172+ if endpoint . Conditions . Ready != nil && ! * endpoint . Conditions . Ready {
173+ continue
174+ }
175+
176+ if endpoint . TargetRef != nil && endpoint . TargetRef . Kind == typePod {
177+ podKey := k8sutil . CreatePodKey ( endpoint . TargetRef . Namespace , endpoint . TargetRef . Name )
178+ if podKey == "" {
179+ continue
176180 }
181+ info .podKeyList = append (info .podKeyList , podKey )
177182 }
178183 }
179184 return info , nil
@@ -183,10 +188,10 @@ func (*epClient) createEndpointListWatch(client kubernetes.Interface, ns string)
183188 ctx := context .Background ()
184189 return & cache.ListWatch {
185190 ListFunc : func (opts metav1.ListOptions ) (runtime.Object , error ) {
186- return client .CoreV1 ().Endpoints (ns ).List (ctx , opts )
191+ return client .DiscoveryV1 ().EndpointSlices (ns ).List (ctx , opts )
187192 },
188193 WatchFunc : func (opts metav1.ListOptions ) (watch.Interface , error ) {
189- return client .CoreV1 ().Endpoints (ns ).Watch (ctx , opts )
194+ return client .DiscoveryV1 ().EndpointSlices (ns ).Watch (ctx , opts )
190195 },
191196 }
192197}
0 commit comments