Skip to content
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
db1b38a
feat: Add support for transactional data stores.
kinyoklion Dec 1, 2025
1757212
chore: Add changeset sorting.
kinyoklion Dec 2, 2025
5c499c9
Remove version collapsing.
kinyoklion Dec 3, 2025
d73c5f9
Change data to property
kinyoklion Dec 3, 2025
9ffcf50
chore: Add support for transactional data source updates.
kinyoklion Dec 2, 2025
edde5c3
Merge branch 'rlamb/add-changeset-sorting' into rlamb/sdk-1583/transa…
kinyoklion Dec 3, 2025
8805426
Basic transactional apply routing.
kinyoklion Dec 3, 2025
9c37607
Support non-transactional data stores.
kinyoklion Dec 3, 2025
5d16f31
Organize code by region.
kinyoklion Dec 3, 2025
94184a3
Only find environment ID once per connection.
kinyoklion Dec 3, 2025
94cdabf
chore: Add FDv2 polling data source.
kinyoklion Dec 4, 2025
8c14faa
Error handling improvements.
kinyoklion Dec 4, 2025
5b2e4d3
Merge branch 'rlamb/sdk-1583/transactional-data-source-updates' into …
kinyoklion Dec 4, 2025
90e87a0
Error handling improvements.
kinyoklion Dec 4, 2025
9655e3a
Merge branch 'rlamb/sdk-1583/transactional-data-source-updates' into …
kinyoklion Dec 4, 2025
5844a66
Refactor application to transactional versus non-transactional stores.
kinyoklion Dec 4, 2025
891ad0a
Sort and add comments.
kinyoklion Dec 4, 2025
ff7c34d
Merge branch 'rlamb/sdk-1583/transactional-data-source-updates' into …
kinyoklion Dec 4, 2025
aee3377
Merge branch 'main' into rlamb/sdk-1584/fdv2-polling-data-source
kinyoklion Dec 5, 2025
30bec0e
More robust JSON parsing.
kinyoklion Dec 8, 2025
6c5765a
Better 304 handling.
kinyoklion Dec 8, 2025
69b18d1
More JSON error handling.
kinyoklion Dec 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ private async Task<BytesWithHeaders> GetAsync(Uri path)
_log.Debug("Get all flags returned 304: not modified");
return null;
}
//We ensure the status code after checking for 304, because 304 isn't considered success
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed this comment because it was blatantly wrong. A 304 is a success, and this code doesn't even run when there is a 304 because we return above. (I looked back through the history and it has been incorrect since addition 9 years ago.)


