Receive connections get stale #28

Open
@UglyHobbitFeet

Description

I am using a GraphQL subscription to listen to an AMQP receiver and send back live results as they come in. I notice that the receiver stops delivering messages on an address after roughly 3 minutes. If I switch to a different address, messages flow again, but that receiver also goes quiet after roughly 3 minutes.

Is there any way to check whether a connection is stale and reconnect? Or better yet, is there a keep-alive flag? Is there something I should be doing differently in the code? Note: I tried both the RxJava and non-RxJava versions and they give me the same result.

Example RxJava code is below:

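import io.reactivex.BackpressureStrategy;
import io.reactivex.Observable;
import io.vertx.reactivex.amqp.AmqpConnection;
import org.reactivestreams.Publisher;

static Publisher<String> receive(AmqpConnection connection, String address) {
  return connection.rxCreateReceiver(address)
      .flatMapPublisher(receiver ->
          receiver.toObservable()
              // rxClose() returns a lazy Completable, so it must be subscribed
              // or the receiver is never actually closed.
              .doFinally(() -> receiver.rxClose().onErrorComplete().subscribe())
              // Log the error once, then end the stream instead of propagating it.
              .onErrorResumeNext(throwable -> {
                throwable.printStackTrace();
                return Observable.empty();
              })
              .map(amqpMsg -> amqpMsg.bodyAsString())
              .share()
              .toFlowable(BackpressureStrategy.BUFFER));
}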

FWIW, I'm on Vert.x 3.8.5 for everything.

This kinda seems related to #23
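
To make the "check if a connection is stale and reconnect" idea concrete, here is a minimal sketch in plain Java. It is not a Vert.x API: `StaleReceiverWatchdog`, its `touch()`/`check()` methods, and the 2-minute threshold are all hypothetical names and values chosen for illustration. The message handler would call `touch()` on every delivery, and a periodic timer would call `check()`, which runs a caller-supplied reconnect action once the receiver has been silent for longer than the threshold. Note the assumption baked in: a quiet address looks the same as a stale one, so this would also reconnect when there is simply nothing to receive.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/**
 * Hypothetical helper (not part of Vert.x): remembers when the last message
 * arrived and runs a reconnect action once the receiver has been silent too long.
 */
final class StaleReceiverWatchdog {
  private final long maxIdleMillis;
  private final LongSupplier clock;   // injectable so the logic can be exercised without sleeping
  private final Runnable reconnect;   // e.g. close the old receiver and create a new one
  private final AtomicLong lastSeen;

  StaleReceiverWatchdog(long maxIdleMillis, LongSupplier clock, Runnable reconnect) {
    this.maxIdleMillis = maxIdleMillis;
    this.clock = clock;
    this.reconnect = reconnect;
    this.lastSeen = new AtomicLong(clock.getAsLong());
  }

  /** Call from the message handler each time a message arrives. */
  void touch() {
    lastSeen.set(clock.getAsLong());
  }

  /** True when no message has been seen for strictly longer than maxIdleMillis. */
  boolean isStale() {
    return clock.getAsLong() - lastSeen.get() > maxIdleMillis;
  }

  /** Call periodically, e.g. from a timer. Returns true if a reconnect was triggered. */
  boolean check() {
    if (!isStale()) {
      return false;
    }
    // Restart the idle window so the reconnect does not fire again on every tick.
    lastSeen.set(clock.getAsLong());
    reconnect.run();
    return true;
  }

  public static void main(String[] args) {
    AtomicLong now = new AtomicLong(0);          // fake clock in milliseconds
    AtomicLong reconnects = new AtomicLong(0);   // counts how often reconnect ran
    StaleReceiverWatchdog watchdog =
        new StaleReceiverWatchdog(120_000, now::get, reconnects::incrementAndGet);

    now.set(60_000);
    watchdog.touch();                            // a message arrives after 1 minute
    now.set(121_000);
    System.out.println(watchdog.check());        // false: only 61 s since the last message
    now.set(181_000);
    System.out.println(watchdog.check());        // true: 121 s of silence exceeds 120 s
    System.out.println(reconnects.get());        // 1
  }
}
```

In the receiver code above, `touch()` could be called from a `doOnNext` on the observable, and the reconnect action could dispose the current subscription and call `receive(...)` again. Whether that is the right fix depends on why the link goes quiet in the first place.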

Labels: bug (Something isn't working)
