From 30f1bf66ffb8252dedf042962e4b6b6ba4e27955 Mon Sep 17 00:00:00 2001 From: Santiago Pericas-Geertsen Date: Wed, 22 Jan 2025 15:39:00 -0500 Subject: [PATCH] Use Helidon gRPC client in Config Etcd module (#9674) * Switches from Netty to our gRPC client. * Updates problematic IT tests and constrains its scope to run only with V3 servers. V2 support is deprecated and marked for removal. --------- Signed-off-by: Santiago Pericas-Geertsen --- config/etcd/pom.xml | 25 ++------- .../etcd/internal/client/v2/EtcdV2Client.java | 4 +- .../etcd/internal/client/v3/EtcdV3Client.java | 33 ++++-------- config/etcd/src/main/java/module-info.java | 3 +- .../config/etcd/EtcdConfigSourceIT.java | 45 +++++++--------- .../config/etcd/client/EtcdClientIT.java | 54 +++++++------------ 6 files changed, 58 insertions(+), 106 deletions(-) diff --git a/config/etcd/pom.xml b/config/etcd/pom.xml index b9fc6b1fc09..e22ba00719d 100644 --- a/config/etcd/pom.xml +++ b/config/etcd/pom.xml @@ -49,7 +49,10 @@ io.helidon.common helidon-common-media-type - + + io.helidon.webclient + helidon-webclient-grpc + org.mousio @@ -60,11 +63,6 @@ io.grpc grpc-api - - io.grpc - grpc-netty - runtime - io.grpc grpc-protobuf @@ -136,21 +134,6 @@ - - org.apache.maven.plugins - maven-surefire-plugin - - - - - io.netty.eventLoopThreads - 2 - - - - org.codehaus.mojo build-helper-maven-plugin diff --git a/config/etcd/src/main/java/io/helidon/config/etcd/internal/client/v2/EtcdV2Client.java b/config/etcd/src/main/java/io/helidon/config/etcd/internal/client/v2/EtcdV2Client.java index a56f474cd09..674fe79defa 100644 --- a/config/etcd/src/main/java/io/helidon/config/etcd/internal/client/v2/EtcdV2Client.java +++ b/config/etcd/src/main/java/io/helidon/config/etcd/internal/client/v2/EtcdV2Client.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017, 2024 Oracle and/or its affiliates. + * Copyright (c) 2017, 2025 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -55,7 +55,7 @@ public class EtcdV2Client implements EtcdClient { * * @deprecated */ - EtcdV2Client(URI... uris) { + public EtcdV2Client(URI... uris) { etcd = new mousio.etcd4j.EtcdClient(uris); etcd.setRetryHandler(new RetryWithTimeout(100, 2000)); } diff --git a/config/etcd/src/main/java/io/helidon/config/etcd/internal/client/v3/EtcdV3Client.java b/config/etcd/src/main/java/io/helidon/config/etcd/internal/client/v3/EtcdV3Client.java index 87ad55feeac..5b190a6e345 100644 --- a/config/etcd/src/main/java/io/helidon/config/etcd/internal/client/v3/EtcdV3Client.java +++ b/config/etcd/src/main/java/io/helidon/config/etcd/internal/client/v3/EtcdV3Client.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017, 2024 Oracle and/or its affiliates. + * Copyright (c) 2017, 2025 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,8 +22,8 @@ import java.util.concurrent.Executor; import java.util.concurrent.Flow; import java.util.concurrent.SubmissionPublisher; -import java.util.concurrent.TimeUnit; +import io.helidon.common.tls.Tls; import io.helidon.config.etcd.internal.client.EtcdClient; import io.helidon.config.etcd.internal.client.EtcdClientException; import io.helidon.config.etcd.internal.client.proto.KVGrpc; @@ -34,10 +34,9 @@ import io.helidon.config.etcd.internal.client.proto.WatchGrpc; import io.helidon.config.etcd.internal.client.proto.WatchRequest; import io.helidon.config.etcd.internal.client.proto.WatchResponse; +import io.helidon.webclient.grpc.GrpcClient; import com.google.protobuf.ByteString; -import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; @@ -45,12 +44,10 @@ * Etcd API v3 client. */ public class EtcdV3Client implements EtcdClient { - - private static final System.Logger LOGGER = System.getLogger(EtcdV3Client.class.getName()); + private static final Tls DISABLE_TLS = Tls.builder().enabled(false).build(); private final Map> publishers = new ConcurrentHashMap<>(); - private final ManagedChannel channel; private final KVGrpc.KVBlockingStub kvStub; private final WatchGrpc.WatchStub watchStub; @@ -64,11 +61,12 @@ public EtcdV3Client(URI... uris) { throw new IllegalArgumentException("EtcdV3Client only supports a single URI"); } URI uri = uris[0]; - ManagedChannelBuilder mcb = ManagedChannelBuilder.forAddress(uri.getHost(), uri.getPort()); - this.channel = mcb.usePlaintext().build(); - - kvStub = KVGrpc.newBlockingStub(channel); - watchStub = WatchGrpc.newStub(channel); + GrpcClient grpcClient = GrpcClient.builder() + .baseUri(uri) + .tls(DISABLE_TLS) // must explicitly disable it + .build(); + kvStub = KVGrpc.newBlockingStub(grpcClient.channel()); + watchStub = WatchGrpc.newStub(grpcClient.channel()); } @Override @@ -140,16 +138,7 @@ public Flow.Publisher watch(String key) throws EtcdClientException { } @Override - public void close() throws EtcdClientException { + public void close() { publishers.values().forEach(SubmissionPublisher::close); - if (!channel.isShutdown() && !channel.isTerminated()) { - try { - channel.awaitTermination(1, TimeUnit.SECONDS); - } catch (InterruptedException e) { - LOGGER.log(System.Logger.Level.INFO, "Error closing gRPC channel, reason: " + e.getLocalizedMessage(), e); - } finally { - channel.shutdown(); - } - } } } diff --git a/config/etcd/src/main/java/module-info.java b/config/etcd/src/main/java/module-info.java index 3bb68a6a73e..1e5da65b870 100644 --- a/config/etcd/src/main/java/module-info.java +++ b/config/etcd/src/main/java/module-info.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017, 2024 Oracle and/or its affiliates. + * Copyright (c) 2017, 2025 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,6 +29,7 @@ requires io.grpc.stub; requires io.helidon.common.media.type; requires io.helidon.common; + requires io.helidon.webclient.grpc; requires static java.annotation; diff --git a/config/etcd/src/test/java/io/helidon/config/etcd/EtcdConfigSourceIT.java b/config/etcd/src/test/java/io/helidon/config/etcd/EtcdConfigSourceIT.java index 59c944136ea..ff9a4da0c70 100644 --- a/config/etcd/src/test/java/io/helidon/config/etcd/EtcdConfigSourceIT.java +++ b/config/etcd/src/test/java/io/helidon/config/etcd/EtcdConfigSourceIT.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017, 2022 Oracle and/or its affiliates. + * Copyright (c) 2017, 2025 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,29 +29,27 @@ import io.helidon.config.etcd.internal.client.EtcdClient; import io.helidon.config.hocon.HoconConfigParser; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.api.Test; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.core.Is.is; /** - * Tests {@link EtcdConfigSource} with both version, {@link EtcdApi#v2} and {@link EtcdApi#v3}. + * Integration test for {@link EtcdConfigSource} using {@link EtcdApi#v3}. */ public class EtcdConfigSourceIT { private static final URI DEFAULT_URI = URI.create("http://localhost:2379"); - @ParameterizedTest - @EnumSource(EtcdApi.class) - public void testConfig(EtcdApi version) throws Exception { - putConfiguration(version, "/application.conf"); + @Test + public void testConfig() throws Exception { + putConfiguration("/application.conf"); Config config = Config.builder() .sources(EtcdConfigSource.builder() .uri(DEFAULT_URI) .key("configuration") - .api(version) + .api(EtcdApi.v3) .mediaType(MediaTypes.APPLICATION_HOCON) .build()) .addParser(HoconConfigParser.create()) @@ -60,15 +58,14 @@ public void testConfig(EtcdApi version) throws Exception { assertThat(config.get("security").asNodeList().get(), hasSize(1)); } - @ParameterizedTest - @EnumSource(EtcdApi.class) - public void testConfigChanges(EtcdApi version) throws Exception { - putConfiguration(version, "/application.conf"); + @Test + public void testConfigChanges() throws Exception { + putConfiguration("/application.conf"); Config config = Config.builder() .sources(EtcdConfigSource.builder() .uri(DEFAULT_URI) .key("configuration") - .api(version) + .api(EtcdApi.v3) .mediaType(MediaTypes.APPLICATION_HOCON) .changeWatcher(EtcdWatcher.create()) .build()) @@ -77,27 +74,23 @@ public void testConfigChanges(EtcdApi version) throws Exception { assertThat(config.get("security").asNodeList().get(), hasSize(1)); - CountDownLatch initLatch = new CountDownLatch(1); CountDownLatch nextLatch = new CountDownLatch(3); + config.onChange(it -> nextLatch.countDown()); - config.onChange(it -> initLatch.countDown()); - - assertThat(initLatch.await(1, TimeUnit.SECONDS), is(true)); - - putConfiguration(version, "/application2.conf"); + putConfiguration("/application2.conf"); TimeUnit.MILLISECONDS.sleep(10); - putConfiguration(version, "/application3.conf"); + putConfiguration("/application3.conf"); TimeUnit.MILLISECONDS.sleep(10); - putConfiguration(version, "/application4.conf"); + putConfiguration("/application4.conf"); assertThat(nextLatch.await(20, TimeUnit.SECONDS), is(true)); } - private static void putConfiguration(EtcdApi version, String resourcePath) throws Exception { - EtcdClient etcd = version.clientFactory().createClient(DEFAULT_URI); - + private static void putConfiguration(String resourcePath) throws Exception { + EtcdClient etcd = EtcdApi.v3.clientFactory().createClient(DEFAULT_URI); File file = new File(EtcdConfigSourceIT.class.getResource(resourcePath).getFile()); - etcd.put("configuration", String.join("\n", Files.readAllLines(file.toPath(), Charset.defaultCharset()))); + etcd.put("configuration", String.join("\n", + Files.readAllLines(file.toPath(), Charset.defaultCharset()))); etcd.close(); } } diff --git a/config/etcd/src/test/java/io/helidon/config/etcd/client/EtcdClientIT.java b/config/etcd/src/test/java/io/helidon/config/etcd/client/EtcdClientIT.java index ac0433ea786..a716c9b8d03 100644 --- a/config/etcd/src/test/java/io/helidon/config/etcd/client/EtcdClientIT.java +++ b/config/etcd/src/test/java/io/helidon/config/etcd/client/EtcdClientIT.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017, 2021 Oracle and/or its affiliates. + * Copyright (c) 2017, 2025 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,22 +18,17 @@ import java.lang.reflect.InvocationTargetException; import java.net.URI; -import java.util.List; import java.util.Random; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Flow; import java.util.concurrent.TimeUnit; -import java.util.stream.Stream; import io.helidon.config.etcd.internal.client.EtcdClient; import io.helidon.config.etcd.internal.client.EtcdClientException; -import io.helidon.config.etcd.internal.client.v2.EtcdV2Client; import io.helidon.config.etcd.internal.client.v3.EtcdV3Client; import org.hamcrest.core.Is; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.api.Test; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.nullValue; @@ -47,34 +42,26 @@ public class EtcdClientIT { private static final URI uri = URI.create("http://localhost:2379"); - private static Stream> clients() { - return List.of(EtcdV2Client.class, EtcdV3Client.class).stream(); - } - - @ParameterizedTest - @MethodSource("clients") - public void testPutGet(Class clientClass) throws EtcdClientException { - runTest(clientClass, etcdClient -> { + @Test + public void testPutGet() { + runTest(etcdClient -> { etcdClient.put("key", "value"); String result = etcdClient.get("key"); assertThat(result, is("value")); - }); } - - @ParameterizedTest - @MethodSource("clients") - public void testGetNonExistingKey(Class clientClass) throws EtcdClientException { - runTest(clientClass, etcdClient -> { + + @Test + public void testGetNonExistingKey() { + runTest(etcdClient -> { String result = etcdClient.get("non-existing-key"); assertThat(result, nullValue()); }); } - @ParameterizedTest - @MethodSource("clients") - public void testWatchNewKey(Class clientClass) throws EtcdClientException, ExecutionException, InterruptedException { - runTest(clientClass, etcdClient -> { + @Test + public void testWatchNewKey() { + runTest(etcdClient -> { final String key = "key#" + new Random().nextLong(); final String finalValue = "new value"; @@ -115,10 +102,9 @@ public void onComplete() { }); } - @ParameterizedTest - @MethodSource("clients") - public void testWatchValueChanges(Class clientClass) throws EtcdClientException, ExecutionException, InterruptedException { - runTest(clientClass, etcdClient -> { + @Test + public void testWatchValueChanges() { + runTest(etcdClient -> { final String key = "key"; etcdClient.put(key, "any value to change (just to be sure there is not already set to the final value"); @@ -166,14 +152,14 @@ public void onComplete() { */ @FunctionalInterface private interface EtcdClientConsumer { - public void accept(EtcdClient t) throws EtcdClientException, InterruptedException; + void accept(EtcdClient t) throws EtcdClientException, InterruptedException; } - private void runTest( - Class clientClass, - EtcdClientConsumer test) throws EtcdClientException { + private void runTest(EtcdClientConsumer test) { try { - try (EtcdClient etcdClient = clientClass.getDeclaredConstructor(URI.class).newInstance(uri)) { + URI[] uris = new URI[] {uri}; + try (EtcdClient etcdClient = EtcdV3Client.class.getDeclaredConstructor(URI[].class) + .newInstance(new Object[] {uris})) { test.accept(etcdClient); } catch (EtcdClientException ex) { fail(ex);