Skip to content

observeByteStream() Subscriber not cancelled #64

@arajthala

Description

@arajthala

Disposing observeStringStream() doesnot cancel the subscriber from observeByteStream() causing exception when trying to close connection.

Caused by: com.github.ivbaranov.rxbluetooth.exceptions.ConnectionClosedException: Can't read stream.

Start reading string stream:
Disposable disposable = mBluetoothConnection.observeStringStream() .subscribeOn(Schedulers.computation()) .subscribe(s -> Log.e("Data", s));

Stop reading stream:
disposable.dispose()

Call
bluetooth.closeConnection()

Never cancels subscriber and throws exception
Class BluetoothConnection.java

public Flowable<Byte> observeByteStream() {
    if (observeInputStream == null) {
      observeInputStream = Flowable.create(new FlowableOnSubscribe<Byte>() {
        @Override public void subscribe(final FlowableEmitter<Byte> subscriber) {
          while (!subscriber.isCancelled()) {
            try {
              subscriber.onNext((byte) inputStream.read());
            } catch (IOException e) {
              connected = false;
              **subscriber.onError(new ConnectionClosedException("Can't read stream", e));**
            } finally {
              if (!connected) {
                closeConnection();
              }
            }
          }
        }
      }, BackpressureStrategy.BUFFER).share();
    }
    return observeInputStream;
  }

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions