Skip to content

OpAMP WebSocket service #1969

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 28 commits into from
Jul 10, 2025

Conversation

LikeTheSalad
Copy link
Contributor

Description:

This is a continuation of the OpAMP implementation work. The changes here are focused on adding the RequestService's WebSocket implementation.

@LikeTheSalad LikeTheSalad requested a review from a team as a code owner June 20, 2025 14:08
@LikeTheSalad
Copy link
Contributor Author

cc @tigrannajaryan

Comment on lines 64 to 67
if (!getWebSocket().close(code, reason)) {
closing.set(false);
running.set(false);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets say there is a failure at the same time when close is called. Then it is probably possible to reach a situation where the closing flag remains set to true after the close completes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I missed the "IllegalArgumentException" warning from the okhttp method. I've just updated it to avoid this issue.


/**
* Called when the websocket connection is successfully established with the remote peer and may
* start sending messages.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels to me that there is a word missing before may start sending messages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've rephrased it a bit, let me know what you think.

retryAfter = Duration.ofNanos(errorResponse.retry_info.retry_after_nanoseconds);
}

webSocket.close(1000, null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will the call to close here trigger a onClosed event that will also call scheduleConnectionRetry?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, in theory. It can also happen in an onFailure event. That's why I made sure only one retry task can be scheduled, no matter how many times scheduleConnectionRetry is called.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since there are only tests with mocks it is hard to try this out. Looking at the code if onClosed gets called as a result of calling close here then the following could happen

  • scheduleConnectionRetry sets nextRetryScheduled to true and schedules retry
  • retryConnection runs and clears nextRetryScheduled and starts reconnecting
  • onClosed is called which calls scheduleConnectionRetry which ends up calling WebSocket.open again
    Firstly for this to be possible at all the delay used in first call to scheduleConnectionRetry would need to be really small. Secondly it would be fine since apparently calling WebSocket.open on an already open doesn't do anything. Just a bit confusing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see what you mean. As you mentioned, it should be fine to call websocket.open multiple times, although it does seem confusing in the event that the delay time is really short, which I would hope would never be the case. Trying to think of ways to avoid this, probably we could store the "suggested delay" in memory and use it the next time scheduleConnectionRetry is called, since that should happen anyway after calling websocket.close here (even if the connection is interrupted with an error, because scheduleConnectionRetry would be called from onFailure in that case). It would bring other things to look out for though, as the suggested delay would become a state for this class, so it might make things confusing in a different way? - If you think it's worth it, I can look for ways to make it bullet-proof, but for now, I left it as is because it seemed unlikely that this scenario could happen in practice.

Comment on lines 106 to 108
if (!trySendRequest()) {
hasPendingRequest.set(true);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this is to handle when request is sent before websocket is ready. Imo there is race where trySendRequest returns false but onOpen completes before the hasPendingRequest flag is set.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, it seems possible indeed. I've just updated it to make sure it won't happen.

@LikeTheSalad
Copy link
Contributor Author

There seems to be an issue with links from maven central on markdown files. I'm guessing it's temporary, so I'll ignore them on this PR.

Comment on lines 59 to 62
} catch (IllegalArgumentException e) {
status.set(Status.RUNNING);
throw e;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want to keep it then there should be a comment explaining why this is needed. I think that we don't really need to handle this since using an invalid code or reason that is too long is a caller error.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I've added a comment.

}

@Override
public void onMessage(@Nonnull okhttp3.WebSocket webSocket, @Nonnull ByteString bytes) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is also fun onMessage(webSocket: WebSocket, text: String) is it fine to leave that unhandled? I guess it is since it isn't used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Binary format should be the only one we need, so yeah I think it's fine to leave the utf-8 message unhandled.

public static WebSocketRequestService create(
WebSocket webSocket, PeriodicDelay periodicRetryDelay) {
return new WebSocketRequestService(
webSocket, periodicRetryDelay, Executors.newSingleThreadScheduledExecutor());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could use daemon thread here like in the http service

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I overlooked it. It's updated now.

@otelbot-java-contrib
Copy link
Contributor

🔧 The result from spotlessApply was committed to the PR branch.

@Override
public void stop() {
if (hasStopped.compareAndSet(false, true)) {
doSendRequest();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume that sending the request is here because of https://opentelemetry.io/docs/specs/opamp/#websocket-transport-opamp-client-initiated The agent_disconnect flag must be set on the request by the caller. I think this could use a comment explaining why the request is sent before closing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I've just added a comment to explain it.

public void stop() {
if (hasStopped.compareAndSet(false, true)) {
doSendRequest();
webSocket.close(1000, null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could use a a comment describing what 1000 is. Maybe link to https://datatracker.ietf.org/doc/html/rfc6455#section-7.4.1

1000 indicates a normal closure, meaning that the purpose for
which the connection was established has been fulfilled.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I've just updated it.

}
if (retryingConnection.compareAndSet(false, true)) {
periodicRetryDelay.reset();
if (retryAfter != null && periodicRetryDelay instanceof AcceptsDelaySuggestion) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we are only ever using the retry suggested from the first retry attempt which is fine since subsequent failed attempts would use null for retry anyway. In http service there is the same code but there subsequent retry attempts could suggest the delay which we ignore. Just something I found a bit confusing.

@laurit
Copy link
Contributor

laurit commented Jul 8, 2025

Maybe we should also move the retry handling into a nested class like in the http service?

diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java
index 3450efa4..3010d00f 100644
--- a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java
+++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/WebSocketRequestService.java
@@ -29,15 +29,14 @@ import opamp.proto.ServerErrorResponseType;
 import opamp.proto.ServerToAgent;
 
 public final class WebSocketRequestService implements RequestService, WebSocket.Listener {
+  private static final PeriodicDelay DEFAULT_DELAY_BETWEEN_RETRIES =
+      PeriodicDelay.ofFixedDuration(Duration.ofSeconds(30));
+
   private final WebSocket webSocket;
-  private final PeriodicDelay periodicRetryDelay;
-  private final AtomicBoolean retryingConnection = new AtomicBoolean(false);
-  private final AtomicBoolean nextRetryScheduled = new AtomicBoolean(false);
   private final AtomicBoolean isRunning = new AtomicBoolean(false);
   private final AtomicBoolean hasStopped = new AtomicBoolean(false);
+  private final ConnectionStatus connectionStatus;
   private final ScheduledExecutorService executorService;
-  public static final PeriodicDelay DEFAULT_DELAY_BETWEEN_RETRIES =
-      PeriodicDelay.ofFixedDuration(Duration.ofSeconds(30));
 
   @GuardedBy("hasPendingRequestLock")
   private boolean hasPendingRequest = false;
@@ -74,8 +73,8 @@ public final class WebSocketRequestService implements RequestService, WebSocket.
       PeriodicDelay periodicRetryDelay,
       ScheduledExecutorService executorService) {
     this.webSocket = webSocket;
-    this.periodicRetryDelay = periodicRetryDelay;
     this.executorService = executorService;
+    this.connectionStatus = new ConnectionStatus(periodicRetryDelay);
   }
 
   @Override
@@ -145,7 +144,7 @@ public final class WebSocketRequestService implements RequestService, WebSocket.
 
   @Override
   public void onOpen() {
-    retryingConnection.set(false);
+    connectionStatus.success();
     getCallback().onConnectionSuccess();
     synchronized (hasPendingRequestLock) {
       if (hasPendingRequest) {
@@ -191,7 +190,7 @@ public final class WebSocketRequestService implements RequestService, WebSocket.
       }
 
       webSocket.close(1000, null);
-      scheduleConnectionRetry(retryAfter);
+      connectionStatus.retryAfter(retryAfter);
     }
   }
 
@@ -199,28 +198,6 @@ public final class WebSocketRequestService implements RequestService, WebSocket.
     return errorResponse.type.equals(ServerErrorResponseType.ServerErrorResponseType_Unavailable);
   }
 
-  @SuppressWarnings("FutureReturnValueIgnored")
-  private void scheduleConnectionRetry(@Nullable Duration retryAfter) {
-    if (hasStopped.get()) {
-      return;
-    }
-    if (retryingConnection.compareAndSet(false, true)) {
-      periodicRetryDelay.reset();
-      if (retryAfter != null && periodicRetryDelay instanceof AcceptsDelaySuggestion) {
-        ((AcceptsDelaySuggestion) periodicRetryDelay).suggestDelay(retryAfter);
-      }
-    }
-    if (nextRetryScheduled.compareAndSet(false, true)) {
-      executorService.schedule(
-          this::retryConnection, periodicRetryDelay.getNextDelay().toNanos(), TimeUnit.NANOSECONDS);
-    }
-  }
-
-  private void retryConnection() {
-    nextRetryScheduled.set(false);
-    startConnection();
-  }
-
   @Override
   public void onClosing() {
     // Noop
@@ -229,17 +206,57 @@ public final class WebSocketRequestService implements RequestService, WebSocket.
   @Override
   public void onClosed() {
     // If this service isn't stopped, we should retry connecting.
-    scheduleConnectionRetry(null);
+    connectionStatus.retryAfter(null);
   }
 
   @Override
   public void onFailure(Throwable t) {
     getCallback().onConnectionFailed(t);
-    scheduleConnectionRetry(null);
+    connectionStatus.retryAfter(null);
   }
 
   @Nonnull
   private Callback getCallback() {
     return Objects.requireNonNull(callback);
   }
+
+  private class ConnectionStatus {
+    private final PeriodicDelay periodicRetryDelay;
+    private final AtomicBoolean retryingConnection = new AtomicBoolean(false);
+    private final AtomicBoolean nextRetryScheduled = new AtomicBoolean(false);
+
+    ConnectionStatus(PeriodicDelay periodicRetryDelay) {
+      this.periodicRetryDelay = periodicRetryDelay;
+    }
+
+    void success() {
+      retryingConnection.set(false);
+    }
+
+    @SuppressWarnings("FutureReturnValueIgnored")
+    void retryAfter(@Nullable Duration retryAfter) {
+      if (hasStopped.get()) {
+        return;
+      }
+
+      if (retryingConnection.compareAndSet(false, true)) {
+        periodicRetryDelay.reset();
+        if (retryAfter != null && periodicRetryDelay instanceof AcceptsDelaySuggestion) {
+          ((AcceptsDelaySuggestion) periodicRetryDelay).suggestDelay(retryAfter);
+        }
+      }
+
+      if (nextRetryScheduled.compareAndSet(false, true)) {
+        executorService.schedule(
+            this::retryConnection,
+            periodicRetryDelay.getNextDelay().toNanos(),
+            TimeUnit.NANOSECONDS);
+      }
+    }
+
+    private void retryConnection() {
+      nextRetryScheduled.set(false);
+      startConnection();
+    }
+  }
 }

@LikeTheSalad
Copy link
Contributor Author

Maybe we should also move the retry handling into a nested class like in the http service?

Sure, I'll add the changes.

@LikeTheSalad
Copy link
Contributor Author

Maybe we should also move the retry handling into a nested class like in the http service?

Sure, I'll add the changes.

Thanks, it's done now.

@laurit laurit added this pull request to the merge queue Jul 10, 2025
Merged via the queue into open-telemetry:main with commit 6f3a18a Jul 10, 2025
31 checks passed
@LikeTheSalad LikeTheSalad deleted the opamp-websocket-service branch July 11, 2025 13:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants