Metrics Publisher Support #1241
Replies: 1 comment 10 replies
-
Hi @RodrasSilva, excellent question! The telemetry and metrics interfaces for the AWS SDK for Kotlin are slightly different from those of the AWS SDK for Java, but it should still be possible to implement your S3 publisher adaptor. The key is providing your own implementation of a TelemetryProvider.

Providers

As an example, here's an implementation of a telemetry provider which has a custom meter provider and nothing else:

class MyTelemetryProvider : TelemetryProvider {
override val contextManager = ContextManager.None
override val loggerProvider = LoggerProvider.None
override val tracerProvider = TracerProvider.None
override val meterProvider = MyMeterProvider()
}
/**
 * Meter provider that builds a [MyMeter] for whatever scope is requested.
 */
class MyMeterProvider : MeterProvider {
    // A new MyMeter is constructed on every call; nothing is cached here.
    override fun getOrCreateMeter(scope: String): MyMeter {
        return MyMeter(scope)
    }
}
class MyMeter : Meter {
…
}

Meters

Your code for a Meter implementation might look like this:

class MyMeter : Meter {
override fun createDoubleHistogram(
name: String,
units: String?,
description: String?,
) = MyDoubleHistogram(name, units, description)
// Override the other Meter members with no-op implementations
override fun createAsyncUpDownCounter(…) = NoOpAsyncMeasurementHandle
override fun createUpDownCounter(…) = NoOpUpDownCounter
override fun createMonotonicCounter(…) = NoOpMonotonicCounter
override fun createLongHistogram(…) = NoOpLongHistogram
override fun createLongGauge(…) = NoOpAsyncMeasurementHandle
override fun createDoubleGauge(…) = NoOpAsyncMeasurementHandle
}
class MyDoubleHistogram(private val name: String, private val units: String?) : DoubleHistogram {
override fun record(value: Double, attributes: Attributes, context: Context?) {
// Handle the measurement
}
}

Asynchronous batching

The measurements can be queued and then published to S3 in batches by a background coroutine:

data class Measurement(val name: String, val value: Double, val attributes: Attributes)
class MyS3Publisher(private val s3: S3Client, private val coroutineScope: CoroutineScope) {
private val queue = LinkedBlockingQueue<Measurement>()
init {
// Launch a background coroutine to publish metrics every 5 seconds
coroutineScope.launch {
while (isActive) {
delay(5.seconds)
publish()
}
}
}
fun put(measurement: Measurement) {
queue.add(measurement)
}
private suspend fun publish() {
val metrics = mutableListOf<Measurement>()
.also { queue.drainTo(it) }
.joinToString(separator = "\n")
s3.putObject {
bucket = "foo"
key = "bar"
body = ByteStream.fromString(metrics)
}
}
}

Wiring it all together

Now that we have a publisher that can asynchronously batch requests, we just need to modify the providers and meters to accept an instance:

class MyTelemetryProvider(publisher: MyS3Publisher) : TelemetryProvider {
override val meterProvider = MyMeterProvider(publisher)
// other members omitted
}
/**
 * Meter provider whose meters all report to the given [publisher].
 */
class MyMeterProvider(private val publisher: MyS3Publisher) : MeterProvider {
    // The scope argument is not used: each call returns a new MyMeter bound to the same publisher.
    override fun getOrCreateMeter(scope: String): MyMeter {
        return MyMeter(publisher)
    }
}
/**
 * Meter that creates double histograms backed by the given [publisher].
 */
class MyMeter(private val publisher: MyS3Publisher) : Meter {
    override fun createDoubleHistogram(
        name: String,
        units: String?,
        description: String?,
    ): MyDoubleHistogram {
        // The description is not forwarded; only the name and units reach the histogram.
        return MyDoubleHistogram(publisher, name, units)
    }
    // other members omitted
}
class MyDoubleHistogram(
private val publisher: MyS3Publisher,
private val name: String,
private val units: String?,
) : DoubleHistogram {
override fun record(value: Double, attributes: Attributes, context: Context?) {
publisher.put(Measurement(name, units, value, attributes))
}
}

Final notes

Hopefully these examples will be enough to get you started. There are a few things to note about this post though:
Let us know if this sufficiently explains metrics publishing support or if there's anything that could be clarified!

Full code from the above examples

This is the full code from the examples above, plus no-op implementations for measurements which were unused in this example:

@OptIn(ExperimentalApi::class)
class MyTelemetryProvider(publisher: MyS3Publisher) : TelemetryProvider {
    // Telemetry provider that customises metrics only: the meter provider is wired
    // to the supplied publisher.
    override val meterProvider = MyMeterProvider(publisher)

    // Tracing, logging and context management use the `None` implementations.
    override val tracerProvider = TracerProvider.None
    override val loggerProvider = LoggerProvider.None
    override val contextManager = ContextManager.None
}
/**
 * Meter provider whose meters all report to the given [publisher].
 */
class MyMeterProvider(private val publisher: MyS3Publisher) : MeterProvider {
    // The scope argument is not used: each call returns a new MyMeter bound to the same publisher.
    override fun getOrCreateMeter(scope: String): MyMeter {
        return MyMeter(publisher)
    }
}
/**
 * Meter that only produces a real instrument for double histograms, which are
 * backed by [publisher]. Every other instrument is a shared no-op object.
 */
class MyMeter(private val publisher: MyS3Publisher) : Meter {
    // --- Histograms ---

    override fun createDoubleHistogram(
        name: String,
        units: String?,
        description: String?,
    ): MyDoubleHistogram {
        // The description is not forwarded; only the name and units reach the histogram.
        return MyDoubleHistogram(publisher, name, units)
    }

