Open
Description
The client is never notified when the server goes away; it just waits indefinitely.
The following unit test creates a simple server that streams 200 messages to the client. After receiving 2 messages, the client calls Object.wait() on the server socket, effectively hanging the server, and then waits for 20 more messages or times out after 10 seconds. The client neither throws an exception nor signals onError.
package com.example.demo;
import java.util.Date;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import com.poc.protobuf.UnitRequest;
import com.poc.protobuf.UnitResponse;
import com.poc.protobuf.UnitService;
import com.poc.protobuf.UnitServiceClient;
import com.poc.protobuf.UnitServiceServer;
import io.netty.buffer.ByteBuf;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.rpc.rsocket.RequestHandlingRSocket;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.NettyContextCloseable;
import io.rsocket.transport.netty.server.TcpServerTransport;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
 * Reproduction test for the report that a streaming client is not notified when the server
 * stops responding. It starts a TCP server and a client inside the same test method, streams
 * responses one at a time, tries to stall the server part-way through, and then waits up to
 * 10 seconds for more messages. The test contains no assertions; its outcome is read from the
 * console output.
 */
public class DefaultSimpleServiceTests {
// Service implementation used by the test: answers every stream request with a fixed-size stream.
class DefaultSimpleService implements UnitService {
/**
 * Builds a stream of 200 responses numbered 1 through 200, each carrying its number and a
 * text that embeds the command string taken from the request.
 *
 * @param unitRequest request whose command message is echoed into every response text
 * @param metadata not read by this implementation
 * @return the stream of generated responses
 */
@Override
public Flux<UnitResponse> requestStream(UnitRequest unitRequest, ByteBuf metadata) {
String command = unitRequest.getRequestCommandMessage();
return Flux.range(1, 200)
.map(i -> UnitResponse.newBuilder()
.setMessageNumber(i)
.setResponseMessage(i + " Srever is processsing " + command + " command")
.build());
}
}
/**
 * Starts the server and client on port 8801, subscribes to the response stream with a
 * hand-written subscriber, and waits up to 10 seconds for the latch before printing the
 * finish time.
 *
 * @throws Exception declared so checked exceptions such as the InterruptedException from
 *     {@code latch.await} need no handling here
 */
@Test
public void test1() throws Exception {
// Both Optional arguments are passed empty; their meaning is defined by the generated server class.
UnitServiceServer serviceServer = new UnitServiceServer(new DefaultSimpleService(), Optional.empty(), Optional.empty());
// Server side: every accepted connection is handled by a RequestHandlingRSocket wrapping
// serviceServer. block() waits until start() has produced the server handle.
NettyContextCloseable serverSocket = RSocketFactory.receive()
.acceptor(
(setup, sendingSocket) ->
Mono.just(new RequestHandlingRSocket(serviceServer)))
.transport(TcpServerTransport.create(8801))
.start()
.block();
// Client side: connects to the same port (8801) from inside the same test method.
RSocket rSocket = RSocketFactory.connect().transport(TcpClientTransport.create(8801)).start().block();
UnitServiceClient client = new UnitServiceClient(rSocket);
// 22 = the 2 messages the report expects before the stall + the 20 awaited afterwards.
CountDownLatch latch = new CountDownLatch(22);
client.requestStream(UnitRequest.newBuilder().setRequestCommandMessage("Give me some data!").build())
.subscribe(new Subscriber<UnitResponse>() {
// Kept so onNext can request the next element.
private Subscription subscription;
// Requests a single element up front; each further element is requested from onNext.
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
s.request(1);
}
// Logs the message, counts it, and (past the threshold) attempts to stall the server
// before asking for the next element.
@Override
public void onNext(UnitResponse t) {
System.out.println("Received message " + t.getResponseMessage());
latch.countDown();
// The count goes 22 -> 21 -> 20 -> 19, so this condition is first true on the third
// message received and stays true for every message after it.
if (latch.getCount() < 20) {
System.out.println("Killing server now...");
try {
// this will halt the thread causing server to disappear
// NOTE(review): no synchronized block on serverSocket is visible around this call.
// Object.wait() is documented to throw IllegalMonitorStateException when the calling
// thread does not own the object's monitor, and only InterruptedException is caught
// here - confirm that this call really blocks rather than throwing.
// NOTE(review): the wait happens on whichever thread delivers onNext; whether that
// is a server thread is not visible from this snippet - confirm.
serverSocket.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// Only reached if the wait() above returns normally (or the threshold was not yet hit).
subscription.request(1);
}
// Expected by the reporter to fire when the server goes away; it only prints the error.
@Override
public void onError(Throwable t) {
System.out.println("Error detected! " + t);
}
@Override
public void onComplete() {
System.out.println("Stream completed!");
}
} );
// The boolean result of await is discarded, so the test continues the same way whether the
// latch reached zero or the 10-second timeout elapsed.
latch.await(10, TimeUnit.SECONDS);
System.out.println("Finished at " + new Date().toString());
}
}
proto file
syntax = "proto3";
package com.harris.atom.poc;
import "google/protobuf/empty.proto";
option java_package = "com.poc.protobuf";
option java_outer_classname = "UnitServiceProto";
option java_multiple_files = true;
// Service exposing a single server-streaming call.
service UnitService {
// Single Request / Streaming Response
// Takes one UnitRequest and returns a stream of UnitResponse messages.
rpc RequestStream (UnitRequest) returns (stream UnitResponse) {}
}
// Request payload for RequestStream.
message UnitRequest {
// Free-form command text sent by the client.
string requestCommandMessage = 1;
}
// One element of the response stream returned by RequestStream.
message UnitResponse {
// Text content of this response.
string responseMessage = 1;
// Number assigned to this response by the server.
int32 messageNumber = 2;
}
Metadata
Metadata
Assignees
Labels
No labels
Type
Projects
Milestone
Relationships
Development
No branches or pull requests
Activity
robertroeser commented on Nov 4, 2018
@mostroverkhov does your fix to rsocket help with this?
mostroverkhov commented on Nov 5, 2018
@robertroeser It helps, but reactor/reactor-netty#495 has to be resolved also