Skip to content

Commit a26d257

Browse files
LikeTheSaladlaurit
andauthored
OpAMP HTTP service (#1928)
Co-authored-by: Lauri Tulmin <[email protected]>
1 parent 3856e1d commit a26d257

File tree

16 files changed

+1256
-0
lines changed

16 files changed

+1256
-0
lines changed

opamp-client/build.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@ description = "Client implementation of the OpAMP spec."
1212
otelJava.moduleName.set("io.opentelemetry.contrib.opamp.client")
1313

1414
dependencies {
15+
implementation("com.squareup.okhttp3:okhttp")
1516
annotationProcessor("com.google.auto.value:auto-value")
1617
compileOnly("com.google.auto.value:auto-value-annotations")
18+
testImplementation("org.mockito:mockito-inline")
1719
}
1820

1921
val opampReleaseInfo = tasks.register<Download>("opampLastReleaseInfo") {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.opamp.client.internal.connectivity.http;
7+
8+
public class HttpErrorException extends Exception {
9+
private final int errorCode;
10+
11+
private static final long serialVersionUID = 1L;
12+
13+
public int getErrorCode() {
14+
return errorCode;
15+
}
16+
17+
/**
18+
* Constructs an HTTP error related exception.
19+
*
20+
* @param errorCode The HTTP error code.
21+
* @param message The HTTP error message associated with the code.
22+
*/
23+
public HttpErrorException(int errorCode, String message) {
24+
super(message);
25+
this.errorCode = errorCode;
26+
}
27+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.opamp.client.internal.connectivity.http;
7+
8+
import java.io.Closeable;
9+
import java.io.IOException;
10+
import java.io.InputStream;
11+
import java.io.OutputStream;
12+
import java.util.concurrent.CompletableFuture;
13+
14+
public interface HttpSender {
15+
16+
CompletableFuture<Response> send(BodyWriter writer, int contentLength);
17+
18+
interface BodyWriter {
19+
void writeTo(OutputStream outputStream) throws IOException;
20+
}
21+
22+
interface Response extends Closeable {
23+
int statusCode();
24+
25+
String statusMessage();
26+
27+
InputStream bodyInputStream();
28+
29+
String getHeader(String name);
30+
}
31+
}
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.opamp.client.internal.connectivity.http;
7+
8+
import java.io.IOException;
9+
import java.io.InputStream;
10+
import java.util.concurrent.CompletableFuture;
11+
import okhttp3.Call;
12+
import okhttp3.Callback;
13+
import okhttp3.MediaType;
14+
import okhttp3.OkHttpClient;
15+
import okhttp3.RequestBody;
16+
import okio.BufferedSink;
17+
import org.jetbrains.annotations.NotNull;
18+
import org.jetbrains.annotations.Nullable;
19+
20+
public final class OkHttpSender implements HttpSender {
21+
private final OkHttpClient client;
22+
private final String url;
23+
24+
public static OkHttpSender create(String url) {
25+
return create(url, new OkHttpClient());
26+
}
27+
28+
public static OkHttpSender create(String url, OkHttpClient client) {
29+
return new OkHttpSender(url, client);
30+
}
31+
32+
private static final String CONTENT_TYPE = "application/x-protobuf";
33+
private static final MediaType MEDIA_TYPE = MediaType.parse(CONTENT_TYPE);
34+
35+
private OkHttpSender(String url, OkHttpClient client) {
36+
this.url = url;
37+
this.client = client;
38+
}
39+
40+
@Override
41+
public CompletableFuture<Response> send(BodyWriter writer, int contentLength) {
42+
CompletableFuture<Response> future = new CompletableFuture<>();
43+
okhttp3.Request.Builder builder = new okhttp3.Request.Builder().url(url);
44+
builder.addHeader("Content-Type", CONTENT_TYPE);
45+
46+
RequestBody body = new RawRequestBody(writer, contentLength, MEDIA_TYPE);
47+
builder.post(body);
48+
49+
client
50+
.newCall(builder.build())
51+
.enqueue(
52+
new Callback() {
53+
@Override
54+
public void onResponse(@NotNull Call call, @NotNull okhttp3.Response response) {
55+
if (response.isSuccessful() && response.body() != null) {
56+
future.complete(new OkHttpResponse(response));
57+
} else {
58+
future.completeExceptionally(
59+
new HttpErrorException(response.code(), response.message()));
60+
}
61+
}
62+
63+
@Override
64+
public void onFailure(@NotNull Call call, @NotNull IOException e) {
65+
future.completeExceptionally(e);
66+
}
67+
});
68+
69+
return future;
70+
}
71+
72+
private static class OkHttpResponse implements Response {
73+
private final okhttp3.Response response;
74+
75+
private OkHttpResponse(okhttp3.Response response) {
76+
if (response.body() == null) {
77+
throw new IllegalStateException("null response body not expected");
78+
}
79+
this.response = response;
80+
}
81+
82+
@Override
83+
public int statusCode() {
84+
return response.code();
85+
}
86+
87+
@Override
88+
public String statusMessage() {
89+
return response.message();
90+
}
91+
92+
@Override
93+
public InputStream bodyInputStream() {
94+
return response.body().byteStream();
95+
}
96+
97+
@Override
98+
public String getHeader(String name) {
99+
return response.headers().get(name);
100+
}
101+
102+
@Override
103+
public void close() {
104+
response.close();
105+
}
106+
}
107+
108+
private static class RawRequestBody extends RequestBody {
109+
private final BodyWriter writer;
110+
private final int contentLength;
111+
private final MediaType contentType;
112+
113+
private RawRequestBody(BodyWriter writer, int contentLength, MediaType contentType) {
114+
this.writer = writer;
115+
this.contentLength = contentLength;
116+
this.contentType = contentType;
117+
}
118+
119+
@Nullable
120+
@Override
121+
public MediaType contentType() {
122+
return contentType;
123+
}
124+
125+
@Override
126+
public long contentLength() {
127+
return contentLength;
128+
}
129+
130+
@Override
131+
public void writeTo(@NotNull BufferedSink bufferedSink) throws IOException {
132+
writer.writeTo(bufferedSink.outputStream());
133+
}
134+
}
135+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.opamp.client.internal.connectivity.http;
7+
8+
import io.opentelemetry.opamp.client.internal.tools.SystemTime;
9+
import java.time.Duration;
10+
import java.time.ZonedDateTime;
11+
import java.time.format.DateTimeFormatter;
12+
import java.util.Locale;
13+
import java.util.Optional;
14+
import java.util.regex.Pattern;
15+
16+
public final class RetryAfterParser {
17+
private final SystemTime systemTime;
18+
private static final Pattern SECONDS_PATTERN = Pattern.compile("\\d+");
19+
private static final Pattern DATE_PATTERN =
20+
Pattern.compile(
21+
"[A-Za-z]{3}, [0-3][0-9] [A-Za-z]{3} [0-9]{4} [0-2][0-9]:[0-5][0-9]:[0-5][0-9] GMT");
22+
private static final DateTimeFormatter DATE_FORMAT =
23+
DateTimeFormatter.ofPattern("EEE, dd MMM yyyy HH:mm:ss z", Locale.US);
24+
25+
public static RetryAfterParser getInstance() {
26+
return new RetryAfterParser(SystemTime.getInstance());
27+
}
28+
29+
RetryAfterParser(SystemTime systemTime) {
30+
this.systemTime = systemTime;
31+
}
32+
33+
public Optional<Duration> tryParse(String value) {
34+
Duration duration = null;
35+
if (SECONDS_PATTERN.matcher(value).matches()) {
36+
duration = Duration.ofSeconds(Long.parseLong(value));
37+
} else if (DATE_PATTERN.matcher(value).matches()) {
38+
long difference = toMilliseconds(value) - systemTime.getCurrentTimeMillis();
39+
if (difference > 0) {
40+
duration = Duration.ofMillis(difference);
41+
}
42+
}
43+
return Optional.ofNullable(duration);
44+
}
45+
46+
private static long toMilliseconds(String value) {
47+
return ZonedDateTime.parse(value, DATE_FORMAT).toInstant().toEpochMilli();
48+
}
49+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.opamp.client.internal.request;
7+
8+
import com.google.auto.value.AutoValue;
9+
import opamp.proto.AgentToServer;
10+
11+
/** Wrapper class for "AgentToServer" request body. */
12+
@AutoValue
13+
public abstract class Request {
14+
public abstract AgentToServer getAgentToServer();
15+
16+
public static Request create(AgentToServer agentToServer) {
17+
return new AutoValue_Request(agentToServer);
18+
}
19+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.opamp.client.internal.request.delay;
7+
8+
import java.time.Duration;
9+
10+
/**
11+
* A {@link PeriodicDelay} implementation that wants to accept delay time suggestions, as explained
12+
* <a
13+
* href="https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#throttling">here</a>,
14+
* must implement this interface.
15+
*/
16+
public interface AcceptsDelaySuggestion {
17+
void suggestDelay(Duration delay);
18+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.opamp.client.internal.request.delay;
7+
8+
import java.time.Duration;
9+
10+
final class FixedPeriodicDelay implements PeriodicDelay {
11+
private final Duration duration;
12+
13+
public FixedPeriodicDelay(Duration duration) {
14+
this.duration = duration;
15+
}
16+
17+
@Override
18+
public Duration getNextDelay() {
19+
return duration;
20+
}
21+
22+
@Override
23+
public void reset() {}
24+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.opamp.client.internal.request.delay;
7+
8+
import java.time.Duration;
9+
10+
public interface PeriodicDelay {
11+
static PeriodicDelay ofFixedDuration(Duration duration) {
12+
return new FixedPeriodicDelay(duration);
13+
}
14+
15+
Duration getNextDelay();
16+
17+
void reset();
18+
}

0 commit comments

Comments
 (0)