Skip to content

Commit cc0cf65

Browse files
committed
New test that simulates large uploads using a bidi method that acks each upload.
1 parent 2ca8f1b commit cc0cf65

File tree

3 files changed

+199
-0
lines changed

3 files changed

+199
-0
lines changed
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright (c) 2025 Oracle and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.helidon.webserver.tests.grpc;
18+
19+
import io.helidon.webserver.grpc.GrpcService;
20+
import io.helidon.webserver.grpc.uploads.Uploads;
21+
import io.helidon.webserver.grpc.uploads.Uploads.Ack;
22+
import io.helidon.webserver.grpc.uploads.Uploads.Data;
23+
24+
import com.google.protobuf.Descriptors;
25+
import io.grpc.stub.StreamObserver;
26+
27+
class UploadService implements GrpcService {
28+
29+
static final Ack TRUE_ACK = Ack.newBuilder().setOk(true).build();
30+
31+
@Override
32+
public Descriptors.FileDescriptor proto() {
33+
return Uploads.getDescriptor();
34+
}
35+
36+
@Override
37+
public void update(Routing router) {
38+
router.bidi("Upload", this::grpcUpload);
39+
}
40+
41+
private StreamObserver<Data> grpcUpload(StreamObserver<Ack> result) {
42+
return new StreamObserver<>() {
43+
@Override
44+
public void onNext(Data data) {
45+
result.onNext(TRUE_ACK);
46+
}
47+
48+
@Override
49+
public void onError(Throwable t) {
50+
result.onError(t);
51+
}
52+
53+
@Override
54+
public void onCompleted() {
55+
result.onCompleted();
56+
}
57+
};
58+
}
59+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright (c) 2025 Oracle and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
syntax = "proto3";
18+
option java_package = "io.helidon.webserver.grpc.uploads";
19+
20+
service UploadService {
21+
rpc Upload (stream Data) returns (stream Ack) {}
22+
}
23+
24+
message Data {
25+
bytes payload = 1;
26+
}
27+
28+
message Ack {
29+
bool ok = 1;
30+
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Copyright (c) 2025 Oracle and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.helidon.webserver.tests.grpc;
18+
19+
import java.util.Arrays;
20+
import java.util.concurrent.CountDownLatch;
21+
import java.util.concurrent.TimeUnit;
22+
import java.util.stream.Stream;
23+
24+
import io.helidon.webserver.Router;
25+
import io.helidon.webserver.WebServer;
26+
import io.helidon.webserver.grpc.GrpcRouting;
27+
import io.helidon.webserver.grpc.uploads.UploadServiceGrpc;
28+
import io.helidon.webserver.grpc.uploads.Uploads;
29+
import io.helidon.webserver.grpc.uploads.Uploads.Ack;
30+
import io.helidon.webserver.grpc.uploads.Uploads.Data;
31+
import io.helidon.webserver.testing.junit5.ServerTest;
32+
import io.helidon.webserver.testing.junit5.SetUpRoute;
33+
34+
import com.google.protobuf.ByteString;
35+
import io.grpc.stub.StreamObserver;
36+
import org.junit.jupiter.api.AfterEach;
37+
import org.junit.jupiter.api.BeforeEach;
38+
import org.junit.jupiter.api.Test;
39+
40+
import static org.hamcrest.Matchers.is;
41+
import static org.hamcrest.MatcherAssert.assertThat;
42+
43+
@ServerTest
44+
class UploadServiceTest extends BaseServiceTest {
45+
46+
private static final byte[] DATA_50K = new byte[50 * 1024];
47+
private static final byte[] DATA_250K = new byte[250 * 1024];
48+
private static final byte[] DATA_500K = new byte[500 * 1024];
49+
50+
static {
51+
Arrays.fill(DATA_50K, (byte) 'A');
52+
Arrays.fill(DATA_250K, (byte) 'B');
53+
Arrays.fill(DATA_500K, (byte) 'C');
54+
}
55+
56+
private UploadServiceGrpc.UploadServiceStub stub;
57+
58+
UploadServiceTest(WebServer server) {
59+
super(server);
60+
}
61+
62+
@BeforeEach
63+
void beforeEach() {
64+
super.beforeEach();
65+
stub = UploadServiceGrpc.newStub(channel);
66+
}
67+
68+
@AfterEach
69+
void afterEach() throws InterruptedException {
70+
super.afterEach();
71+
stub = null;
72+
}
73+
74+
@SetUpRoute
75+
static void routing(Router.RouterBuilder<?> router) {
76+
router.addRouting(GrpcRouting.builder().service(new UploadService()));
77+
}
78+
79+
@Test
80+
void testUpload() throws Throwable {
81+
// setup upload call
82+
CountDownLatch latch = new CountDownLatch(3);
83+
StreamObserver<Data> request = stub.upload(new StreamObserver<>() {
84+
@Override
85+
public void onNext(Ack value) {
86+
if (value.getOk()) {
87+
latch.countDown();
88+
}
89+
}
90+
91+
@Override
92+
public void onError(Throwable t) {
93+
}
94+
95+
@Override
96+
public void onCompleted() {
97+
}
98+
});
99+
100+
// upload data
101+
Stream.of(DATA_50K, DATA_250K, DATA_500K)
102+
.map(b -> Uploads.Data.newBuilder()
103+
.setPayload(ByteString.copyFrom(b))
104+
.build())
105+
.forEach(request::onNext);
106+
107+
// verify upload complete
108+
assertThat(latch.await(10, TimeUnit.SECONDS), is(true));
109+
}
110+
}

0 commit comments

Comments
 (0)