Skip to content

Weird behavior with asSream monix response #1612

@AlexITC

Description

@AlexITC

I have been playing with a stream client powered by monix, I noticed a weird behavior that's present on HttpClientMonixBackend and FetchMonixBackend, there is a chance this could be present in other backends but I haven't tried others.

In short, I'm using monix to expose server-sent events as an observable, still, when the server is stopped, the observable stays active without streaming events anymore (even if the backend starts again).

Steps to reproduce (code at the bottom):

  1. Run sample server: scala-cli sse-server.scala
  2. Verify backend is streaming events with curl localhost:8080/seconds -N
  3. Run sample client: scala-cli sse-client.sc (it will start printing values).
  4. Stop server, client won't display values any more, still, stream stays active.
  5. Start server, client stays in the same state.

I'd expect a way to detect the connection problem to retry or raise an error, which happens when invoking asStreamUnsafe instead of asStream (see sse-client-unsafe.sc).

I'll take the opportunity to ask about the difference between asStream and asStreamAlways.

Code

sse-server.scala

//> using lib "org.http4s::http4s-ember-server:0.23.14"
//> using lib "org.http4s::http4s-dsl:0.23.14"
//> using lib "ch.qos.logback:logback-classic:1.2.11"

import cats.effect._
import cats.syntax.all._
import com.comcast.ip4s._
import fs2.Stream
import org.http4s._
import org.http4s.ember.server.EmberServerBuilder
import org.http4s.implicits._
import org.http4s.server.middleware.Logger

import scala.concurrent.duration._

object SampleServer {

  val seconds = Stream.awakeEvery[IO](5.second)

  val routes = {
    import org.http4s.dsl.io._

    HttpRoutes.of[IO] {
      case GET -> Root / "seconds" =>
        val response = seconds.map { s =>
          s"data: ${s.toSeconds}\n\n"
        }
        Ok(response)
    }
  }


  def stream: Stream[IO, Nothing] = {
    val httpApp = routes.orNotFound
    val finalHttpApp = Logger.httpApp(true, true)(httpApp)
    for {
      exitCode <- Stream.resource(
        EmberServerBuilder.default[IO]
          .withHost(ipv4"0.0.0.0")
          .withPort(port"8080")
          .withHttpApp(finalHttpApp)
          .withShutdownTimeout(1.second)
          .build >>
          Resource.eval(Async[IO].never)
      )
    } yield exitCode
  }.drain
}

object MyApp extends IOApp.Simple {
  def run = SampleServer.stream.compile.drain.as(ExitCode.Success)
}

sse-client.sc

//> using lib "com.softwaremill.sttp.client3::core:3.8.3"
//> using lib "com.softwaremill.sttp.client3::monix:3.8.3"

import cats.effect.ExitCase
import monix.eval.Task
import monix.execution.Ack
import monix.reactive.{Consumer, Observable, OverflowStrategy}
import monix.reactive.observers.Subscriber
import sttp.capabilities.monix.MonixStreams
import sttp.client3._
import sttp.client3.httpclient.monix.HttpClientMonixBackend

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Promise}
//import sttp.client3.impl.monix.FetchMonixBackend
import sttp.client3.impl.monix.MonixServerSentEvents
import sttp.model._

import scala.concurrent.duration.DurationInt

println("Starting client...")
FailingStream.stream.foreach(x => println(s"Got: $x"))(monix.execution.Scheduler.global)

// prevents the app from being closed
Await.result(Promise().future, Duration.Inf)

object FailingStream {
  implicit val backend: SttpBackend[Task, MonixStreams] =
    Await.result(HttpClientMonixBackend().runToFuture(monix.execution.Scheduler.global), 10.seconds)

  private val ServerAPI = sttp.model.Uri
    .parse("http://localhost:8080")
    .getOrElse(throw new RuntimeException("Invalid server url"))

