Skip to content

Commit

Permalink
Data Streams Monitoring support in Kinesis (#6428)
Browse files Browse the repository at this point in the history
## Summary of changes

* Added Data Streams Monitoring checkpoints to existing
autoinstrumentation of Kinesis PutRecord[s][Async]
* Added autoinstrumentation to Kinesis GetRecords[Async]

## Reason for change

Data Streams Monitoring was not supported for Kinesis, this adds that
functionality.

## Implementation details

The existing producer instrumentation only works when StreamName is
specified, not when StreamARN is specified. These changes leave that
behavior unchanged at present.

The new consumer instrumentation is able to infer a StreamName from the
[mandatory] StreamARN so that the producer and consumer can use the same
naming system. The reverse would not be possible, and changing producer
instrumentation to use ARNs instead would be a potentially-breaking
change to existing users of the functionality so is outside the scope of
this feature addition.
  • Loading branch information
roisinlh authored Dec 17, 2024
1 parent eed7b60 commit 4ed8511
Show file tree
Hide file tree
Showing 23 changed files with 1,608 additions and 919 deletions.
49 changes: 49 additions & 0 deletions tracer/build/supported_calltargets.g.json
Original file line number Diff line number Diff line change
Expand Up @@ -1888,6 +1888,55 @@
"IsAdoNetIntegration": false,
"InstrumentationCategory": 1
},
{
"IntegrationName": "AwsKinesis",
"AssemblyName": "AWSSDK.Kinesis",
"TargetTypeName": "Amazon.Kinesis.AmazonKinesisClient",
"TargetMethodName": "GetRecords",
"TargetReturnType": "Amazon.Kinesis.Model.GetRecordsResponse",
"TargetParameterTypes": [
"Amazon.Kinesis.Model.GetRecordsRequest"
],
"MinimumVersion": {
"Item1": 3,
"Item2": 0,
"Item3": 0
},
"MaximumVersion": {
"Item1": 3,
"Item2": 65535,
"Item3": 65535
},
"InstrumentationTypeName": "Datadog.Trace.ClrProfiler.AutoInstrumentation.AWS.Kinesis.GetRecordsIntegration",
"IntegrationKind": 0,
"IsAdoNetIntegration": false,
"InstrumentationCategory": 1
},
{
"IntegrationName": "AwsKinesis",
"AssemblyName": "AWSSDK.Kinesis",
"TargetTypeName": "Amazon.Kinesis.AmazonKinesisClient",
"TargetMethodName": "GetRecordsAsync",
"TargetReturnType": "System.Threading.Tasks.Task`1[Amazon.Kinesis.Model.GetRecordsResponse]",
"TargetParameterTypes": [
"Amazon.Kinesis.Model.GetRecordsRequest",
"System.Threading.CancellationToken"
],
"MinimumVersion": {
"Item1": 3,
"Item2": 0,
"Item3": 0
},
"MaximumVersion": {
"Item1": 3,
"Item2": 65535,
"Item3": 65535
},
"InstrumentationTypeName": "Datadog.Trace.ClrProfiler.AutoInstrumentation.AWS.Kinesis.GetRecordsAsyncIntegration",
"IntegrationKind": 0,
"IsAdoNetIntegration": false,
"InstrumentationCategory": 1
},
{
"IntegrationName": "AwsKinesis",
"AssemblyName": "AWSSDK.Kinesis",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,22 @@ internal static class AwsKinesisCommon
internal const string IntegrationName = nameof(IntegrationId.AwsKinesis);
internal const IntegrationId IntegrationId = Configuration.IntegrationId.AwsKinesis;

public static string? StreamNameFromARN(string? arn)
{
if (string.IsNullOrEmpty(arn))
{
return null;
}

var arnComponents = arn!.Split('/');
if (arnComponents.Length != 2)
{
return null;
}

return arnComponents[1];
}

public static Scope? CreateScope(Tracer tracer, string operation, string spanKind, ISpanContext? parentContext, out AwsKinesisTags? tags)
{
tags = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
using System;
using System.Collections.Generic;
using System.IO;
using Datadog.Trace.DataStreamsMonitoring;
using Datadog.Trace.DuckTyping;
using Datadog.Trace.Headers;
using Datadog.Trace.Logging;
using Datadog.Trace.Propagators;
using Datadog.Trace.Vendors.Newtonsoft.Json;
Expand All @@ -21,7 +23,7 @@ internal static class ContextPropagation
private const int MaxKinesisDataSize = 1024 * 1024; // 1MB
private static readonly IDatadogLogger Log = DatadogLogging.GetLoggerFor(typeof(ContextPropagation));

public static void InjectTraceIntoRecords<TRecordsRequest>(TRecordsRequest request, PropagationContext context)
public static void InjectTraceIntoRecords<TRecordsRequest>(TRecordsRequest request, Scope? scope, string? streamName)
where TRecordsRequest : IPutRecordsRequest
{
// request.Records is not null and has at least one element
Expand All @@ -32,13 +34,28 @@ public static void InjectTraceIntoRecords<TRecordsRequest>(TRecordsRequest reque

if (request.Records[0].DuckCast<IContainsData>() is { } record)
{
InjectTraceIntoData(record, context);
InjectTraceIntoData(record, scope, streamName);
}
}

public static void InjectTraceIntoData<TRecordRequest>(TRecordRequest record, PropagationContext context)
public static void InjectTraceIntoData<TRecordRequest>(TRecordRequest record, Scope? scope, string? streamName)
where TRecordRequest : IContainsData
{
Dictionary<string, object> propagatedContext = new Dictionary<string, object>();
if (scope?.Span.Context != null && !string.IsNullOrEmpty(streamName))
{
var dataStreamsManager = Tracer.Instance.TracerManager.DataStreamsManager;
if (dataStreamsManager != null && dataStreamsManager.IsEnabled)
{
var edgeTags = new[] { "direction:out", $"topic:{streamName}", "type:kinesis" };
scope.Span.SetDataStreamsCheckpoint(dataStreamsManager, CheckpointKind.Produce, edgeTags, payloadSizeBytes: 0, timeInQueueMs: 0);
var adapter = new KinesisContextAdapter();
dataStreamsManager.InjectPathwayContext(scope.Span.Context.PathwayContext, adapter);
propagatedContext = adapter.GetDictionary();
}
}

var context = new PropagationContext(scope?.Span.Context, Baggage.Current);
if (record.Data is null)
{
return;
Expand All @@ -52,7 +69,6 @@ public static void InjectTraceIntoData<TRecordRequest>(TRecordRequest record, Pr

try
{
var propagatedContext = new Dictionary<string, object>();
SpanContextPropagator.Instance.Inject(context, propagatedContext, default(DictionaryGetterAndSetter));
jsonData[KinesisKey] = propagatedContext;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// <copyright file="GetRecordsAsyncIntegration.cs" company="Datadog">
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
// </copyright>

#nullable enable

using System;
using System.ComponentModel;
using System.Threading;
using Datadog.Trace.ClrProfiler.AutoInstrumentation.AWS.Shared;
using Datadog.Trace.ClrProfiler.CallTarget;
using Datadog.Trace.DataStreamsMonitoring;
using Datadog.Trace.DuckTyping;
using Datadog.Trace.Propagators;

namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.AWS.Kinesis
{
/// <summary>
/// AWSSDK.Kinesis GetRecordsAsync CallTarget instrumentation
/// </summary>
[InstrumentMethod(
AssemblyName = "AWSSDK.Kinesis",
TypeName = "Amazon.Kinesis.AmazonKinesisClient",
MethodName = "GetRecordsAsync",
ReturnTypeName = "System.Threading.Tasks.Task`1[Amazon.Kinesis.Model.GetRecordsResponse]",
ParameterTypeNames = new[] { "Amazon.Kinesis.Model.GetRecordsRequest", ClrNames.CancellationToken },
MinimumVersion = "3.0.0",
MaximumVersion = "3.*.*",
IntegrationName = AwsKinesisCommon.IntegrationName)]
[Browsable(false)]
[EditorBrowsable(EditorBrowsableState.Never)]
public class GetRecordsAsyncIntegration
{
private const string Operation = "GetRecords";

internal static CallTargetState OnMethodBegin<TTarget, TGetRecordsRequest>(TTarget instance, TGetRecordsRequest request, CancellationToken cancellationToken)
where TGetRecordsRequest : IGetRecordsRequest, IDuckType
{
if (request.Instance is null)
{
return CallTargetState.GetDefault();
}

var scope = AwsKinesisCommon.CreateScope(Tracer.Instance, Operation, SpanKinds.Consumer, null, out var tags);

string? streamName = AwsKinesisCommon.StreamNameFromARN(request.StreamARN);
if (tags is not null && streamName is not null)
{
tags.StreamName = streamName;
}

return new CallTargetState(scope, streamName);
}

internal static TResponse OnAsyncMethodEnd<TTarget, TResponse>(TTarget instance, TResponse response, Exception? exception, in CallTargetState state)
where TResponse : IGetRecordsResponse, IDuckType
{
if (response.Instance != null && response.Records is { Count: > 0 } && state is { State: not null, Scope.Span: { } span })
{
var dataStreamsManager = Tracer.Instance.TracerManager.DataStreamsManager;
if (dataStreamsManager is { IsEnabled: true })
{
var edgeTags = new[] { "direction:in", $"topic:{(string)state.State}", "type:kinesis" };
foreach (var o in response.Records)
{
var record = o.DuckCast<IRecord>();
if (record == null)
{
continue; // should not happen
}

span.SetDataStreamsCheckpoint(dataStreamsManager, CheckpointKind.Consume, edgeTags, payloadSizeBytes: 0, timeInQueueMs: 0);
}
}
}

state.Scope.DisposeWithException(exception);
return response;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// <copyright file="GetRecordsIntegration.cs" company="Datadog">
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
// </copyright>

#nullable enable

using System;
using System.Collections.Generic;
using System.ComponentModel;
using Datadog.Trace.ClrProfiler.CallTarget;
using Datadog.Trace.DataStreamsMonitoring;
using Datadog.Trace.DuckTyping;
using Datadog.Trace.Propagators;

namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.AWS.Kinesis
{
/// <summary>
/// AWSSDK.Kinesis GetRecords CallTarget instrumentation
/// </summary>
[InstrumentMethod(
AssemblyName = "AWSSDK.Kinesis",
TypeName = "Amazon.Kinesis.AmazonKinesisClient",
MethodName = "GetRecords",
ReturnTypeName = "Amazon.Kinesis.Model.GetRecordsResponse",
ParameterTypeNames = new[] { "Amazon.Kinesis.Model.GetRecordsRequest" },
MinimumVersion = "3.0.0",
MaximumVersion = "3.*.*",
IntegrationName = AwsKinesisCommon.IntegrationName)]
[Browsable(false)]
[EditorBrowsable(EditorBrowsableState.Never)]
public class GetRecordsIntegration
{
private const string Operation = "GetRecords";

/// <summary>
/// OnMethodBegin callback
/// </summary>
/// <typeparam name="TTarget">Type of the target</typeparam>
/// <typeparam name="TGetRecordsRequest">Type of the request object</typeparam>
/// <param name="instance">Instance value, aka `this` of the instrumented method</param>
/// <param name="request">The request for the Kinesis operation</param>
/// <returns>CallTarget state value</returns>
internal static CallTargetState OnMethodBegin<TTarget, TGetRecordsRequest>(TTarget instance, TGetRecordsRequest request)
where TGetRecordsRequest : IGetRecordsRequest, IDuckType
{
if (request.Instance is null)
{
return CallTargetState.GetDefault();
}

var scope = AwsKinesisCommon.CreateScope(Tracer.Instance, Operation, SpanKinds.Producer, null, out var tags);

string? streamName = AwsKinesisCommon.StreamNameFromARN(request.StreamARN);
if (tags is not null && streamName is not null)
{
tags.StreamName = streamName;
}

return new CallTargetState(scope);
}

/// <summary>
/// OnMethodEnd callback
/// </summary>
/// <typeparam name="TTarget">Type of the target</typeparam>
/// <typeparam name="TResponse">Type of the return value</typeparam>
/// <param name="instance">Instance value, aka `this` of the instrumented method.</param>
/// <param name="response">Task of HttpResponse message instance</param>
/// <param name="exception">Exception instance in case the original code threw an exception.</param>
/// <param name="state">Calltarget state value</param>
/// <returns>A response value, in an async scenario will be T of Task of T</returns>
internal static CallTargetReturn<TResponse> OnMethodEnd<TTarget, TResponse>(TTarget instance, TResponse response, Exception? exception, in CallTargetState state)
where TResponse : IGetRecordsResponse, IDuckType
{
if (response.Instance != null && response.Records is { Count: > 0 } && state is { State: not null, Scope.Span: { } span })
{
var dataStreamsManager = Tracer.Instance.TracerManager.DataStreamsManager;
if (dataStreamsManager is { IsEnabled: true })
{
var edgeTags = new[] { "direction:in", $"topic:{(string)state.State}", "type:kinesis" };
foreach (var o in response.Records)
{
var record = o.DuckCast<IRecord>();
if (record == null)
{
continue; // should not happen
}

span.SetDataStreamsCheckpoint(dataStreamsManager, CheckpointKind.Consume, edgeTags, payloadSizeBytes: 0, timeInQueueMs: 0);
}
}
}

state.Scope.DisposeWithException(exception);
return new CallTargetReturn<TResponse>(response);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// <copyright file="IGetRecordsRequest.cs" company="Datadog">
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
// </copyright>

#nullable enable

using System.Collections;

namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.AWS.Kinesis
{
/// <summary>
/// GetRecordsRequest interface for duck typing.
/// </summary>
internal interface IGetRecordsRequest
{
string StreamARN { get; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// <copyright file="IGetRecordsResponse.cs" company="Datadog">
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
// </copyright>

#nullable enable

using System.Collections;
using System.Collections.Generic;
using System.IO;
using Datadog.Trace.ClrProfiler.AutoInstrumentation.AWS.Shared;
using Datadog.Trace.DuckTyping;

namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.AWS.Kinesis
{
/// <summary>
/// GetRecordsRequest interface for duck typing.
/// </summary>
internal interface IGetRecordsResponse : IDuckType
{
IList Records { get; } // <IRecord>
}

internal interface IRecord
{
MemoryStream? Data { get; }
}
}
Loading

0 comments on commit 4ed8511

Please sign in to comment.