Problem with reconnecting to websocket in KubeServiceState #22

@MichalKoziorowski-TomTom

Description

Version

5.0.1

Context

Hi,

In https://github.com/eclipse-vertx/vertx-service-resolver/blob/5.0.2/src/main/java/io/vertx/serviceresolver/kube/impl/KubeServiceState.java, reconnection is implemented at the following lines:

The problem is that this implementation is incompatible with what the Kubernetes documentation describes: https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes

I see the following problems:

  • lastResourceVersion is not updated after an event is received from the WebSocket.
  • If connecting to the Kubernetes API server fails for a longer period, the API server may already have discarded the resource version stored in lastResourceVersion and respond with HTTP 410 Gone. In that case, the whole chain of operations (the initial GET that retrieves the resource version, then the WebSocket connect) should be performed again.
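The two points above could be handled by a small piece of state that sits between the WebSocket handler and the endpoint builder. This is a minimal sketch, assuming each frame has already been decoded from JSON; `WatchEvent`, `WatchTracker` and the `Action` values are hypothetical names, not part of vertx-service-resolver. Every `ADDED`/`MODIFIED`/`DELETED`/`BOOKMARK` event advances the stored resource version, and an `ERROR` frame with code 410 tells the caller to start over with a fresh LIST.

```java
import java.util.Objects;

// Hypothetical, library-independent view of one watch frame. In the resolver
// these values would be read from the JSON: "type",
// "object.metadata.resourceVersion" and, for ERROR frames, "object.code".
final class WatchEvent {
  final String type;
  final String resourceVersion; // null for ERROR frames (their metadata is {})
  final int statusCode;         // only meaningful for ERROR frames

  WatchEvent(String type, String resourceVersion, int statusCode) {
    this.type = type;
    this.resourceVersion = resourceVersion;
    this.statusCode = statusCode;
  }
}

// Hypothetical tracker for the watch position (not a vertx-service-resolver class).
final class WatchTracker {
  enum Action { APPLY, SKIP, RELIST, RECONNECT }

  private String lastResourceVersion;

  // Seeded with metadata.resourceVersion from the initial LIST response.
  WatchTracker(String listResourceVersion) {
    this.lastResourceVersion = Objects.requireNonNull(listResourceVersion);
  }

  String lastResourceVersion() {
    return lastResourceVersion;
  }

  Action onEvent(WatchEvent event) {
    switch (event.type) {
      case "ADDED":
      case "MODIFIED":
      case "DELETED":
        // Advance the position so a reconnect resumes after this event.
        lastResourceVersion = event.resourceVersion;
        return Action.APPLY;
      case "BOOKMARK":
        // Bookmarks only advance the position; there are no endpoints to apply.
        lastResourceVersion = event.resourceVersion;
        return Action.SKIP;
      case "ERROR":
        // 410 Gone ("too old resource version"): the stored version is no
        // longer available, so the caller must LIST again and re-seed.
        // Other errors: this sketch assumes retrying the watch is enough.
        return event.statusCode == 410 ? Action.RELIST : Action.RECONNECT;
      default:
        return Action.SKIP;
    }
  }
}
```

With something like this, the close handler would reconnect using `lastResourceVersion()` unless the last action was `RELIST`, in which case it would repeat the initial GET first. Treating non-410 errors as "retry the watch" is an assumption of this sketch.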

For debugging, I replaced the implementation with my own version containing a couple of printlns:

/*
 * Copyright (c) 2011-2023 Contributors to the Eclipse Foundation
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
 * which is available at https://www.apache.org/licenses/LICENSE-2.0.
 *
 * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
 */
package io.vertx.serviceresolver.kube.impl;

import io.vertx.core.Vertx;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.WebSocket;
import io.vertx.core.http.WebSocketConnectOptions;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.spi.endpoint.EndpointBuilder;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

class KubeServiceState<B> {

  final String name;
  final Vertx vertx;
  final KubeResolverImpl resolver;
  final EndpointBuilder<B, SocketAddress> endpointsBuilder;
  String lastResourceVersion;
  boolean disposed;
  WebSocket ws;
  AtomicReference<B> endpoints = new AtomicReference<>();

  KubeServiceState(EndpointBuilder<B, SocketAddress> endpointsBuilder, KubeResolverImpl resolver, Vertx vertx, String lastResourceVersion, String name) {
    this.endpointsBuilder = endpointsBuilder;
    this.name = name;
    this.resolver = resolver;
    this.vertx = vertx;
    this.lastResourceVersion = lastResourceVersion;
  }

  void connectWebSocket() {
    System.out.println("Entered connectWebSocket for service: " + name);
    String requestURI = "/api/v1/namespaces/" + resolver.namespace + "/endpoints?"
      + "watch=true"
      + "&"
      + "allowWatchBookmarks=true"
      + "&"
      + "resourceVersion=" + lastResourceVersion;
    WebSocketConnectOptions connectOptions = new WebSocketConnectOptions();
    connectOptions.setServer(resolver.server);
    connectOptions.setURI(requestURI);
    if (resolver.bearerToken != null) {
      connectOptions.putHeader(HttpHeaders.AUTHORIZATION, "Bearer " + resolver.bearerToken);
    }
    resolver.wsClient.webSocket()
      .handler(buff -> {
        JsonObject update  = buff.toJsonObject();
        handleUpdate(update);
      })
      .closeHandler(v -> {
        if (!disposed) {
          System.out.println(this + " !!!!! BUG Reconnecting websocket 1 !!!!!!");
          connectWebSocket();
        }
      }).connect(connectOptions).onComplete(ar -> {
        if (ar.succeeded()) {
          WebSocket ws = ar.result();
          if (disposed) {
            ws.close();
          } else {
            this.ws = ws;
          }
        } else {
          if (!disposed) {
            // Retry WebSocket connect
            vertx.setTimer(500, id -> {
              System.out.println(this + " !!!!! BUG Reconnecting websocket 2 !!!!!!");
              connectWebSocket();
            });
          }
        }
      });
  }

  void handleUpdate(JsonObject update) {
    System.out.println(this + " handleUpdate: " + update.encode());
    String type = update.getString("type");
    JsonObject object = update.getJsonObject("object");
    JsonObject metadata = object.getJsonObject("metadata");
    String resourceVersion = metadata.getString("resourceVersion");
    if (!lastResourceVersion.equals(resourceVersion)) {
      handleEndpoints(object);
    }
  }

  void handleEndpoints(JsonObject item) {
    System.out.println(this + " handleEndpoints: " + item.encode());
    JsonObject metadata = item.getJsonObject("metadata");
    String name = metadata.getString("name");
    if (this.name.equals(name)) {
      JsonArray subsets = item.getJsonArray("subsets");
      EndpointBuilder<B, SocketAddress> builder = endpointsBuilder;
      if (subsets != null) {
        for (int j = 0;j < subsets.size();j++) {
          List<String> podIps = new ArrayList<>();
          JsonObject subset = subsets.getJsonObject(j);
          JsonArray addresses = subset.getJsonArray("addresses");
          JsonArray ports = subset.getJsonArray("ports");
          for (int k = 0;k < addresses.size();k++) {
            JsonObject address = addresses.getJsonObject(k);
            String ip = address.getString("ip");
            podIps.add(ip);
          }
          for (int k = 0;k < ports.size();k++) {
            JsonObject port = ports.getJsonObject(k);
            int podPort = port.getInteger("port");
            for (String podIp : podIps) {
              SocketAddress podAddress = SocketAddress.inetSocketAddress(podPort, podIp);
              builder = builder.addServer(podAddress, podIp + "-" + podPort);
            }
          }
        }
      }
      this.endpoints.set(builder.build());
    }
  }
}

After scaling my backend service in and out for a while, I get the following in the logs:

Entered connectWebSocket for service: frontend-1
io.vertx.serviceresolver.kube.impl.KubeServiceState@9bbec14 handleUpdate: {"type":"ERROR","object":{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"too old resource version: 775570 (797044)","reason":"Expired","code":410}}
io.vertx.serviceresolver.kube.impl.KubeServiceState@9bbec14 handleEndpoints: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"too old resource version: 775570 (797044)","reason":"Expired","code":410}
io.vertx.serviceresolver.kube.impl.KubeServiceState@3f3e227e !!!!! BUG Reconnecting websocket 1 !!!!!!

After that, the app stops receiving the updated list of ready pods from Kubernetes, so after scaling down the backends I see connection exceptions to endpoints that have already been shut down.
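For the stale-endpoints symptom, what matters is that after a 410 the resolver performs the initial GET again and replaces its endpoint list from that snapshot, instead of reopening the WebSocket with the same expired version. Below is a minimal, Vert.x-free sketch of that cycle, assuming a simplified model in which every event carries one Endpoints object's full address list. `EndpointsApi`, `Snapshot`, `EndpointsEvent` and `EndpointsSync` are hypothetical names; real code would also have to filter by service name as `handleEndpoints` does.

```java
import java.util.List;

// Hypothetical snapshot returned by the initial LIST (GET .../endpoints).
final class Snapshot {
  final List<String> addresses;
  final String resourceVersion;

  Snapshot(List<String> addresses, String resourceVersion) {
    this.addresses = addresses;
    this.resourceVersion = resourceVersion;
  }
}

// Hypothetical decoded watch event for a single Endpoints object.
final class EndpointsEvent {
  final String type;
  final String resourceVersion;
  final int code;
  final List<String> addresses;

  private EndpointsEvent(String type, String resourceVersion, int code, List<String> addresses) {
    this.type = type;
    this.resourceVersion = resourceVersion;
    this.code = code;
    this.addresses = addresses;
  }

  static EndpointsEvent change(String type, String resourceVersion, List<String> addresses) {
    return new EndpointsEvent(type, resourceVersion, 0, addresses);
  }

  static EndpointsEvent error(int code) {
    return new EndpointsEvent("ERROR", null, code, List.of());
  }
}

// Hypothetical stand-in for the two Kubernetes API calls involved.
interface EndpointsApi {
  Snapshot list();                                   // initial GET
  List<EndpointsEvent> watch(String resourceVersion); // watch=true&resourceVersion=...
}

final class EndpointsSync {
  private final EndpointsApi api;
  private List<String> current = List.of();
  private String resourceVersion; // null means "LIST before the next WATCH"

  EndpointsSync(EndpointsApi api) {
    this.api = api;
  }

  List<String> current() { return current; }

  String resourceVersion() { return resourceVersion; }

  // One connection cycle; a real client would call this again after every
  // disconnect (for example from a close handler or a retry timer).
  void runOnce() {
    if (resourceVersion == null) {
      Snapshot snapshot = api.list();
      current = snapshot.addresses; // replaces any stale endpoints
      resourceVersion = snapshot.resourceVersion;
    }
    for (EndpointsEvent event : api.watch(resourceVersion)) {
      if (event.type.equals("ERROR")) {
        if (event.code == 410) {
          resourceVersion = null; // expired: the next cycle starts with LIST
        }
        return; // other errors: retry WATCH from the same version
      }
      if (event.type.equals("DELETED")) {
        current = List.of();
      } else if (!event.type.equals("BOOKMARK")) {
        current = event.addresses;
      }
      resourceVersion = event.resourceVersion; // keep the position up to date
    }
  }
}
```

Calling `runOnce()` on every disconnect gives the behavior described in the second bullet: a resumable WATCH while the version is still valid, and a full LIST plus WATCH once it has expired.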

Steps to reproduce

No response

Do you have a reproducer?

No response

Metadata

Assignees: No one assigned
Labels: bug (Something isn't working)
Projects: No projects
Relationships: None yet
Development: No branches or pull requests