  def stream: Observable[String] = {
    val path = ServerAPI.path :+ "seconds"
    val uri = ServerAPI.withPath(path)

    // Apparently, there is a reconnection after shutting wifi off
    // Still, when shutting server off, there is no error but no reconnection occurs
    Observable
      .create[String](OverflowStrategy.DropOld(10)) { sub =>
        basicRequest
          .get(uri)
          .response(asStream(MonixStreams)(input => consumer(input, sub)))
          .send(backend)
          .runToFuture(sub.scheduler)
      }
      // NOTE: None of these messages are printed ever, which seems wrong
      .guaranteeCase {
        case ExitCase.Error(e) =>
          Task.delay {
            println(s"$uri stream failed with an error: ${e.getMessage}")
          }
        case ExitCase.Completed =>
          Task.delay {
            println(s"$uri stream completed")
          }
        case ExitCase.Canceled =>
          Task.delay {
            println(s"$uri stream canceled")
          }
      }
  }

  private def consumer(input: Observable[Array[Byte]], subscriber: Subscriber.Sync[String]): Task[Unit] = {
    input
      .transform(MonixServerSentEvents.parse)
      .flatMap { sse =>
        println(s"Event received: $sse")
        sse.data
          .map(Observable.now)
          .getOrElse(Observable.empty)
      }
      .consumeWith(Consumer.foreach { item =>
        subscriber.onNext(item) match {
          case Ack.Continue => println("There seem to be more items on the stream")
          case Ack.Stop => throw new RuntimeException("No more items can be accepted")
        }
      })
  }
}

sse-client-unsafe.sc

//> using lib "com.softwaremill.sttp.client3::core:3.8.3"
//> using lib "com.softwaremill.sttp.client3::monix:3.8.3"

import cats.effect.ExitCase
import monix.eval.Task
import monix.execution.Ack
import monix.reactive.{Consumer, Observable, OverflowStrategy}
import monix.reactive.observers.Subscriber
import sttp.capabilities.monix.MonixStreams
import sttp.client3._
import sttp.client3.httpclient.monix.HttpClientMonixBackend

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Promise}
//import sttp.client3.impl.monix.FetchMonixBackend
import sttp.client3.impl.monix.MonixServerSentEvents
import sttp.model._

import scala.concurrent.duration.DurationInt

println("Starting client (unsafe)...")
WorkingStream.stream.foreach(x => println(s"Got -> $x"))(monix.execution.Scheduler.global)

// prevents the app from being closed
Await.result(Promise().future, Duration.Inf)

object WorkingStream {
//  implicit val backend: SttpBackend[Task, MonixStreams] = FetchMonixBackend()
  implicit val backend: SttpBackend[Task, MonixStreams] =
    Await.result(HttpClientMonixBackend().runToFuture(monix.execution.Scheduler.global), 10.seconds)

  private val ServerAPI = sttp.model.Uri
    .parse("http://localhost:8080")
    .getOrElse(throw new RuntimeException("Invalid server url"))

  def stream: Observable[String] = {
    val path = ServerAPI.path :+ "seconds"
    val uri = ServerAPI.withPath(path)

    streamParsedEvents(uri)
  }

  private def streamRequest(uri: Uri) = {
    val responseT = basicRequest
      .get(uri)
      .response(asStreamUnsafe(MonixStreams))
      .send(backend)
      .map(_.body.map(MonixServerSentEvents.parse))
      .flatMap {
        case Left(error) => Task.raiseError(new RuntimeException(s"Request to $uri failed: $error"))
        case Right(body) => Task.pure(body)
      }

    Observable
      .fromTask(responseT)
      .flatten
  }

  private def streamParsedEvents(uri: Uri) = {
    streamRequest(uri)
      .map(_.data)
      .flatMap {
        case Some(data) => Observable.pure(data)
        case None =>
          println(s"$uri is streaming an event without data")
          Observable.empty
      }
      // printed as expected
        .guaranteeCase {
          case ExitCase.Error(e) => Task.pure { println(s"$uri stream error: ${e.getMessage}") }
          case ExitCase.Completed =>
            Task.delay {
              println(s"$uri stream completed")
            }
          case ExitCase.Canceled =>
            Task.delay {
              println(s"$uri stream canceled")
            }
        }
  }
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions