Skip to content

Commit

Permalink
refactored TracingConnection to use common implementation. Added supp…
Browse files Browse the repository at this point in the history
…ort for AdbcConnection11 and AdbcStatement11
  • Loading branch information
birschick-bq committed Nov 14, 2024
1 parent a379b23 commit c018d39
Show file tree
Hide file tree
Showing 4 changed files with 284 additions and 108 deletions.
121 changes: 13 additions & 108 deletions csharp/src/Apache.Arrow.Adbc/Tracing/TracingConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,9 @@
*/

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Reflection;
using System.Text.Json;
using System.Threading.Tasks;

namespace Apache.Arrow.Adbc.Tracing
{
Expand All @@ -31,131 +27,40 @@ namespace Apache.Arrow.Adbc.Tracing
/// </summary>
public abstract class TracingConnection : AdbcConnection, ITracingObject
{
private bool _disposed = false;
internal const string ProductVersionDefault = "1.0.0";
private static readonly string s_activitySourceName = Assembly.GetExecutingAssembly().GetName().Name!;
private static readonly string s_assemblyVersion = GetProductVersion();
private static readonly ConcurrentDictionary<string, ActivityListener> s_listeners = new();
private readonly ConcurrentQueue<Activity> _activityQueue = new();
private readonly IReadOnlyDictionary<string, string>? _options;
private DirectoryInfo? _traceDirectory;
private bool _disposed;

protected TracingConnection(IReadOnlyDictionary<string, string>? options = default)
protected TracingConnection(IReadOnlyDictionary<string, string>? properties)
{
_options = options ?? new Dictionary<string, string>();
EnsureTracing();
TracingConnectionImpl = new TracingConnectionImpl(properties);
}

protected TracingConnection(bool isTracingEnabled, string traceLocation, int traceMaxFileSizeKb, int traceMaxFiles)
{
var options = new Dictionary<string, string>();
options[TracingOptions.Connection.Trace] = isTracingEnabled.ToString();
options[TracingOptions.Connection.TraceLocation] = traceLocation;
options[TracingOptions.Connection.TraceFileMaxSizeKb] = traceMaxFileSizeKb.ToString();
options[TracingOptions.Connection.TraceFileMaxFiles] = traceMaxFiles.ToString();
_options = options;
EnsureTracing();
TracingConnectionImpl = new TracingConnectionImpl(isTracingEnabled, traceLocation, traceMaxFileSizeKb, traceMaxFiles);
}

public ActivitySource? ActivitySource { get; private set; }
private TracingConnectionImpl TracingConnectionImpl { get; }

protected static string GetProductVersion()
{
FileVersionInfo fileVersionInfo = FileVersionInfo.GetVersionInfo(Assembly.GetExecutingAssembly().Location);
return fileVersionInfo.ProductVersion ?? ProductVersionDefault;
}

private void EnsureTracing()
{
if (true || _options.TryGetValue(TracingOptions.Connection.Trace, out string? traceOption) && bool.TryParse(traceOption, out bool traceEnabled))
{

// TODO: Handle exceptions
if (_options?.TryGetValue(TracingOptions.Connection.TraceLocation, out string? traceLocation) != true || !Directory.Exists(traceLocation))
{
string? traceLocationDefault = Environment.GetFolderPath(Environment.SpecialFolder.UserProfile);
_traceDirectory = new DirectoryInfo(traceLocationDefault);
}
else
{
// TODO: If not exist, try to create
_traceDirectory = new DirectoryInfo(traceLocation);
}
// TODO: Check if folder is writable


// TODO: Determine the best handling of listener lifetimes.
// Key of listeners collection should be ouput file location
ActivityListener listener = s_listeners.GetOrAdd(s_activitySourceName + "." + _traceDirectory.FullName, (_) => new()
{
ShouldListenTo = (source) => source.Name == s_activitySourceName,
Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllDataAndRecorded,
ActivityStarted = OnActivityStarted,
ActivityStopped = OnActivityStopped,
SampleUsingParentId = (ref ActivityCreationOptions<string> options) => ActivitySamplingResult.AllDataAndRecorded,
});
// This is a singleton add, if the lister is the same.
ActivitySource.AddActivityListener(listener);
// THis is an new instance and needs to be disposed later
ActivitySource = new(s_activitySourceName, s_assemblyVersion);
}
}
public ActivitySource? ActivitySource => TracingConnectionImpl.ActivitySource;

private void OnActivityStarted(Activity activity)
{
_activityQueue.Enqueue(activity);
// Intentionally avoid await.
DequeueAndWrite("started")
.ContinueWith(t => Console.WriteLine(t.Exception), TaskContinuationOptions.OnlyOnFaulted);
}

private void OnActivityStopped(Activity activity)
{
_activityQueue.Enqueue(activity);
// Intentionally avoid await.
DequeueAndWrite("stopped")
.ContinueWith(t => Console.WriteLine(t.Exception), TaskContinuationOptions.OnlyOnFaulted);
}

// TODO: Encapsulate this separately
private Task DequeueAndWrite(string state)
{
if (_activityQueue.TryDequeue(out Activity? activity))
{
if (activity != null)
{
try
{
string json = JsonSerializer.Serialize(new { State = state, Activity = activity });
Console.WriteLine(json);
}
catch (NotSupportedException ex)
{
Console.WriteLine(ex.Message);
}
}
}
public abstract string TracingBaseName { get; }

return Task.CompletedTask;
}
public Activity? StartActivity(string methodName) => TracingConnectionImpl.StartActivity(methodName);

public Activity? StartActivity(string methodName)
{
return StartActivity(ActivitySource, TracingBaseName, methodName);
}
public static Activity? StartActivity(ActivitySource? activitySource, string baseName, string methodName) =>
TracingConnectionImpl.StartActivity(activitySource, baseName, methodName);

public abstract string TracingBaseName { get; }
public static string ProductVersionDefault => TracingConnectionImpl.ProductVersionDefault;

protected internal static Activity? StartActivity(ActivitySource? activitySource, string typeName, string methodName) =>
activitySource?.StartActivity(typeName + "." + methodName);
public static string GetProductVersion() => TracingConnectionImpl.GetProductVersion();

protected virtual void Dispose(bool disposing)
{
if (!_disposed)
{
if (disposing)
{
ActivitySource?.Dispose();
TracingConnectionImpl.Dispose();
}
_disposed = true;
}
Expand Down
67 changes: 67 additions & 0 deletions csharp/src/Apache.Arrow.Adbc/Tracing/TracingConnection11.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System.Collections.Generic;
using System.Diagnostics;

namespace Apache.Arrow.Adbc.Tracing
{
/// <summary>
/// Provides an <see cref="AdbcConnection11"> which can instrument tracing activity.
/// </summary>
public abstract class TracingConnection11 : AdbcConnection11, ITracingObject
{
private bool _disposed;

protected TracingConnection11(IReadOnlyDictionary<string, string>? properties)
{
TracingConnectionImpl = new TracingConnectionImpl(properties);
}

protected TracingConnection11(bool isTracingEnabled, string traceLocation, int traceMaxFileSizeKb, int traceMaxFiles)
{
TracingConnectionImpl = new TracingConnectionImpl(isTracingEnabled, traceLocation, traceMaxFileSizeKb, traceMaxFiles);
}

private TracingConnectionImpl TracingConnectionImpl { get; }

public ActivitySource? ActivitySource => TracingConnectionImpl.ActivitySource;

public abstract string TracingBaseName { get; }

public Activity? StartActivity(string methodName) => TracingConnectionImpl.StartActivity(methodName);

public static Activity? StartActivity(ActivitySource? activitySource, string baseName, string methodName) =>
TracingConnectionImpl.StartActivity(activitySource, baseName, methodName);

public static string ProductVersionDefault => TracingConnectionImpl.ProductVersionDefault;

public static string GetProductVersion() => TracingConnectionImpl.GetProductVersion();

protected override void Dispose(bool disposing)
{
if (!_disposed)
{
if (disposing)
{
TracingConnectionImpl.Dispose();
}
_disposed = true;
}
}
}
}
170 changes: 170 additions & 0 deletions csharp/src/Apache.Arrow.Adbc/Tracing/TracingConnectionImpl.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Reflection;
using System.Text.Json;
using System.Threading.Tasks;

namespace Apache.Arrow.Adbc.Tracing
{
/// <summary>
/// Provides an <see cref="AdbcConnection"> which can instrument tracing activity.
/// </summary>
internal sealed class TracingConnectionImpl : ITracingObject, IDisposable
{
private bool _disposed = false;
internal const string ProductVersionDefault = "1.0.0";
private static readonly string s_activitySourceName = Assembly.GetExecutingAssembly().GetName().Name!;
private static readonly string s_assemblyVersion = GetProductVersion();
private static readonly ConcurrentDictionary<string, ActivityListener> s_listeners = new();
private readonly ConcurrentQueue<Activity> _activityQueue = new();
private readonly IReadOnlyDictionary<string, string>? _options;
private DirectoryInfo? _traceDirectory;

internal TracingConnectionImpl(IReadOnlyDictionary<string, string>? options = default)
{
_options = options ?? new Dictionary<string, string>();
EnsureTracing();
}

internal TracingConnectionImpl(bool isTracingEnabled, string traceLocation, int traceMaxFileSizeKb, int traceMaxFiles)
{
var options = new Dictionary<string, string>();
options[TracingOptions.Connection.Trace] = isTracingEnabled.ToString();
options[TracingOptions.Connection.TraceLocation] = traceLocation;
options[TracingOptions.Connection.TraceFileMaxSizeKb] = traceMaxFileSizeKb.ToString();
options[TracingOptions.Connection.TraceFileMaxFiles] = traceMaxFiles.ToString();
_options = options;
EnsureTracing();
}

public ActivitySource? ActivitySource { get; private set; }

internal static string GetProductVersion()
{
FileVersionInfo fileVersionInfo = FileVersionInfo.GetVersionInfo(Assembly.GetExecutingAssembly().Location);
return fileVersionInfo.ProductVersion ?? ProductVersionDefault;
}

private void EnsureTracing()
{
if (true || _options.TryGetValue(TracingOptions.Connection.Trace, out string? traceOption) && bool.TryParse(traceOption, out bool traceEnabled))
{

// TODO: Handle exceptions
if (_options?.TryGetValue(TracingOptions.Connection.TraceLocation, out string? traceLocation) != true || !Directory.Exists(traceLocation))
{
string? traceLocationDefault = Environment.GetFolderPath(Environment.SpecialFolder.UserProfile);
_traceDirectory = new DirectoryInfo(traceLocationDefault);
}
else
{
// TODO: If not exist, try to create
_traceDirectory = new DirectoryInfo(traceLocation);
}
// TODO: Check if folder is writable


// TODO: Determine the best handling of listener lifetimes.
// Key of listeners collection should be ouput file location
ActivityListener listener = s_listeners.GetOrAdd(s_activitySourceName + "." + _traceDirectory.FullName, (_) => new()
{
ShouldListenTo = (source) => source.Name == s_activitySourceName,
Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllDataAndRecorded,
ActivityStarted = OnActivityStarted,
ActivityStopped = OnActivityStopped,
SampleUsingParentId = (ref ActivityCreationOptions<string> options) => ActivitySamplingResult.AllDataAndRecorded,
});
// This is a singleton add, if the lister is the same.
ActivitySource.AddActivityListener(listener);
// THis is an new instance and needs to be disposed later
ActivitySource = new(s_activitySourceName, s_assemblyVersion);
}
}

private void OnActivityStarted(Activity activity)
{
_activityQueue.Enqueue(activity);
// Intentionally avoid await.
DequeueAndWrite("started")
.ContinueWith(t => Console.WriteLine(t.Exception), TaskContinuationOptions.OnlyOnFaulted);
}

private void OnActivityStopped(Activity activity)
{
_activityQueue.Enqueue(activity);
// Intentionally avoid await.
DequeueAndWrite("stopped")
.ContinueWith(t => Console.WriteLine(t.Exception), TaskContinuationOptions.OnlyOnFaulted);
}

// TODO: Encapsulate this separately
private Task DequeueAndWrite(string state)
{
if (_activityQueue.TryDequeue(out Activity? activity))
{
if (activity != null)
{
try
{
string json = JsonSerializer.Serialize(new { State = state, Activity = activity });
Console.WriteLine(json);
}
catch (NotSupportedException ex)
{
Console.WriteLine(ex.Message);
}
}
}

return Task.CompletedTask;
}

public Activity? StartActivity(string methodName)
{
return StartActivity(ActivitySource, TracingBaseName, methodName);
}

public string TracingBaseName => throw new NotImplementedException();

internal static Activity? StartActivity(ActivitySource? activitySource, string baseName, string methodName) =>
activitySource?.StartActivity(baseName + "." + methodName);

internal void Dispose(bool disposing)
{
if (!_disposed)
{
if (disposing)
{
ActivitySource?.Dispose();
}
_disposed = true;
}
}

public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
}
}
Loading

0 comments on commit c018d39

Please sign in to comment.