fs2 Stream instrumentation #848

Open
@iRevive

Currently, it's hard to instrument an fs2 stream properly.

Perhaps we can follow the trace4cats route and encode the stream as WriterT[Stream[F, *], Span[F], A]:

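import cats.data.WriterT
import fs2.Stream
import org.typelevel.otel4s.Attributes
import org.typelevel.otel4s.trace.{Span, Tracer}

// `Stream[F, *]` requires the kind-projector compiler plugin.
type TracedStream[F[_], A] = WriterT[Stream[F, *], Span[F], A]

implicit class StreamOps[F[_], A](private val s: Stream[F, A]) {
  // Starts a span when the stream starts, ends it when the stream finalizes,
  // and pairs every element with that span.
  def traced(name: String)(implicit T: Tracer[F]): TracedStream[F, A] =
    WriterT(
      Stream.bracket(Tracer[F].span(name).startUnmanaged)(_.end).flatMap { span =>
        s.map(a => (span, a))
      }
    )
}

implicit class TracedStreamOps[F[_], A](private val s: TracedStream[F, A]) {
  def evalMapTraced(name: String, attributes: Attributes): TracedStream[F, A] = ...
}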

On top of that, we would need a bunch of overloaded syntax methods.
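
To make the idea more concrete, here is a rough sketch of what one such syntax method could look like. It is only an illustration under a few assumptions, not a proposed final API: the extra `f: A => F[B]` argument and the `B` type parameter are my guess at the intended shape of `evalMapTraced`, the `TracedStreamSketch` object is a hypothetical wrapper, and the code assumes a recent otel4s version where `Tracer#span` accepts an `Attributes` collection and `Tracer#childScope` is available. It also needs the kind-projector plugin for `Stream[F, *]`.

```scala
import cats.Functor
import cats.data.WriterT
import cats.syntax.functor._
import fs2.Stream
import org.typelevel.otel4s.Attributes
import org.typelevel.otel4s.trace.{Span, Tracer}

// Hypothetical wrapper object, only here to keep the sketch self-contained.
object TracedStreamSketch {

  type TracedStream[F[_], A] = WriterT[Stream[F, *], Span[F], A]

  implicit class TracedStreamOps[F[_], A](private val s: TracedStream[F, A]) {

    // Assumed signature: the original stub does not show the function argument.
    def evalMapTraced[B](name: String, attributes: Attributes)(f: A => F[B])(
        implicit F: Functor[F], T: Tracer[F]
    ): TracedStream[F, B] =
      WriterT(
        s.run.evalMap { case (parent, a) =>
          // Run `f(a)` inside a new span whose parent is the span
          // carried alongside the element.
          T.childScope(parent.context) {
            T.span(name, attributes).surround(f(a))
          }.map(b => (parent, b)) // keep propagating the stream-level span
        }
      )
  }
}
```

With something like this, each element processed by `evalMapTraced` would get its own child span under the span created by `traced`, while the stream-level span keeps travelling with the elements.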


But maybe there are better alternatives?