    override fun createLongHistogram(
        name: String,
        units: String?,
        description: String?,
    ): LongHistogram {
        return NoOpLongHistogram
    }

    // --- Counters ---

    override fun createMonotonicCounter(
        name: String,
        units: String?,
        description: String?,
    ): MonotonicCounter {
        return NoOpMonotonicCounter
    }

    override fun createUpDownCounter(
        name: String,
        units: String?,
        description: String?,
    ): UpDownCounter {
        return NoOpUpDownCounter
    }

    override fun createAsyncUpDownCounter(
        name: String,
        callback: LongUpDownCounterCallback,
        units: String?,
        description: String?,
    ): AsyncMeasurementHandle {
        // The callback is not registered or invoked by this implementation.
        return NoOpMeasurementHandle
    }

    // --- Gauges ---

    override fun createLongGauge(
        name: String,
        callback: LongGaugeCallback,
        units: String?,
        description: String?,
    ): NoOpMeasurementHandle {
        return NoOpMeasurementHandle
    }

    override fun createDoubleGauge(
        name: String,
        callback: DoubleGaugeCallback,
        units: String?,
        description: String?,
    ): NoOpMeasurementHandle {
        return NoOpMeasurementHandle
    }
}
/**
 * Double histogram that forwards every recorded value to [publisher] as a
 * [Measurement] tagged with this histogram's [name] and [units].
 */
class MyDoubleHistogram(
    private val publisher: MyS3Publisher,
    private val name: String,
    private val units: String?,
) : DoubleHistogram {
    override fun record(value: Double, attributes: Attributes, context: Context?) {
        // The context is not used; only the value and its attributes are captured.
        val measurement = Measurement(
            name = name,
            units = units,
            value = value,
            attributes = attributes,
        )
        publisher.put(measurement)
    }
}
/** Asynchronous measurement handle whose [stop] does nothing. */
object NoOpMeasurementHandle : AsyncMeasurementHandle {
    override fun stop() {
        // Intentionally empty.
    }
}
/** Long histogram that discards every recorded value. */
object NoOpLongHistogram : LongHistogram {
    override fun record(value: Long, attributes: Attributes, context: Context?) {
        // Intentionally empty: the value is dropped.
    }
}
/** Monotonic counter that discards every increment. */
object NoOpMonotonicCounter : MonotonicCounter {
    override fun add(value: Long, attributes: Attributes, context: Context?) {
        // Intentionally empty: the increment is dropped.
    }
}
/** Up/down counter that discards every change. */
object NoOpUpDownCounter : UpDownCounter {
    override fun add(value: Long, attributes: Attributes, context: Context?) {
        // Intentionally empty: the change is dropped.
    }
}
/**
 * A single recorded value queued for publishing.
 *
 * @property name name of the instrument that produced the value
 * @property units units of the instrument, or `null` when none were given
 * @property value the recorded value
 * @property attributes attributes supplied alongside the value
 */
data class Measurement(
    val name: String,
    val units: String?,
    val value: Double,
    val attributes: Attributes,
)
/**
 * Buffers [Measurement]s in memory and periodically uploads them to S3 as one
 * newline-separated text object.
 *
 * Constructing the publisher launches a background coroutine in [coroutineScope].
 * The loop keeps running while that coroutine is active, waking up once per
 * [publishInterval] to flush whatever has been queued.
 *
 * @param s3 client used to upload each batch
 * @param coroutineScope scope in which the background publishing loop is launched
 * @param bucketName destination bucket; defaults to the placeholder `"foo"`
 * @param objectKey destination object key; defaults to the placeholder `"bar"`.
 *   NOTE(review): every batch is written to this same key, so each upload presumably
 *   replaces the previous one — confirm this is intended before using beyond a demo.
 * @param publishInterval time to wait between flushes; defaults to 5 seconds
 */
class MyS3Publisher(
    private val s3: S3Client,
    private val coroutineScope: CoroutineScope,
    private val bucketName: String = "foo",
    private val objectKey: String = "bar",
    private val publishInterval: kotlin.time.Duration = 5.seconds,
) {
    private val queue = LinkedBlockingQueue<Measurement>()

    init {
        // Launch a background coroutine that publishes queued metrics once per interval.
        coroutineScope.launch {
            while (isActive) {
                delay(publishInterval)
                publish()
            }
        }
    }

    /** Queues [measurement] for inclusion in the next published batch. */
    fun put(measurement: Measurement) {
        queue.add(measurement)
    }

    /**
     * Drains everything currently queued and uploads it as a single object.
     * Does nothing when no measurements are waiting.
     */
    private suspend fun publish() {
        val batch = mutableListOf<Measurement>().also { queue.drainTo(it) }

        // Nothing was recorded since the last flush: skip the upload instead of
        // writing an empty object to S3 on every tick.
        if (batch.isEmpty()) return

        val payload = batch.joinToString(separator = "\n")
        s3.putObject {
            // Qualified with this@MyS3Publisher so the values can never be confused
            // with same-named members of the request builder receiver.
            bucket = this@MyS3Publisher.bucketName
            key = this@MyS3Publisher.objectKey
            body = ByteStream.fromString(payload)
        }
    }
}
Beta Was this translation helpful? Give feedback.
-
Hello.
I'm migrating my application from the Java AWS SDK to the Kotlin AWS SDK.
When I was building the S3Client using the Java SDK, I was able to provide a Metrics publisher where I could build my logic to transform the SDK-provided metrics into my own:
How am I able to replicate this behavior using the Kotlin SDK and the telemetry provider?
Thanks in advance.
Beta Was this translation helpful? Give feedback.
All reactions