Skip to content

Commit

Permalink
[finagle-core] add client offload filter apply stats
Browse files Browse the repository at this point in the history
To be able identify top offload pool consumers

JIRA Issues: STOR-8861

Differential Revision: https://phabricator.twitter.biz/D1188216
  • Loading branch information
Anton Ivanov authored and jenkins committed Dec 13, 2024
1 parent e4a911e commit e625ffb
Showing 1 changed file with 27 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ import com.twitter.finagle.offload.OffloadFilterAdmissionControl
import com.twitter.finagle.offload.OffloadFuturePool
import com.twitter.finagle.tracing.Trace
import com.twitter.finagle._
import com.twitter.finagle.stats.DefaultStatsReceiver
import com.twitter.finagle.stats.HistogramFormat
import com.twitter.finagle.stats.MetricBuilder
import com.twitter.finagle.stats.MetricBuilder.HistogramType
import com.twitter.finagle.stats.MetricUsageHint
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.util._
import java.util.concurrent.ExecutorService
import scala.runtime.NonLocalReturnControl
Expand Down Expand Up @@ -59,7 +65,18 @@ object OffloadFilter {
private[finagle] def server[Req, Rep]: Stackable[ServiceFactory[Req, Rep]] =
new ServerModule[Req, Rep]

final class Client[Req, Rep](pool: FuturePool) extends SimpleFilter[Req, Rep] {
final class Client[Req, Rep](pool: FuturePool, statsReceiver: StatsReceiver)
extends SimpleFilter[Req, Rep] {

def this(pool: FuturePool) = this(pool, DefaultStatsReceiver.scope("client_offload_filter"))

private[this] val applyTimeNs = statsReceiver.stat(
MetricBuilder(metricType = HistogramType)
.withHistogramFormat(HistogramFormat.ShortSummary)
.withPercentiles(0.99, 0.999, 0.9999)
.withMetricUsageHints(Set(MetricUsageHint.HighContention))
.withName("apply_time_ns")
)

def apply(request: Req, service: Service[Req, Rep]): Future[Rep] = {
// What we're trying to achieve is to ensure all continuations spawn out of a future returned
Expand All @@ -83,7 +100,11 @@ object OffloadFilter {
val response = service(request)
val shifted = Promise.interrupts[Rep](response)
response.respond { t =>
pool(shifted.update(t))
pool {
val startNs = System.nanoTime()
shifted.update(t)
applyTimeNs.add(System.nanoTime() - startNs)
}

val tracing = Trace()
if (tracing.isActivelyTracing) {
Expand Down Expand Up @@ -178,15 +199,17 @@ object OffloadFilter {
}

private final class ClientModule[Req, Rep]
extends Stack.Module1[Param, ServiceFactory[Req, Rep]] {
extends Stack.Module2[param.Stats, Param, ServiceFactory[Req, Rep]] {

def make(
statsParam: param.Stats,
p: Param,
next: ServiceFactory[Req, Rep]
): ServiceFactory[Req, Rep] = {
p.pool match {
case Some(pool) =>
new Client(pool).andThen(next)
val param.Stats(statsReceiver) = statsParam
new Client(pool, statsReceiver.scope("offload_filter")).andThen(next)
case None => next
}
}
Expand Down

0 comments on commit e625ffb

Please sign in to comment.