Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import io.helidon.examples.microprofile.grpc.Strings.StringMessage;
import io.helidon.grpc.api.Grpc;
import io.helidon.grpc.core.CollectingObserver;

Expand Down Expand Up @@ -50,8 +51,8 @@ public Descriptors.FileDescriptor proto() {
* @return string message
*/
@Grpc.Unary("Upper")
public Strings.StringMessage upper(Strings.StringMessage request) {
return newMessage(request.getText().toUpperCase());
public StringMessage upper(StringMessage request) {
return newStringMessage(request.getText().toUpperCase());
}

/**
Expand All @@ -61,8 +62,8 @@ public Strings.StringMessage upper(Strings.StringMessage request) {
* @return string message
*/
@Grpc.Unary("Lower")
public Strings.StringMessage lower(Strings.StringMessage request) {
return newMessage(request.getText().toLowerCase());
public StringMessage lower(StringMessage request) {
return newStringMessage(request.getText().toLowerCase());
}

/**
Expand All @@ -72,9 +73,9 @@ public Strings.StringMessage lower(Strings.StringMessage request) {
* @return stream of string messages
*/
@Grpc.ServerStreaming("Split")
public Stream<Strings.StringMessage> split(Strings.StringMessage request) {
public Stream<StringMessage> split(StringMessage request) {
String[] parts = request.getText().split(" ");
return Stream.of(parts).map(this::newMessage);
return Stream.of(parts).map(this::newStringMessage);
}

/**
Expand All @@ -84,16 +85,16 @@ public Stream<Strings.StringMessage> split(Strings.StringMessage request) {
* @return single message as a stream
*/
@Grpc.ClientStreaming("Join")
public StreamObserver<Strings.StringMessage> join(StreamObserver<Strings.StringMessage> observer) {
public StreamObserver<StringMessage> join(StreamObserver<StringMessage> observer) {
return CollectingObserver.create(
Collectors.joining(" "),
observer,
Strings.StringMessage::getText,
this::newMessage);
StringMessage::getText,
this::newStringMessage);
}

private Strings.StringMessage newMessage(String text) {
return Strings.StringMessage.newBuilder().setText(text).build();
private StringMessage newStringMessage(String text) {
return StringMessage.newBuilder().setText(text).build();
}
}

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2024 Oracle and/or its affiliates.
* Copyright (c) 2024, 2025 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@

import java.util.stream.Stream;

import io.helidon.examples.microprofile.grpc.Strings.StringMessage;
import io.helidon.grpc.api.Grpc;

import io.grpc.stub.StreamObserver;
Expand All @@ -36,7 +37,7 @@ public interface StringServiceClient {
* @return string message
*/
@Grpc.Unary("Upper")
Strings.StringMessage upper(Strings.StringMessage request);
StringMessage upper(StringMessage request);

/**
* Lowercase a string.
Expand All @@ -45,7 +46,7 @@ public interface StringServiceClient {
* @return string message
*/
@Grpc.Unary("Lower")
Strings.StringMessage lower(Strings.StringMessage request);
StringMessage lower(StringMessage request);

/**
* Split a string using space delimiters.
Expand All @@ -54,7 +55,7 @@ public interface StringServiceClient {
* @return stream of string messages
*/
@Grpc.ServerStreaming("Split")
Stream<Strings.StringMessage> split(Strings.StringMessage request);
Stream<StringMessage> split(StringMessage request);

/**
* Join a stream of messages using spaces.
Expand All @@ -63,6 +64,6 @@ public interface StringServiceClient {
* @return single message as a stream
*/
@Grpc.ClientStreaming("Join")
StreamObserver<Strings.StringMessage> join(StreamObserver<Strings.StringMessage> observer);
StreamObserver<StringMessage> join(StreamObserver<StringMessage> observer);
}

Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,12 @@ server:
resource-path: "server.p12"
features:
grpc-reflection:
enabled: true
enabled: false

grpc:
client:
channels:
- name: "string-channel"
port: 0
tls:
trust:
keystore:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2024 Oracle and/or its affiliates.
* Copyright (c) 2024, 2025 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,7 @@
import io.helidon.grpc.api.Grpc;
import io.helidon.microprofile.grpc.client.GrpcConfigurablePort;
import io.helidon.microprofile.testing.junit5.HelidonTest;
import io.helidon.examples.microprofile.grpc.Strings.StringMessage;

import io.grpc.stub.StreamObserver;
import jakarta.inject.Inject;
Expand Down Expand Up @@ -55,33 +56,33 @@ void updatePort() {

@Test
void testUnaryUpper() {
Strings.StringMessage res = client.upper(newMessage("hello"));
StringMessage res = client.upper(newStringMessage("hello"));
assertThat(res.getText(), is("HELLO"));
}

@Test
void testUnaryLower() {
Strings.StringMessage res = client.lower(newMessage("HELLO"));
StringMessage res = client.lower(newStringMessage("HELLO"));
assertThat(res.getText(), is("hello"));
}

@Test
void testServerStreamingSplit() {
Stream<Strings.StringMessage> stream = client.split(newMessage("hello world"));
List<Strings.StringMessage> value = stream.toList();
Stream<StringMessage> stream = client.split(newStringMessage("hello world"));
List<StringMessage> value = stream.toList();
assertThat(value, hasSize(2));
assertThat(value, contains(newMessage("hello"), newMessage("world")));
assertThat(value, contains(newStringMessage("hello"), newStringMessage("world")));
}

@Test
void testClientStreamingJoin() throws InterruptedException {
ListObserver<Strings.StringMessage> response = new ListObserver<>();
StreamObserver<Strings.StringMessage> request = client.join(response);
request.onNext(newMessage("hello"));
request.onNext(newMessage("world"));
ListObserver<StringMessage> response = new ListObserver<>();
StreamObserver<StringMessage> request = client.join(response);
request.onNext(newStringMessage("hello"));
request.onNext(newStringMessage("world"));
request.onCompleted();
List<Strings.StringMessage> value = response.value();
assertThat(value.getFirst(), is(newMessage("hello world")));
List<StringMessage> value = response.value();
assertThat(value.getFirst(), is(newStringMessage("hello world")));
}

/**
Expand All @@ -90,8 +91,8 @@ void testClientStreamingJoin() throws InterruptedException {
* @param data the string
* @return the string message
*/
Strings.StringMessage newMessage(String data) {
return Strings.StringMessage.newBuilder().setText(data).build();
StringMessage newStringMessage(String data) {
return StringMessage.newBuilder().setText(data).build();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2024 Oracle and/or its affiliates.
* Copyright (c) 2024, 2025 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -62,15 +62,15 @@ private void lower(StringMessage request, StreamObserver<StringMessage> observer

private void split(StringMessage request, StreamObserver<StringMessage> observer) {
String[] parts = request.getText().split(" ");
stream(observer, Stream.of(parts).map(this::response));
stream(observer, Stream.of(parts).map(this::newStringMessage));
}

private StreamObserver<StringMessage> join(StreamObserver<StringMessage> observer) {
return CollectingObserver.create(
Collectors.joining(" "),
observer,
StringMessage::getText,
this::response);
this::newStringMessage);
}

private StreamObserver<Strings.StringMessage> echo(StreamObserver<Strings.StringMessage> observer) {
Expand All @@ -92,7 +92,7 @@ public void onCompleted() {
};
}

private StringMessage response(String text) {
private StringMessage newStringMessage(String text) {
return StringMessage.newBuilder().setText(text).build();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2024 Oracle and/or its affiliates.
* Copyright (c) 2024, 2025 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -35,6 +35,7 @@
import io.helidon.webserver.grpc.GrpcRouting;
import io.helidon.webserver.testing.junit5.ServerTest;
import io.helidon.webserver.testing.junit5.SetUpRoute;
import io.helidon.examples.webserver.grpc.Strings.StringMessage;

import io.grpc.Channel;
import io.grpc.stub.StreamObserver;
Expand Down Expand Up @@ -77,23 +78,23 @@ static void routing(Router.RouterBuilder<?> router) {
void testUnaryUpper() {
GrpcClient grpcClient = webClient.client(GrpcClient.PROTOCOL);
StringServiceGrpc.StringServiceBlockingStub service = StringServiceGrpc.newBlockingStub(grpcClient.channel());
Strings.StringMessage res = service.upper(newStringMessage("hello"));
StringMessage res = service.upper(newStringMessage("hello"));
assertThat(res.getText(), is("HELLO"));
}

@Test
void testUnaryLower() {
GrpcClient grpcClient = webClient.client(GrpcClient.PROTOCOL);
StringServiceGrpc.StringServiceBlockingStub service = StringServiceGrpc.newBlockingStub(grpcClient.channel());
Strings.StringMessage res = service.lower(newStringMessage("HELLO"));
StringMessage res = service.lower(newStringMessage("HELLO"));
assertThat(res.getText(), is("hello"));
}

@Test
void testServerStreamingSplit() {
GrpcClient grpcClient = webClient.client(GrpcClient.PROTOCOL);
StringServiceGrpc.StringServiceBlockingStub service = StringServiceGrpc.newBlockingStub(grpcClient.channel());
Iterator<Strings.StringMessage> res = service.split(newStringMessage("hello world"));
Iterator<StringMessage> res = service.split(newStringMessage("hello world"));
assertThat(res.next().getText(), is("hello"));
assertThat(res.next().getText(), is("world"));
assertThat(res.hasNext(), is(false));
Expand All @@ -103,25 +104,25 @@ void testServerStreamingSplit() {
void testClientStreamingJoin() throws ExecutionException, InterruptedException, TimeoutException {
GrpcClient grpcClient = webClient.client(GrpcClient.PROTOCOL);
StringServiceGrpc.StringServiceStub service = StringServiceGrpc.newStub(grpcClient.channel());
CompletableFuture<Strings.StringMessage> future = new CompletableFuture<>();
StreamObserver<Strings.StringMessage> req = service.join(singleStreamObserver(future));
CompletableFuture<StringMessage> future = new CompletableFuture<>();
StreamObserver<StringMessage> req = service.join(singleStreamObserver(future));
req.onNext(newStringMessage("hello"));
req.onNext(newStringMessage("world"));
req.onCompleted();
Strings.StringMessage res = future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
StringMessage res = future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
assertThat(res.getText(), is("hello world"));
}

@Test
void testBidirectionalEcho() throws ExecutionException, InterruptedException, TimeoutException {
GrpcClient grpcClient = webClient.client(GrpcClient.PROTOCOL);
StringServiceGrpc.StringServiceStub service = StringServiceGrpc.newStub(grpcClient.channel());
CompletableFuture<Iterator<Strings.StringMessage>> future = new CompletableFuture<>();
StreamObserver<Strings.StringMessage> req = service.echo(multiStreamObserver(future));
CompletableFuture<Iterator<StringMessage>> future = new CompletableFuture<>();
StreamObserver<StringMessage> req = service.echo(multiStreamObserver(future));
req.onNext(newStringMessage("hello"));
req.onNext(newStringMessage("world"));
req.onCompleted();
Iterator<Strings.StringMessage> res = future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
Iterator<StringMessage> res = future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
assertThat(res.next().getText(), is("hello"));
assertThat(res.next().getText(), is("world"));
assertThat(res.hasNext(), is(false));
Expand All @@ -132,7 +133,7 @@ void testUnaryUpperInterceptor() {
GrpcClient grpcClient = webClient.client(GrpcClient.PROTOCOL);
Channel channel = grpcClient.channel(new StringServiceInterceptor());
StringServiceGrpc.StringServiceBlockingStub service = StringServiceGrpc.newBlockingStub(channel);
Strings.StringMessage res = service.upper(newStringMessage("hello"));
StringMessage res = service.upper(newStringMessage("hello"));
assertThat(res.getText(), is("[[HELLO]]"));
}

Expand All @@ -149,8 +150,8 @@ void testHealthHttp() {
}
}

static Strings.StringMessage newStringMessage(String data) {
return Strings.StringMessage.newBuilder().setText(data).build();
static StringMessage newStringMessage(String data) {
return StringMessage.newBuilder().setText(data).build();
}

static <ReqT> StreamObserver<ReqT> singleStreamObserver(CompletableFuture<ReqT> future) {
Expand Down