Skip to content

Commit

Permalink
Use Helidon gRPC client in Config Etcd module (#9674)
Browse files Browse the repository at this point in the history
* Switches from Netty to our gRPC client.
* Updates problematic IT tests and constrains its scope to run only with V3 servers. V2 support is deprecated and marked for removal.
---------
Signed-off-by: Santiago Pericas-Geertsen <[email protected]>
  • Loading branch information
spericas authored Jan 22, 2025
1 parent 643a742 commit 30f1bf6
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 106 deletions.
25 changes: 4 additions & 21 deletions config/etcd/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@
<groupId>io.helidon.common</groupId>
<artifactId>helidon-common-media-type</artifactId>
</dependency>

<dependency>
<groupId>io.helidon.webclient</groupId>
<artifactId>helidon-webclient-grpc</artifactId>
</dependency>
<!-- etcd v2 -->
<dependency>
<groupId>org.mousio</groupId>
Expand All @@ -60,11 +63,6 @@
<groupId>io.grpc</groupId>
<artifactId>grpc-api</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
Expand Down Expand Up @@ -136,21 +134,6 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<systemPropertyVariables>
<property>
<!-- on big machines (e.g. 52 cores hyperthreaded)
this failed with too many open files
-->
<name>io.netty.eventLoopThreads</name>
<value>2</value>
</property>
</systemPropertyVariables>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2024 Oracle and/or its affiliates.
* Copyright (c) 2017, 2025 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -55,7 +55,7 @@ public class EtcdV2Client implements EtcdClient {
*
* @deprecated
*/
EtcdV2Client(URI... uris) {
public EtcdV2Client(URI... uris) {
etcd = new mousio.etcd4j.EtcdClient(uris);
etcd.setRetryHandler(new RetryWithTimeout(100, 2000));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2024 Oracle and/or its affiliates.
* Copyright (c) 2017, 2025 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,8 +22,8 @@
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

import io.helidon.common.tls.Tls;
import io.helidon.config.etcd.internal.client.EtcdClient;
import io.helidon.config.etcd.internal.client.EtcdClientException;
import io.helidon.config.etcd.internal.client.proto.KVGrpc;
Expand All @@ -34,23 +34,20 @@
import io.helidon.config.etcd.internal.client.proto.WatchGrpc;
import io.helidon.config.etcd.internal.client.proto.WatchRequest;
import io.helidon.config.etcd.internal.client.proto.WatchResponse;
import io.helidon.webclient.grpc.GrpcClient;

import com.google.protobuf.ByteString;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

/**
* Etcd API v3 client.
*/
public class EtcdV3Client implements EtcdClient {

private static final System.Logger LOGGER = System.getLogger(EtcdV3Client.class.getName());
private static final Tls DISABLE_TLS = Tls.builder().enabled(false).build();

private final Map<String, SubmissionPublisher<Long>> publishers = new ConcurrentHashMap<>();

private final ManagedChannel channel;
private final KVGrpc.KVBlockingStub kvStub;
private final WatchGrpc.WatchStub watchStub;

Expand All @@ -64,11 +61,12 @@ public EtcdV3Client(URI... uris) {
throw new IllegalArgumentException("EtcdV3Client only supports a single URI");
}
URI uri = uris[0];
ManagedChannelBuilder mcb = ManagedChannelBuilder.forAddress(uri.getHost(), uri.getPort());
this.channel = mcb.usePlaintext().build();

kvStub = KVGrpc.newBlockingStub(channel);
watchStub = WatchGrpc.newStub(channel);
GrpcClient grpcClient = GrpcClient.builder()
.baseUri(uri)
.tls(DISABLE_TLS) // must explicitly disable it
.build();
kvStub = KVGrpc.newBlockingStub(grpcClient.channel());
watchStub = WatchGrpc.newStub(grpcClient.channel());
}

@Override
Expand Down Expand Up @@ -140,16 +138,7 @@ public Flow.Publisher<Long> watch(String key) throws EtcdClientException {
}

@Override
public void close() throws EtcdClientException {
public void close() {
publishers.values().forEach(SubmissionPublisher::close);
if (!channel.isShutdown() && !channel.isTerminated()) {
try {
channel.awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOGGER.log(System.Logger.Level.INFO, "Error closing gRPC channel, reason: " + e.getLocalizedMessage(), e);
} finally {
channel.shutdown();
}
}
}
}
3 changes: 2 additions & 1 deletion config/etcd/src/main/java/module-info.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2024 Oracle and/or its affiliates.
* Copyright (c) 2017, 2025 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,6 +29,7 @@
requires io.grpc.stub;
requires io.helidon.common.media.type;
requires io.helidon.common;
requires io.helidon.webclient.grpc;

requires static java.annotation;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2022 Oracle and/or its affiliates.
* Copyright (c) 2017, 2025 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,29 +29,27 @@
import io.helidon.config.etcd.internal.client.EtcdClient;
import io.helidon.config.hocon.HoconConfigParser;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.core.Is.is;

/**
* Tests {@link EtcdConfigSource} with both version, {@link EtcdApi#v2} and {@link EtcdApi#v3}.
* Integration test for {@link EtcdConfigSource} using {@link EtcdApi#v3}.
*/
public class EtcdConfigSourceIT {

private static final URI DEFAULT_URI = URI.create("http://localhost:2379");

@ParameterizedTest
@EnumSource(EtcdApi.class)
public void testConfig(EtcdApi version) throws Exception {
putConfiguration(version, "/application.conf");
@Test
public void testConfig() throws Exception {
putConfiguration("/application.conf");
Config config = Config.builder()
.sources(EtcdConfigSource.builder()
.uri(DEFAULT_URI)
.key("configuration")
.api(version)
.api(EtcdApi.v3)
.mediaType(MediaTypes.APPLICATION_HOCON)
.build())
.addParser(HoconConfigParser.create())
Expand All @@ -60,15 +58,14 @@ public void testConfig(EtcdApi version) throws Exception {
assertThat(config.get("security").asNodeList().get(), hasSize(1));
}

@ParameterizedTest
@EnumSource(EtcdApi.class)
public void testConfigChanges(EtcdApi version) throws Exception {
putConfiguration(version, "/application.conf");
@Test
public void testConfigChanges() throws Exception {
putConfiguration("/application.conf");
Config config = Config.builder()
.sources(EtcdConfigSource.builder()
.uri(DEFAULT_URI)
.key("configuration")
.api(version)
.api(EtcdApi.v3)
.mediaType(MediaTypes.APPLICATION_HOCON)
.changeWatcher(EtcdWatcher.create())
.build())
Expand All @@ -77,27 +74,23 @@ public void testConfigChanges(EtcdApi version) throws Exception {

assertThat(config.get("security").asNodeList().get(), hasSize(1));

CountDownLatch initLatch = new CountDownLatch(1);
CountDownLatch nextLatch = new CountDownLatch(3);
config.onChange(it -> nextLatch.countDown());

config.onChange(it -> initLatch.countDown());

assertThat(initLatch.await(1, TimeUnit.SECONDS), is(true));

putConfiguration(version, "/application2.conf");
putConfiguration("/application2.conf");
TimeUnit.MILLISECONDS.sleep(10);
putConfiguration(version, "/application3.conf");
putConfiguration("/application3.conf");
TimeUnit.MILLISECONDS.sleep(10);
putConfiguration(version, "/application4.conf");
putConfiguration("/application4.conf");

assertThat(nextLatch.await(20, TimeUnit.SECONDS), is(true));
}

private static void putConfiguration(EtcdApi version, String resourcePath) throws Exception {
EtcdClient etcd = version.clientFactory().createClient(DEFAULT_URI);

private static void putConfiguration(String resourcePath) throws Exception {
EtcdClient etcd = EtcdApi.v3.clientFactory().createClient(DEFAULT_URI);
File file = new File(EtcdConfigSourceIT.class.getResource(resourcePath).getFile());
etcd.put("configuration", String.join("\n", Files.readAllLines(file.toPath(), Charset.defaultCharset())));
etcd.put("configuration", String.join("\n",
Files.readAllLines(file.toPath(), Charset.defaultCharset())));
etcd.close();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2021 Oracle and/or its affiliates.
* Copyright (c) 2017, 2025 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,22 +18,17 @@

import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import io.helidon.config.etcd.internal.client.EtcdClient;
import io.helidon.config.etcd.internal.client.EtcdClientException;
import io.helidon.config.etcd.internal.client.v2.EtcdV2Client;
import io.helidon.config.etcd.internal.client.v3.EtcdV3Client;

import org.hamcrest.core.Is;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.api.Test;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
Expand All @@ -47,34 +42,26 @@ public class EtcdClientIT {

private static final URI uri = URI.create("http://localhost:2379");

private static Stream<Class<? extends EtcdClient>> clients() {
return List.of(EtcdV2Client.class, EtcdV3Client.class).stream();
}

@ParameterizedTest
@MethodSource("clients")
public <T extends EtcdClient> void testPutGet(Class<T> clientClass) throws EtcdClientException {
runTest(clientClass, etcdClient -> {
@Test
public void testPutGet() {
runTest(etcdClient -> {
etcdClient.put("key", "value");
String result = etcdClient.get("key");
assertThat(result, is("value"));

});
}

@ParameterizedTest
@MethodSource("clients")
public <T extends EtcdClient> void testGetNonExistingKey(Class<T> clientClass) throws EtcdClientException {
runTest(clientClass, etcdClient -> {

@Test
public void testGetNonExistingKey() {
runTest(etcdClient -> {
String result = etcdClient.get("non-existing-key");
assertThat(result, nullValue());
});
}

@ParameterizedTest
@MethodSource("clients")
public <T extends EtcdClient> void testWatchNewKey(Class<T> clientClass) throws EtcdClientException, ExecutionException, InterruptedException {
runTest(clientClass, etcdClient -> {
@Test
public void testWatchNewKey() {
runTest(etcdClient -> {
final String key = "key#" + new Random().nextLong();
final String finalValue = "new value";

Expand Down Expand Up @@ -115,10 +102,9 @@ public void onComplete() {
});
}

@ParameterizedTest
@MethodSource("clients")
public <T extends EtcdClient> void testWatchValueChanges(Class<T> clientClass) throws EtcdClientException, ExecutionException, InterruptedException {
runTest(clientClass, etcdClient -> {
@Test
public void testWatchValueChanges() {
runTest(etcdClient -> {
final String key = "key";

etcdClient.put(key, "any value to change (just to be sure there is not already set to the final value");
Expand Down Expand Up @@ -166,14 +152,14 @@ public void onComplete() {
*/
@FunctionalInterface
private interface EtcdClientConsumer {
public void accept(EtcdClient t) throws EtcdClientException, InterruptedException;
void accept(EtcdClient t) throws EtcdClientException, InterruptedException;
}

private <T extends EtcdClient> void runTest(
Class<T> clientClass,
EtcdClientConsumer test) throws EtcdClientException {
private <T extends EtcdClient> void runTest(EtcdClientConsumer test) {
try {
try (EtcdClient etcdClient = clientClass.getDeclaredConstructor(URI.class).newInstance(uri)) {
URI[] uris = new URI[] {uri};
try (EtcdClient etcdClient = EtcdV3Client.class.getDeclaredConstructor(URI[].class)
.newInstance(new Object[] {uris})) {
test.accept(etcdClient);
} catch (EtcdClientException ex) {
fail(ex);
Expand Down

0 comments on commit 30f1bf6

Please sign in to comment.