diff --git a/.github/workflows/test-unit.yml b/.github/workflows/test-unit.yml index 90c8693157..0b6c4c71ff 100644 --- a/.github/workflows/test-unit.yml +++ b/.github/workflows/test-unit.yml @@ -22,6 +22,14 @@ jobs: - name: Run Unit Test run: ./gradlew clean unitTest + + - name: Upload Reports + if: failure() + uses: actions/upload-artifact@v3 + with: + name: test-reports-java${{ matrix.java }}-${{ runner.os }} + path: java-client/build/reports/ + retention-days: 7 test-java8: runs-on: ${{ matrix.os }} @@ -48,3 +56,11 @@ jobs: - name: Run Unit Test run: ./gradlew clean unitTest -D"runtime.java=8" + + - name: Upload Reports + if: failure() + uses: actions/upload-artifact@v3 + with: + name: test-reports-java8-${{ runner.os }} + path: java-client/build/reports/ + retention-days: 7 \ No newline at end of file diff --git a/java-client/build.gradle.kts b/java-client/build.gradle.kts index 83bfe938e8..bf809ce8ca 100644 --- a/java-client/build.gradle.kts +++ b/java-client/build.gradle.kts @@ -97,10 +97,12 @@ sourceSets { } tasks.withType { - expand( - "version" to version, - "git_revision" to (if (rootProject.extra.has("gitHashFull")) rootProject.extra["gitHashFull"] else "unknown") - ) + filesMatching("**/*.properties") { + expand( + "version" to version, + "git_revision" to (if (rootProject.extra.has("gitHashFull")) rootProject.extra["gitHashFull"] else "unknown") + ) + } } tasks.withType().configureEach{ @@ -137,6 +139,27 @@ tasks.build { dependsOn("spotlessJavaCheck") } +tasks.compileTestJava { + options.compilerArgs.add("-XDenableSunApiLintControl") + if (runtimeJavaVersion >= JavaVersion.VERSION_1_9) { + options.compilerArgs.addAll( + listOf( + "--add-exports=java.base/sun.security.util=ALL-UNNAMED", + "--add-exports=java.base/sun.security.x509=ALL-UNNAMED" + ) + ) + } +} + +tasks.withType { + if (runtimeJavaVersion >= JavaVersion.VERSION_1_9) { + jvmArgs( + "--add-exports=java.base/sun.security.util=ALL-UNNAMED", + "--add-exports=java.base/sun.security.x509=ALL-UNNAMED" + ) + } +} + tasks.test { systemProperty("tests.security.manager", "false") @@ -366,9 +389,6 @@ if (runtimeJavaVersion >= JavaVersion.VERSION_11) { testImplementation("org.opensearch.test", "framework", opensearchVersion) { exclude(group = "org.hamcrest") } - - // Apache 2.0 - testImplementation("org.wiremock", "wiremock", "3.9.2") } tasks.named("compileJava11Java") { @@ -381,12 +401,7 @@ if (runtimeJavaVersion >= JavaVersion.VERSION_11) { sourceCompatibility = JavaVersion.VERSION_11.toString() } - tasks.named("integrationTest") { - testClassesDirs += java11.output.classesDirs - classpath = sourceSets["java11"].runtimeClasspath - } - - tasks.named("unitTest") { + tasks.withType { testClassesDirs += java11.output.classesDirs classpath = sourceSets["java11"].runtimeClasspath } diff --git a/java-client/src/main/java/org/opensearch/client/opensearch/core/pit/DeletePitRequest.java b/java-client/src/main/java/org/opensearch/client/opensearch/core/pit/DeletePitRequest.java index 6655628740..0f9f2b6c3d 100644 --- a/java-client/src/main/java/org/opensearch/client/opensearch/core/pit/DeletePitRequest.java +++ b/java-client/src/main/java/org/opensearch/client/opensearch/core/pit/DeletePitRequest.java @@ -11,7 +11,7 @@ import jakarta.json.stream.JsonGenerator; import java.util.List; import java.util.function.Function; -import javax.annotation.Nullable; +import javax.annotation.Nonnull; import org.opensearch.client.json.JsonpMapper; import org.opensearch.client.json.PlainJsonSerializable; import org.opensearch.client.opensearch._types.ErrorResponse; @@ -28,11 +28,11 @@ */ public class DeletePitRequest extends RequestBase implements PlainJsonSerializable { - @Nullable - private List pitId; + @Nonnull + private final List pitId; public DeletePitRequest(Builder builder) { - this.pitId = builder.pitId; + this.pitId = ApiTypeHelper.unmodifiable(builder.pitId); } public static DeletePitRequest of(Function> fn) { @@ -44,7 +44,7 @@ public static DeletePitRequest of(Function * API name - {@code pit_id} */ - @Nullable + @Nonnull public final List pitId() { return this.pitId; } @@ -78,13 +78,18 @@ protected void serializeInternal(JsonGenerator generator, JsonpMapper mapper) { public static class Builder extends ObjectBuilderBase implements ObjectBuilder { private List pitId; + public final Builder pitId(@Nonnull String value, @Nonnull String... values) { + this.pitId = _listAdd(this.pitId, value, values); + return this; + } + /** * A list of Pit IDs to be deleted *

* API name - {@code pit_id} */ - public final Builder pitId(@Nullable List pitId) { - this.pitId = pitId; + public final Builder pitId(@Nonnull List pitId) { + this.pitId = _listAddAll(this.pitId, pitId); return this; } diff --git a/java-client/src/main/java/org/opensearch/client/transport/aws/AwsSdk2Transport.java b/java-client/src/main/java/org/opensearch/client/transport/aws/AwsSdk2Transport.java index 83e27297ce..94e0197e0c 100644 --- a/java-client/src/main/java/org/opensearch/client/transport/aws/AwsSdk2Transport.java +++ b/java-client/src/main/java/org/opensearch/client/transport/aws/AwsSdk2Transport.java @@ -87,7 +87,6 @@ public class AwsSdk2Transport implements OpenSearchTransport { private final String signingServiceName; private final Region signingRegion; private final JsonpMapper defaultMapper; - @Nonnull private final AwsSdk2TransportOptions transportOptions; /** diff --git a/java-client/src/test/java11/org/opensearch/client/transport/aws/AwsSdk2TransportTests.java b/java-client/src/test/java/org/opensearch/client/transport/aws/AwsSdk2TransportTests.java similarity index 51% rename from java-client/src/test/java11/org/opensearch/client/transport/aws/AwsSdk2TransportTests.java rename to java-client/src/test/java/org/opensearch/client/transport/aws/AwsSdk2TransportTests.java index cc19d77b7d..469d383aa6 100644 --- a/java-client/src/test/java11/org/opensearch/client/transport/aws/AwsSdk2TransportTests.java +++ b/java-client/src/test/java/org/opensearch/client/transport/aws/AwsSdk2TransportTests.java @@ -8,39 +8,50 @@ package org.opensearch.client.transport.aws; -import static com.github.tomakehurst.wiremock.client.WireMock.any; -import static com.github.tomakehurst.wiremock.client.WireMock.anyUrl; -import static com.github.tomakehurst.wiremock.client.WireMock.delete; -import static com.github.tomakehurst.wiremock.client.WireMock.okJson; -import static com.github.tomakehurst.wiremock.client.WireMock.put; -import static com.github.tomakehurst.wiremock.client.WireMock.serviceUnavailable; -import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; -import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo; -import static com.github.tomakehurst.wiremock.client.WireMock.urlPathTemplate; -import static com.github.tomakehurst.wiremock.common.ContentTypes.APPLICATION_JSON; -import static com.github.tomakehurst.wiremock.common.ContentTypes.CONTENT_LENGTH; -import static com.github.tomakehurst.wiremock.common.ContentTypes.CONTENT_TYPE; -import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig; +import static org.apache.hc.core5.http.ContentType.APPLICATION_JSON; +import static org.apache.hc.core5.http.HttpHeaders.CONTENT_LENGTH; +import static org.apache.hc.core5.http.HttpHeaders.CONTENT_TYPE; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; -import com.github.tomakehurst.wiremock.http.RequestMethod; -import com.github.tomakehurst.wiremock.junit.WireMockRule; -import com.github.tomakehurst.wiremock.verification.LoggedRequest; +import java.net.InetSocketAddress; import java.net.URI; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.time.Clock; import java.time.Instant; import java.time.ZoneId; import java.util.Arrays; import java.util.Collection; -import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.stream.Collectors; +import javax.net.ssl.SSLContext; +import org.apache.hc.core5.function.Supplier; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.ProtocolException; +import org.apache.hc.core5.http.URIScheme; +import org.apache.hc.core5.http.impl.BasicEntityDetails; +import org.apache.hc.core5.http.impl.bootstrap.AsyncServerBootstrap; +import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer; +import org.apache.hc.core5.http.impl.routing.RequestRouter; +import org.apache.hc.core5.http.message.BasicHttpResponse; +import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler; +import org.apache.hc.core5.http.nio.ssl.BasicClientTlsStrategy; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.reactive.ReactiveServerExchangeHandler; +import org.junit.After; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.client.opensearch.indices.CreateIndexResponse; +import org.opensearch.client.transport.util.FunnellingHttpsProxy; +import org.opensearch.client.transport.util.GeneratedCertificateSSLContext; +import reactor.core.publisher.Flux; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.http.SdkHttpClient; import software.amazon.awssdk.http.SdkHttpConfigurationOption; @@ -49,15 +60,28 @@ import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient; import software.amazon.awssdk.http.crt.AwsCrtHttpClient; import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; +import software.amazon.awssdk.http.nio.netty.ProxyConfiguration; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.utils.AttributeMap; @RunWith(Parameterized.class) public class AwsSdk2TransportTests { private static final Region TEST_REGION = Region.AP_SOUTHEAST_2; + private static final String TEST_INDEX = "sample-index1"; + private static final SSLContext SSL_CONTEXT; + + static { + try { + SSL_CONTEXT = GeneratedCertificateSSLContext.generate(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } - @Rule - public final WireMockRule wireMockRule = new WireMockRule(wireMockConfig().dynamicPort().enableBrowserProxying(true)); + private HttpAsyncServer server; + private FunnellingHttpsProxy proxy; + + private final ConcurrentLinkedQueue receivedRequests = new ConcurrentLinkedQueue<>(); private final SdkHttpClientType sdkHttpClientType; private final String serviceName; private final String serviceHostName; @@ -70,9 +94,9 @@ public AwsSdk2TransportTests(SdkHttpClientType sdkHttpClientType, String service @Parameterized.Parameters(name = "sdkHttpClientType: {0}, serviceName: {1}") public static Collection getParameters() { - var serviceNames = List.of("aoss", "es", "arbitrary"); + String[] serviceNames = new String[] { "aoss", "es", "arbitrary" }; return Arrays.stream(SdkHttpClientType.values()) - .flatMap(sdkHttpClientType -> serviceNames.stream().map(serviceName -> new Object[] { sdkHttpClientType, serviceName })) + .flatMap(sdkHttpClientType -> Arrays.stream(serviceNames).map(serviceName -> new Object[] { sdkHttpClientType, serviceName })) .collect(Collectors.toList()); } @@ -84,27 +108,67 @@ public enum SdkHttpClientType { } @Before - public void setup() { - stubFor(any(anyUrl()).atPriority(10).willReturn(serviceUnavailable())); - - stubFor( - put(urlPathTemplate("/{index}")).atPriority(1) - .willReturn( - okJson("{\"acknowledged\": true,\"shards_acknowledged\": true,\"index\": \"{{request.path.index}}\"}").withTransformers( - "response-template" + public void setup() throws Exception { + server = AsyncServerBootstrap.bootstrap() + .setRequestRouter( + RequestRouter.>builder() + .addRoute( + RequestRouter.LOCAL_AUTHORITY, + "/" + TEST_INDEX, + hardcodedJsonHandler( + "PUT", + "{\"acknowledged\": true,\"shards_acknowledged\": true,\"index\": \"" + TEST_INDEX + "\"}" + ) ) - ) - ); - - stubFor(delete(urlPathEqualTo("/_search/scroll")).atPriority(1).willReturn(okJson("{\"succeeded\": true,\"num_freed\": 1}"))); + .addRoute( + RequestRouter.LOCAL_AUTHORITY, + "/_search/scroll", + hardcodedJsonHandler("DELETE", "{\"succeeded\": true,\"num_freed\": 1}") + ) + .addRoute( + RequestRouter.LOCAL_AUTHORITY, + "/_search/point_in_time", + hardcodedJsonHandler("DELETE", "{\"pits\": [{\"pit_id\": \"pit1\", \"successful\": true}]}") + ) + .resolveAuthority(RequestRouter.LOCAL_AUTHORITY_RESOLVER) + .build() + ) + .setTlsStrategy(new BasicClientTlsStrategy(SSL_CONTEXT)) + .create(); + server.start(); + InetSocketAddress serverAddress = (InetSocketAddress) server.listen(new InetSocketAddress(0), URIScheme.HTTPS).get().getAddress(); + proxy = new FunnellingHttpsProxy(serverAddress.getPort()); + } - stubFor( - delete(urlPathEqualTo("/_search/point_in_time")).atPriority(1) - .willReturn(okJson("{\"pits\": [{\"pit_id\": \"pit1\", \"successful\": true}]}")) + private Supplier hardcodedJsonHandler(String method, String json) { + byte[] jsonBytes = json.getBytes(StandardCharsets.UTF_8); + return () -> new ReactiveServerExchangeHandler( + (request, entityDetails, responseChannel, context, requestBody, responseBodyFuture) -> { + receivedRequests.add(request); + if (!request.getMethod().equals(method)) { + responseChannel.sendResponse(new BasicHttpResponse(405), null, context); + return; + } + responseChannel.sendResponse( + new BasicHttpResponse(200), + new BasicEntityDetails(jsonBytes.length, APPLICATION_JSON), + context + ); + responseBodyFuture.execute(Flux.just(ByteBuffer.wrap(jsonBytes))); + } ); } - private OpenSearchClient getTestClient() throws Exception { + @After + public void teardown() { + server.close(CloseMode.IMMEDIATE); + server = null; + proxy.close(); + proxy = null; + receivedRequests.clear(); + } + + private OpenSearchClient getTestClient() throws URISyntaxException { AwsSdk2TransportOptions options = AwsSdk2TransportOptions.builder() .setCredentials(() -> AwsBasicCredentials.builder().accessKeyId("test-access-key").secretAccessKey("test-secret-key").build()) .setSigningClock(Clock.fixed(Instant.ofEpochSecond(1673626117), ZoneId.of("UTC"))) // 2023-01-13 16:08:37 +0000 @@ -118,25 +182,26 @@ private OpenSearchClient getTestClient() throws Exception { switch (sdkHttpClientType) { case AWS_CRT: sdkHttpClient = AwsCrtHttpClient.builder() - .proxyConfiguration(p -> p.scheme("http").host("localhost").port(wireMockRule.port())) + .proxyConfiguration(p -> p.scheme("http").host("localhost").port(proxy.getPort())) .buildWithDefaults(sdkConfig); break; case AWS_CRT_ASYNC: sdkAsyncHttpClient = AwsCrtAsyncHttpClient.builder() - .proxyConfiguration(p -> p.scheme("http").host("localhost").port(wireMockRule.port())) + .proxyConfiguration(p -> p.scheme("http").host("localhost").port(proxy.getPort())) .buildWithDefaults(sdkConfig); break; case APACHE: - var proxyConfig = software.amazon.awssdk.http.apache.ProxyConfiguration.builder() - .endpoint(new URI("http://localhost:" + wireMockRule.port())) + software.amazon.awssdk.http.apache.ProxyConfiguration proxyConfig = software.amazon.awssdk.http.apache.ProxyConfiguration + .builder() + .endpoint(new URI("http://localhost:" + proxy.getPort())) .build(); sdkHttpClient = ApacheHttpClient.builder().proxyConfiguration(proxyConfig).buildWithDefaults(sdkConfig); break; case NETTY_NIO_ASYNC: - var nettyProxyConfig = software.amazon.awssdk.http.nio.netty.ProxyConfiguration.builder() + ProxyConfiguration nettyProxyConfig = software.amazon.awssdk.http.nio.netty.ProxyConfiguration.builder() .scheme("http") .host("localhost") - .port(wireMockRule.port()) + .port(proxy.getPort()) .build(); sdkAsyncHttpClient = NettyNioAsyncHttpClient.builder().proxyConfiguration(nettyProxyConfig).buildWithDefaults(sdkConfig); break; @@ -153,12 +218,6 @@ private OpenSearchClient getTestClient() throws Exception { return new OpenSearchClient(transport); } - private LoggedRequest getReceivedRequest() { - var serveEvents = wireMockRule.getAllServeEvents(); - assertEquals(1, serveEvents.size()); - return serveEvents.get(0).getRequest(); - } - @Test public void testSigV4PutIndex() throws Exception { String expectedSignature = null; @@ -176,7 +235,7 @@ public void testSigV4PutIndex() throws Exception { OpenSearchClient client = getTestClient(); - var resp = client.indices() + CreateIndexResponse resp = client.indices() .create( b -> b.index("sample-index1") .aliases("sample-alias1", a -> a) @@ -186,21 +245,7 @@ public void testSigV4PutIndex() throws Exception { assertEquals("sample-index1", resp.index()); assertEquals(Boolean.TRUE, resp.acknowledged()); - var req = getReceivedRequest(); - - assertEquals(RequestMethod.PUT, req.getMethod()); - assertEquals(APPLICATION_JSON, req.getHeader(CONTENT_TYPE)); - assertEquals("156", req.getHeader(CONTENT_LENGTH)); - assertEquals(serviceHostName, req.getHeader("Host")); - assertEquals("20230113T160837Z", req.getHeader("x-amz-date")); - assertEquals("381bb92a04d397cab611362eb3ac3e075db11ac08272d64763de2279e2b5604d", req.getHeader("x-amz-content-sha256")); - assertEquals( - "AWS4-HMAC-SHA256 Credential=test-access-key/20230113/ap-southeast-2/" - + serviceName - + "/aws4_request, SignedHeaders=content-type;host;x-amz-content-sha256;x-amz-date, Signature=" - + expectedSignature, - req.getHeader("Authorization") - ); + assertSigV4Request("PUT", 156, "381bb92a04d397cab611362eb3ac3e075db11ac08272d64763de2279e2b5604d", expectedSignature); } @Test @@ -222,27 +267,7 @@ public void testSigV4ClearScroll() throws Exception { client.clearScroll(); - var req = getReceivedRequest(); - - assertEquals(RequestMethod.DELETE, req.getMethod()); - assertEquals(APPLICATION_JSON, req.getHeader(CONTENT_TYPE)); - var contentLength = req.getHeader(CONTENT_LENGTH); - if (sdkHttpClientType != SdkHttpClientType.APACHE) { - assertEquals("2", contentLength); - } else { - // Apache client does not set content-length for DELETE requests - assertNull(contentLength); - } - assertEquals(serviceHostName, req.getHeader("Host")); - assertEquals("20230113T160837Z", req.getHeader("x-amz-date")); - assertEquals("44136fa355b3678a1146ad16f7e8649e94fb4fc21fe77e8310c060f61caaff8a", req.getHeader("x-amz-content-sha256")); - assertEquals( - "AWS4-HMAC-SHA256 Credential=test-access-key/20230113/ap-southeast-2/" - + serviceName - + "/aws4_request, SignedHeaders=content-type;host;x-amz-content-sha256;x-amz-date, Signature=" - + expectedSignature, - req.getHeader("Authorization") - ); + assertSigV4Request("DELETE", 2, "44136fa355b3678a1146ad16f7e8649e94fb4fc21fe77e8310c060f61caaff8a", expectedSignature); } @Test @@ -262,28 +287,35 @@ public void testSigV4DeletePit() throws Exception { OpenSearchClient client = getTestClient(); - client.deletePit(d -> d.pitId(List.of("pit1"))); + client.deletePit(d -> d.pitId("pit1")); - var req = getReceivedRequest(); + assertSigV4Request("DELETE", 19, "daaa6af55a9cfe622f46de69ebc3b4df84703f320b839346b7fb4cf94bdbd766", expectedSignature); + } - assertEquals(RequestMethod.DELETE, req.getMethod()); - assertEquals(APPLICATION_JSON, req.getHeader(CONTENT_TYPE)); - var contentLength = req.getHeader(CONTENT_LENGTH); - if (sdkHttpClientType != SdkHttpClientType.APACHE) { - assertEquals("19", contentLength); + private void assertSigV4Request(String method, int contentLength, String contentSha256, String expectedSignature) + throws ProtocolException { + assertEquals(1, receivedRequests.size()); + HttpRequest req = receivedRequests.poll(); + assertNotNull(req); + + assertEquals(method, req.getMethod()); + assertEquals(APPLICATION_JSON.getMimeType(), req.getHeader(CONTENT_TYPE).getValue()); + Header contentLengthHdr = req.getHeader(CONTENT_LENGTH); + if (sdkHttpClientType != SdkHttpClientType.APACHE || !"DELETE".equals(method)) { + assertEquals(String.valueOf(contentLength), contentLengthHdr.getValue()); } else { // Apache client does not set content-length for DELETE requests - assertNull(contentLength); + assertNull(contentLengthHdr); } - assertEquals(serviceHostName, req.getHeader("Host")); - assertEquals("20230113T160837Z", req.getHeader("x-amz-date")); - assertEquals("daaa6af55a9cfe622f46de69ebc3b4df84703f320b839346b7fb4cf94bdbd766", req.getHeader("x-amz-content-sha256")); + assertEquals(serviceHostName, req.getHeader("Host").getValue()); + assertEquals("20230113T160837Z", req.getHeader("x-amz-date").getValue()); + assertEquals(contentSha256, req.getHeader("x-amz-content-sha256").getValue()); assertEquals( "AWS4-HMAC-SHA256 Credential=test-access-key/20230113/ap-southeast-2/" + serviceName + "/aws4_request, SignedHeaders=content-type;host;x-amz-content-sha256;x-amz-date, Signature=" + expectedSignature, - req.getHeader("Authorization") + req.getHeader("Authorization").getValue() ); } } diff --git a/java-client/src/test/java/org/opensearch/client/transport/util/FunnellingHttpsProxy.java b/java-client/src/test/java/org/opensearch/client/transport/util/FunnellingHttpsProxy.java new file mode 100644 index 0000000000..19b62a7eb6 --- /dev/null +++ b/java-client/src/test/java/org/opensearch/client/transport/util/FunnellingHttpsProxy.java @@ -0,0 +1,176 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.transport.util; + +import static org.apache.hc.core5.http.HttpStatus.SC_METHOD_NOT_ALLOWED; +import static org.apache.hc.core5.http.HttpStatus.SC_OK; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.ArrayList; +import java.util.List; +import javax.annotation.Nonnull; +import org.apache.hc.core5.http.impl.EnglishReasonPhraseCatalog; + +public class FunnellingHttpsProxy implements Closeable { + private static final int SO_TIMEOUT = 5000; + + @Nonnull + private final ServerSocket serverSocket; + @Nonnull + private final InetSocketAddress boundAddress; + private final int redirectToPort; + @Nonnull + private final List connectionHandlers; + @Nonnull + private final List sockets; + private final Thread acceptThread; + private volatile boolean running; + + public FunnellingHttpsProxy(int redirectToPort) throws Exception { + serverSocket = new ServerSocket(0); + boundAddress = (InetSocketAddress) serverSocket.getLocalSocketAddress(); + this.redirectToPort = redirectToPort; + connectionHandlers = new ArrayList<>(); + sockets = new ArrayList<>(); + running = true; + acceptThread = new Thread(this::acceptConnections); + acceptThread.start(); + } + + public int getPort() { + return boundAddress.getPort(); + } + + @Override + public void close() { + if (!running) { + return; + } + running = false; + closeQuietly(serverSocket); + try { + acceptThread.join(); + } catch (InterruptedException ignored) {} + for (Socket socket : sockets) { + closeQuietly(socket); + } + for (Thread handler : connectionHandlers) { + try { + handler.join(); + } catch (InterruptedException ignored) {} + } + } + + private void acceptConnections() { + while (running) { + try { + Socket socket = serverSocket.accept(); + sockets.add(socket); + socket.setSoTimeout(SO_TIMEOUT); + Thread handler = new Thread(handleConnection(socket)); + connectionHandlers.add(handler); + handler.start(); + } catch (Exception ignored) {} + } + } + + private Runnable handleConnection(Socket clientSocket) { + return () -> { + InputStream clientInput = null; + OutputStream clientOutput = null; + Socket serverSocket = null; + InputStream serverInput = null; + OutputStream serverOutput = null; + + try { + clientInput = clientSocket.getInputStream(); + clientOutput = clientSocket.getOutputStream(); + + String httpRequest = readHttpMessage(clientInput); + + if (!httpRequest.startsWith("CONNECT ")) { + writeHttpStatus(clientOutput, SC_METHOD_NOT_ALLOWED); + return; + } + + serverSocket = new Socket("localhost", redirectToPort); + serverSocket.setSoTimeout(SO_TIMEOUT); + serverInput = serverSocket.getInputStream(); + serverOutput = serverSocket.getOutputStream(); + + writeHttpStatus(clientOutput, SC_OK); + + Thread serverToClient = new Thread(pipeline(serverInput, clientOutput)); + serverToClient.start(); + + pipeline(clientInput, serverOutput).run(); + + serverToClient.join(); + } catch (IOException | InterruptedException ignored) {} finally { + closeQuietly(clientInput, clientOutput, clientSocket, serverInput, serverOutput, serverSocket); + } + }; + } + + private Runnable pipeline(InputStream input, OutputStream output) { + return () -> { + byte[] buffer = new byte[4096]; + try { + int n; + while (running && -1 != (n = input.read(buffer))) { + output.write(buffer, 0, n); + if (input.available() < 1) { + output.flush(); + } + } + } catch (IOException ignored) {} + }; + } + + private static String readHttpMessage(InputStream input) throws IOException { + BufferedReader reader = new BufferedReader(new InputStreamReader(input)); + StringBuilder message = new StringBuilder(); + while (true) { + String line = reader.readLine(); + if (line == null || line.isEmpty()) { + break; + } + message.append(line).append("\r\n"); + } + return message.toString(); + } + + private static void writeHttpStatus(OutputStream output, int status) throws IOException { + BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(output)); + writer.write("HTTP/1.1 " + status + " " + EnglishReasonPhraseCatalog.INSTANCE.getReason(status, null) + "\r\n"); + writer.write("\r\n"); + writer.flush(); + } + + private static void closeQuietly(Closeable... closeables) { + if (closeables == null) return; + for (Closeable closeable : closeables) { + if (closeable != null) { + try { + closeable.close(); + } catch (IOException ignored) {} + } + } + } +} diff --git a/java-client/src/test/java/org/opensearch/client/transport/util/GeneratedCertificateSSLContext.java b/java-client/src/test/java/org/opensearch/client/transport/util/GeneratedCertificateSSLContext.java new file mode 100644 index 0000000000..1d140d93d6 --- /dev/null +++ b/java-client/src/test/java/org/opensearch/client/transport/util/GeneratedCertificateSSLContext.java @@ -0,0 +1,315 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.transport.util; + +import java.io.IOException; +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodType; +import java.security.KeyManagementException; +import java.security.KeyPair; +import java.security.KeyPairGenerator; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.PrivateKey; +import java.security.PublicKey; +import java.security.SecureRandom; +import java.security.UnrecoverableKeyException; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.time.ZonedDateTime; +import java.util.Date; +import java.util.Random; +import java.util.Vector; +import java.util.function.BiConsumer; +import java.util.function.Function; +import javax.net.ssl.SSLContext; +import org.apache.hc.core5.ssl.SSLContextBuilder; +import org.opensearch.client.util.TriConsumer; +import org.opensearch.client.util.TriFunction; +import sun.security.util.KnownOIDs; +import sun.security.util.ObjectIdentifier; +import sun.security.x509.AlgorithmId; +import sun.security.x509.AuthorityKeyIdentifierExtension; +import sun.security.x509.BasicConstraintsExtension; +import sun.security.x509.CertificateAlgorithmId; +import sun.security.x509.CertificateExtensions; +import sun.security.x509.CertificateSerialNumber; +import sun.security.x509.CertificateValidity; +import sun.security.x509.CertificateVersion; +import sun.security.x509.CertificateX509Key; +import sun.security.x509.DNSName; +import sun.security.x509.ExtendedKeyUsageExtension; +import sun.security.x509.Extension; +import sun.security.x509.GeneralName; +import sun.security.x509.GeneralNames; +import sun.security.x509.IPAddressName; +import sun.security.x509.KeyIdentifier; +import sun.security.x509.KeyUsageExtension; +import sun.security.x509.SerialNumber; +import sun.security.x509.SubjectAlternativeNameExtension; +import sun.security.x509.SubjectKeyIdentifierExtension; +import sun.security.x509.X500Name; +import sun.security.x509.X509CertImpl; +import sun.security.x509.X509CertInfo; + +@SuppressWarnings("sunapi") +public class GeneratedCertificateSSLContext { + private static final String KEY_ALGORITHM = "RSA"; + private static final String SIGNATURE_ALGORITHM = "SHA256with" + KEY_ALGORITHM; + private static final char[] KEYSTORE_PASSWORD = "password".toCharArray(); + + public static SSLContext generate() throws NoSuchAlgorithmException, IOException, CertificateException, KeyStoreException, + KeyManagementException, UnrecoverableKeyException { + KeyPair caKey = generateKeyPair(); + String caSubject = "DC=localhost, O=localhost, OU=localhost Root CA, CN=localhost Root CA"; + X509CertInfo caInfo = makeX509CertInfo( + caSubject, + caKey.getPublic(), + caSubject, + makeCertificateAuthorityExtensions(caKey.getPublic()) + ); + X509CertImpl caCert = newSigned.apply(caInfo, caKey.getPrivate(), SIGNATURE_ALGORITHM); + + KeyPair hostKey = generateKeyPair(); + String hostSubject = "DC=localhost, O=localhost, OU=localhost, CN=localhost"; + X509CertInfo hostInfo = makeX509CertInfo( + hostSubject, + hostKey.getPublic(), + caSubject, + makeHostCertificateExtensions(hostKey.getPublic(), caCert) + ); + X509CertImpl hostCert = newSigned.apply(hostInfo, caKey.getPrivate(), SIGNATURE_ALGORITHM); + + KeyStore keyStore = KeyStore.getInstance("JKS"); + keyStore.load(null, KEYSTORE_PASSWORD); + keyStore.setKeyEntry("localhost", hostKey.getPrivate(), KEYSTORE_PASSWORD, new X509Certificate[] { hostCert, caCert }); + + return SSLContextBuilder.create() + .loadKeyMaterial(keyStore, KEYSTORE_PASSWORD, (aliases, sslParameters) -> "localhost") + .loadTrustMaterial(keyStore, null) + .build(); + } + + private static CertificateExtensions makeCertificateAuthorityExtensions(PublicKey publicKey) throws IOException { + CertificateExtensions extensions = new CertificateExtensions(); + + KeyIdentifier keyId = new KeyIdentifier(publicKey); + set(extensions, AuthorityKeyIdentifierExtension.NAME, new AuthorityKeyIdentifierExtension(keyId, null, null)); + set(extensions, SubjectKeyIdentifierExtension.NAME, new SubjectKeyIdentifierExtension(keyId.getIdentifier())); + + set(extensions, BasicConstraintsExtension.NAME, new BasicConstraintsExtension(true, true, Integer.MAX_VALUE)); + + KeyUsageExtension keyUsage = new KeyUsageExtension(); + keyUsage.set(KeyUsageExtension.DIGITAL_SIGNATURE, true); + keyUsage.set(KeyUsageExtension.KEY_CERTSIGN, true); + keyUsage.set(KeyUsageExtension.CRL_SIGN, true); + set(extensions, KeyUsageExtension.NAME, keyUsage); + + return extensions; + } + + private static CertificateExtensions makeHostCertificateExtensions(PublicKey publicKey, X509Certificate caCert) throws IOException { + CertificateExtensions extensions = new CertificateExtensions(); + + set( + extensions, + AuthorityKeyIdentifierExtension.NAME, + new AuthorityKeyIdentifierExtension( + new KeyIdentifier(caCert.getPublicKey()), + new GeneralNames().add(new GeneralName(new X500Name(caCert.getSubjectX500Principal().getName()))), + new SerialNumber(caCert.getSerialNumber()) + ) + ); + + KeyIdentifier keyId = new KeyIdentifier(publicKey); + set(extensions, SubjectKeyIdentifierExtension.NAME, new SubjectKeyIdentifierExtension(keyId.getIdentifier())); + + set(extensions, BasicConstraintsExtension.NAME, new BasicConstraintsExtension(false, Integer.MAX_VALUE)); + + KeyUsageExtension keyUsage = new KeyUsageExtension(); + keyUsage.set(KeyUsageExtension.DIGITAL_SIGNATURE, true); + keyUsage.set(KeyUsageExtension.NON_REPUDIATION, true); + keyUsage.set(KeyUsageExtension.KEY_ENCIPHERMENT, true); + set(extensions, KeyUsageExtension.NAME, keyUsage); + + Vector extendedKeyUsage = new Vector(); + extendedKeyUsage.add(ObjectIdentifier.of(KnownOIDs.clientAuth)); + extendedKeyUsage.add(ObjectIdentifier.of(KnownOIDs.serverAuth)); + set(extensions, ExtendedKeyUsageExtension.NAME, new ExtendedKeyUsageExtension(true, extendedKeyUsage)); + + set( + extensions, + SubjectAlternativeNameExtension.NAME, + new SubjectAlternativeNameExtension( + new GeneralNames().add(new GeneralName(new DNSName("localhost"))).add(new GeneralName(new IPAddressName("127.0.0.1"))) + ) + ); + + return extensions; + } + + private static X509CertInfo makeX509CertInfo( + String subject, + PublicKey publicKey, + String issuer, + CertificateExtensions certificateExtensions + ) throws IOException, NoSuchAlgorithmException { + ZonedDateTime start = ZonedDateTime.now().minusDays(1); + ZonedDateTime end = start.plusDays(7); + + X509CertInfo info = new X509CertInfo(); + setVersion.accept(info, new CertificateVersion(CertificateVersion.V3)); + setSerialNumber.accept(info, new CertificateSerialNumber(new Random().nextInt() & 0x7fffffff)); + setAlgorithmId.accept(info, new CertificateAlgorithmId(AlgorithmId.get(SIGNATURE_ALGORITHM))); + setSubject.accept(info, new X500Name(subject)); + setKey.accept(info, new CertificateX509Key(publicKey)); + setValidity.accept(info, new CertificateValidity(Date.from(start.toInstant()), Date.from(end.toInstant()))); + setIssuer.accept(info, new X500Name(issuer)); + setExtensions.accept(info, certificateExtensions); + + return info; + } + + private static KeyPair generateKeyPair() throws NoSuchAlgorithmException { + KeyPairGenerator keyGen = KeyPairGenerator.getInstance(KEY_ALGORITHM); + keyGen.initialize(2048, new SecureRandom()); + return keyGen.generateKeyPair(); + } + + private static void set(CertificateExtensions extensions, String name, Extension value) { + setExtension.accept(extensions, name, value); + } + + private static final TriConsumer setExtension; + private static final BiConsumer setVersion; + private static final BiConsumer setSerialNumber; + private static final BiConsumer setAlgorithmId; + private static final BiConsumer setSubject; + private static final BiConsumer setKey; + private static final BiConsumer setValidity; + private static final BiConsumer setIssuer; + private static final BiConsumer setExtensions; + private static final TriFunction newSigned; + + static { + try { + String specificationVersion = System.getProperty("java.specification.version"); + if (specificationVersion.startsWith("1.")) { + specificationVersion = specificationVersion.substring(2); + } + + if (Integer.parseInt(specificationVersion) >= 20) { + setExtension = findVoidMethod(CertificateExtensions.class, "setExtension", String.class, Extension.class); + setVersion = findVoidMethod(X509CertInfo.class, "setVersion", CertificateVersion.class); + setSerialNumber = findVoidMethod(X509CertInfo.class, "setSerialNumber", CertificateSerialNumber.class); + setAlgorithmId = findVoidMethod(X509CertInfo.class, "setAlgorithmId", CertificateAlgorithmId.class); + setSubject = findVoidMethod(X509CertInfo.class, "setSubject", X500Name.class); + setKey = findVoidMethod(X509CertInfo.class, "setKey", CertificateX509Key.class); + setValidity = findVoidMethod(X509CertInfo.class, "setValidity", CertificateValidity.class); + setIssuer = findVoidMethod(X509CertInfo.class, "setIssuer", X500Name.class); + setExtensions = findVoidMethod(X509CertInfo.class, "setExtensions", CertificateExtensions.class); + newSigned = findStaticMethod( + X509CertImpl.class, + "newSigned", + X509CertImpl.class, + X509CertInfo.class, + PrivateKey.class, + String.class + ); + } else { + setExtension = findVoidMethod(CertificateExtensions.class, "set", String.class, Object.class)::accept; + TriConsumer setCertInfo = findVoidMethod( + X509CertInfo.class, + "set", + String.class, + Object.class + ); + setVersion = (info, version) -> setCertInfo.accept(info, X509CertInfo.VERSION, version); + setSerialNumber = (info, serialNumber) -> setCertInfo.accept(info, X509CertInfo.SERIAL_NUMBER, serialNumber); + setAlgorithmId = (info, algorithmId) -> setCertInfo.accept(info, X509CertInfo.ALGORITHM_ID, algorithmId); + setSubject = (info, subject) -> setCertInfo.accept(info, X509CertInfo.SUBJECT, subject); + setKey = (info, key) -> setCertInfo.accept(info, X509CertInfo.KEY, key); + setValidity = (info, validity) -> setCertInfo.accept(info, X509CertInfo.VALIDITY, validity); + setIssuer = (info, issuer) -> setCertInfo.accept(info, X509CertInfo.ISSUER, issuer); + setExtensions = (info, extensions) -> setCertInfo.accept(info, X509CertInfo.EXTENSIONS, extensions); + + Function x509CertImplCtor = findCtor(X509CertImpl.class, X509CertInfo.class); + TriConsumer sign = findVoidMethod( + X509CertImpl.class, + "sign", + PrivateKey.class, + String.class + ); + newSigned = (info, privateKey, algorithm) -> { + X509CertImpl cert = x509CertImplCtor.apply(info); + sign.accept(cert, privateKey, algorithm); + return cert; + }; + } + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + + private static Function findCtor(Class clazz, Class aType) throws Throwable { + MethodHandle handle = MethodHandles.lookup().findConstructor(clazz, MethodType.methodType(void.class, aType)); + return (a) -> { + try { + // noinspection unchecked + return (This) handle.invoke(a); + } catch (Throwable e) { + throw new RuntimeException(e); + } + }; + } + + private static TriFunction findStaticMethod( + Class clazz, + String name, + Class rType, + Class aType, + Class bType, + Class cType + ) throws Throwable { + MethodHandle handle = MethodHandles.lookup().findStatic(clazz, name, MethodType.methodType(rType, aType, bType, cType)); + return (a, b, c) -> { + try { + // noinspection unchecked + return (Return) handle.invoke(a, b, c); + } catch (Throwable e) { + throw new RuntimeException(e); + } + }; + } + + private static BiConsumer findVoidMethod(Class clazz, String name, Class aType) throws Throwable { + MethodHandle handle = MethodHandles.lookup().findVirtual(clazz, name, MethodType.methodType(void.class, aType)); + return (thiz, a) -> { + try { + handle.bindTo(thiz).invoke(a); + } catch (Throwable e) { + throw new RuntimeException(e); + } + }; + } + + private static TriConsumer findVoidMethod(Class clazz, String name, Class aType, Class bType) + throws Throwable { + MethodHandle handle = MethodHandles.lookup().findVirtual(clazz, name, MethodType.methodType(void.class, aType, bType)); + return (thiz, a, b) -> { + try { + handle.bindTo(thiz).invoke(a, b); + } catch (Throwable e) { + throw new RuntimeException(e); + } + }; + } +}