if (!response.IsSuccessStatusCode)
{
throw new UnsuccessfulResponseException((int)response.StatusCode);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
using System;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using LaunchDarkly.Logging;
using LaunchDarkly.Sdk.Internal;
using LaunchDarkly.Sdk.Internal.Concurrent;
using LaunchDarkly.Sdk.Internal.Http;
using LaunchDarkly.Sdk.Server.Interfaces;
using LaunchDarkly.Sdk.Server.Subsystems;

namespace LaunchDarkly.Sdk.Server.Internal.FDv2DataSources
{
internal sealed class FDv2PollingDataSource : IDataSource
{
internal delegate Selector SelectorSource();

private readonly IFDv2PollingRequestor _requestor;
private readonly IDataSourceUpdates _dataSourceUpdates;
private readonly TaskExecutor _taskExecutor;
private readonly TimeSpan _pollInterval;
private readonly AtomicBoolean _initialized = new AtomicBoolean(false);
private readonly TaskCompletionSource<bool> _initTask;
private readonly Logger _log;
private readonly FDv2ProtocolHandler _protocolHandler = new FDv2ProtocolHandler();
private readonly object _protocolLock = new object();
private readonly SelectorSource _selectorSource;

private CancellationTokenSource _canceler;
private string _environmentId;

internal FDv2PollingDataSource(
LdClientContext context,
IDataSourceUpdates dataSourceUpdates,
IFDv2PollingRequestor requestor,
TimeSpan pollInterval,
SelectorSource selectorSource
)
{
_requestor = requestor;
_dataSourceUpdates = dataSourceUpdates;
_taskExecutor = context.TaskExecutor;
_pollInterval = pollInterval;
_selectorSource = selectorSource;
_initTask = new TaskCompletionSource<bool>();
_log = context.Logger.SubLogger(LogNames.FDv2DataSourceSubLog);

_log.Debug("Created LaunchDarkly FDv2 polling data source");
}

public bool Initialized => _initialized.Get();

public Task<bool> Start()
{
lock (this)
{
// If we have a canceler, then the source has already been started.
if (_canceler != null) return _initTask.Task;

_log.Info("Starting LaunchDarkly FDv2 polling with interval: {0} milliseconds",
_pollInterval.TotalMilliseconds);
_canceler = _taskExecutor.StartRepeatingTask(TimeSpan.Zero,
_pollInterval, UpdateTaskAsync);
}

return _initTask.Task;
}

private async Task UpdateTaskAsync()
{
_log.Debug("Polling LaunchDarkly for feature flag updates");
try
{
var selector = _selectorSource();
var response = await _requestor.PollingRequestAsync(selector);

if (response == null)
{
// This means we got a cached response (304), so we are valid.
// We could have previously been interrupted, so we need to return to valid even
// if there are no changes.
_dataSourceUpdates.UpdateStatus(DataSourceState.Valid, null);
return;
}

if (response.Value.Headers != null)
{
_environmentId = response.Value.Headers.FirstOrDefault(item =>
item.Key.ToLower() == HeaderConstants.EnvironmentId).Value
?.FirstOrDefault();
}

ProcessPollingResponse(response.Value);
}
catch (UnsuccessfulResponseException ex)
{
var errorInfo = DataSourceStatus.ErrorInfo.FromHttpError(ex.StatusCode);

if (HttpErrors.IsRecoverable(ex.StatusCode))
{
_log.Warn(HttpErrors.ErrorMessage(ex.StatusCode, "polling request", "will retry"));
_dataSourceUpdates.UpdateStatus(DataSourceState.Interrupted, errorInfo);
}
else
{
_log.Error(HttpErrors.ErrorMessage(ex.StatusCode, "polling request", ""));
_dataSourceUpdates.UpdateStatus(DataSourceState.Off, errorInfo);
try
{
_initTask.SetResult(true);
}
catch (InvalidOperationException)
{
// the task was already set - nothing more to do
}

((IDisposable)this).Dispose();
}
}
catch (JsonException ex)
{
_log.Error("Polling request received malformed data: {0}", LogValues.ExceptionSummary(ex));
_dataSourceUpdates.UpdateStatus(DataSourceState.Interrupted,
new DataSourceStatus.ErrorInfo
{
Kind = DataSourceStatus.ErrorKind.InvalidData,
Time = DateTime.Now
});
}
catch (Exception ex)
{
var realEx = (ex is AggregateException ae) ? ae.Flatten() : ex;
_log.Warn("Polling for feature flag updates failed: {0}", LogValues.ExceptionSummary(ex));
_log.Debug(LogValues.ExceptionTrace(ex));
_dataSourceUpdates.UpdateStatus(DataSourceState.Interrupted,
DataSourceStatus.ErrorInfo.FromException(realEx));
}
}

private void ProcessPollingResponse(FDv2PollingResponse response)
{
lock (_protocolLock)
{
_protocolHandler.Reset();

foreach (var evt in response.Events)
{
var action = _protocolHandler.HandleEvent(evt);
ProcessProtocolAction(action);
}
}
}

private void ProcessProtocolAction(IFDv2ProtocolAction action)
{
switch (action)
{
case FDv2ActionChangeset changesetAction:
ProcessChangeSet(changesetAction.Changeset);
break;
case FDv2ActionError errorAction:
_log.Error("FDv2 error event: {0} - {1}", errorAction.Id, errorAction.Reason);
break;
case FDv2ActionGoodbye goodbyeAction:
_log.Info("FDv2 server disconnecting: {0}", goodbyeAction.Reason);
break;
case FDv2ActionInternalError internalErrorAction:
_log.Error("FDv2 protocol error ({0}): {1}", internalErrorAction.ErrorType,
internalErrorAction.Message);
break;
case FDv2ActionNone _:
// No action needed
break;
default:
// Represents an implementation error. Actions expanded without the handling
// being expanded.
_log.Error("Unhandled FDv2 Protocol Action.");
break;
}
}

private void ProcessChangeSet(FDv2ChangeSet fdv2ChangeSet)
{
if (!(_dataSourceUpdates is ITransactionalDataSourceUpdates transactionalDataSourceUpdates))
throw new InvalidOperationException("Cannot apply updates to non-transactional data source");

var dataStoreChangeSet = FDv2ChangeSetTranslator.ToChangeSet(fdv2ChangeSet, _log, _environmentId);

// If the update fails, then we wait until the next poll and try again.
// This is different from a streaming data source, which will need to re-start to get an initial
// payload.
if (!transactionalDataSourceUpdates.Apply(dataStoreChangeSet)) return;

// Only mark as initialized after successfully applying a changeset
if (_initialized.GetAndSet(true)) return;
_initTask.SetResult(true);
_log.Info("First polling request successful");
}

void IDisposable.Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

private void Dispose(bool disposing)
{
if (!disposing) return;

Shutdown();
}

private void Shutdown()
{
_canceler?.Cancel();
_requestor.Dispose();
}
}
}
156 changes: 156 additions & 0 deletions pkgs/sdk/server/src/Internal/FDv2DataSources/FDv2PollingRequestor.cs
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be FDv2Requestor? The only logic inside it that is specific to polling is the logs. I know off the top of my head iOS and Node have Requestor classes that are separable from the polling itself.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The payload it produces is not agnostic, so I am not sure.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can always change it though, as it is internal.

Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using LaunchDarkly.Logging;
using LaunchDarkly.Sdk.Internal;
using LaunchDarkly.Sdk.Internal.Http;
using LaunchDarkly.Sdk.Server.Internal.FDv2Payloads;
using LaunchDarkly.Sdk.Server.Internal.Model;
using LaunchDarkly.Sdk.Server.Subsystems;

namespace LaunchDarkly.Sdk.Server.Internal.FDv2DataSources
{
internal sealed class FDv2PollingRequestor : IFDv2PollingRequestor
{
private const string VersionQueryParam = "version";
private const string StateQueryParam = "state";

private readonly Uri _baseUri;
private readonly HttpClient _httpClient;
private readonly HttpProperties _httpProperties;
private readonly TimeSpan _connectTimeout;
private readonly Logger _log;
private readonly JsonSerializerOptions _jsonOptions;
private readonly Dictionary<Uri, EntityTagHeaderValue> _etags = new Dictionary<Uri, EntityTagHeaderValue>();

internal FDv2PollingRequestor(LdClientContext context, Uri baseUri)
{
_baseUri = baseUri;
_httpProperties = context.Http.HttpProperties;
_httpClient = context.Http.NewHttpClient();
_connectTimeout = context.Http.ConnectTimeout;
_log = context.Logger.SubLogger(LogNames.FDv2DataSourceSubLog);

// Set up JSON deserialization options
_jsonOptions = new JsonSerializerOptions();
_jsonOptions.Converters.Add(ServerIntentConverter.Instance);
_jsonOptions.Converters.Add(PutObjectConverter.Instance);
_jsonOptions.Converters.Add(DeleteObjectConverter.Instance);
_jsonOptions.Converters.Add(PayloadTransferredConverter.Instance);
_jsonOptions.Converters.Add(ErrorConverter.Instance);
_jsonOptions.Converters.Add(GoodbyeConverter.Instance);
_jsonOptions.Converters.Add(FDv2PollEventConverter.Instance);
_jsonOptions.Converters.Add(FeatureFlagSerialization.Instance);
_jsonOptions.Converters.Add(SegmentSerialization.Instance);
}

public async Task<FDv2PollingResponse?> PollingRequestAsync(Selector selector)
{
var uri = _baseUri.AddPath(StandardEndpoints.FDv2PollingRequestPath);

// Add selector query parameters
var uriBuilder = new UriBuilder(uri);
var query = new List<string>();

if (selector.Version > 0)
{
query.Add($"{VersionQueryParam}={selector.Version}");
}

if (!string.IsNullOrEmpty(selector.State))
{
query.Add($"{StateQueryParam}={Uri.EscapeDataString(selector.State)}");
}

if (query.Count > 0)
{
uriBuilder.Query = string.Join("&", query);
}

var requestUri = uriBuilder.Uri;

_log.Debug("Making FDv2 polling request to {0}", requestUri);

var request = new HttpRequestMessage(HttpMethod.Get, requestUri);
// ReSharper disable once PossiblyImpureMethodCallOnReadonlyVariable
// This method adds the headers to the request, versus adding headers to the properties. Resharper
// analysis incorrectly thinks it is an impure function.
_httpProperties.AddHeaders(request);

lock (_etags)
{
if (_etags.TryGetValue(requestUri, out var etag))
{
request.Headers.IfNoneMatch.Add(etag);
}
}

using (var cts = new CancellationTokenSource(_connectTimeout))
{
try
{
using (var response = await _httpClient.SendAsync(request, cts.Token).ConfigureAwait(false))
{
if (response.StatusCode == HttpStatusCode.NotModified)
{
_log.Debug("FDv2 polling request returned 304: not modified");
return null;
}

if (!response.IsSuccessStatusCode)
{
throw new UnsuccessfulResponseException((int)response.StatusCode);
}

lock (_etags)
{
if (response.Headers.ETag != null)
{
_etags[requestUri] = response.Headers.ETag;
}
else
{
_etags.Remove(requestUri);
}
}

var content = await response.Content.ReadAsStringAsync().ConfigureAwait(false);

_log.Debug("Received FDv2 polling response");

// Parse the response which contains an "events" array
var events = FDv2Event.DeserializeEventsArray(content, _jsonOptions);

var headers = response.Headers
.Select(h => new KeyValuePair<string, IEnumerable<string>>(h.Key, h.Value))
.ToList();

return new FDv2PollingResponse(events, headers);
}
}
catch (TaskCanceledException tce)
{
if (tce.CancellationToken == cts.Token)
{
// Indicates the task was canceled by something other than a request timeout.
throw;
}

throw new TimeoutException("FDv2 polling request with URL: " + requestUri.AbsoluteUri +
" timed out after: " + _connectTimeout);
}
}
}

public void Dispose()
{
_httpClient?.Dispose();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Text.Json;
using System.Threading.Tasks;
using LaunchDarkly.EventSource;
using LaunchDarkly.Logging;
Expand Down
Loading
Loading