Skip to content

Commit a50515e

Browse files
authored
chore: Add FDv2 Polling Data Source (#190)
<!-- CURSOR_SUMMARY --> > [!NOTE] > Implements FDv2 polling (requestor + data source) with event-array parsing, ETag/304 handling, status updates, env-id header extraction, and comprehensive tests. > > - **FDv2 Polling**: > - **Data Source**: New `Internal/FDv2DataSources/FDv2PollingDataSource.cs` implementing periodic polling, processing protocol actions, applying transactional changesets, marking initialization, updating `DataSourceStatus`, and handling JSON/HTTP errors. > - **Requestor**: New `Internal/FDv2DataSources/FDv2PollingRequestor.cs` building requests with selector query params, ETag caching, 304 handling, header capture (including `x-ld-envid`), and deserializing polling `events`. > - **Interface/Models**: New `IFDv2PollingRequestor.cs` defining `FDv2PollingResponse` and requestor interface. > - **Event Parsing**: `Internal/FDv2Payloads/FDv2Event.cs` adds `DeserializeEventsArray` and poll-event converter usage for parsing `events` arrays with strict validation. > - **Streaming**: > - Minor imports added in `FDv2StreamingDataSource.cs` (no functional change in this diff). > - **Misc**: > - Minor cleanup in `Internal/DataSources/FeatureRequestor.cs`. > - **Tests**: > - Extensive tests in `test/Internal/FDv2DataSources/FDv2PollingDataSourceTest.cs` covering init, full/partial updates, deletes, 304 behavior, error/status transitions, selector propagation, and env-id extraction. > - Additional tests in `test/Internal/FDv2Payloads/FDv2PayloadsTest.cs` for `DeserializeEventsArray` and valid/invalid payload scenarios. > > <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit 69b18d1. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent c210d06 commit a50515e

File tree

8 files changed

+1446
-3
lines changed

8 files changed

+1446
-3
lines changed

pkgs/sdk/server/src/Internal/DataSources/FeatureRequestor.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ private async Task<BytesWithHeaders> GetAsync(Uri path)
9898
_log.Debug("Get all flags returned 304: not modified");
9999
return null;
100100
}
101-
//We ensure the status code after checking for 304, because 304 isn't considered success
101+
102102
if (!response.IsSuccessStatusCode)
103103
{
104104
throw new UnsuccessfulResponseException((int)response.StatusCode);
Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
using System;
2+
using System.Linq;
3+
using System.Text.Json;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using LaunchDarkly.Logging;
7+
using LaunchDarkly.Sdk.Internal;
8+
using LaunchDarkly.Sdk.Internal.Concurrent;
9+
using LaunchDarkly.Sdk.Internal.Http;
10+
using LaunchDarkly.Sdk.Server.Interfaces;
11+
using LaunchDarkly.Sdk.Server.Subsystems;
12+
13+
namespace LaunchDarkly.Sdk.Server.Internal.FDv2DataSources
14+
{
15+
internal sealed class FDv2PollingDataSource : IDataSource
16+
{
17+
internal delegate Selector SelectorSource();
18+
19+
private readonly IFDv2PollingRequestor _requestor;
20+
private readonly IDataSourceUpdates _dataSourceUpdates;
21+
private readonly TaskExecutor _taskExecutor;
22+
private readonly TimeSpan _pollInterval;
23+
private readonly AtomicBoolean _initialized = new AtomicBoolean(false);
24+
private readonly TaskCompletionSource<bool> _initTask;
25+
private readonly Logger _log;
26+
private readonly FDv2ProtocolHandler _protocolHandler = new FDv2ProtocolHandler();
27+
private readonly object _protocolLock = new object();
28+
private readonly SelectorSource _selectorSource;
29+
30+
private CancellationTokenSource _canceler;
31+
private string _environmentId;
32+
33+
internal FDv2PollingDataSource(
34+
LdClientContext context,
35+
IDataSourceUpdates dataSourceUpdates,
36+
IFDv2PollingRequestor requestor,
37+
TimeSpan pollInterval,
38+
SelectorSource selectorSource
39+
)
40+
{
41+
_requestor = requestor;
42+
_dataSourceUpdates = dataSourceUpdates;
43+
_taskExecutor = context.TaskExecutor;
44+
_pollInterval = pollInterval;
45+
_selectorSource = selectorSource;
46+
_initTask = new TaskCompletionSource<bool>();
47+
_log = context.Logger.SubLogger(LogNames.FDv2DataSourceSubLog);
48+
49+
_log.Debug("Created LaunchDarkly FDv2 polling data source");
50+
}
51+
52+
public bool Initialized => _initialized.Get();
53+
54+
public Task<bool> Start()
55+
{
56+
lock (this)
57+
{
58+
// If we have a canceler, then the source has already been started.
59+
if (_canceler != null) return _initTask.Task;
60+
61+
_log.Info("Starting LaunchDarkly FDv2 polling with interval: {0} milliseconds",
62+
_pollInterval.TotalMilliseconds);
63+
_canceler = _taskExecutor.StartRepeatingTask(TimeSpan.Zero,
64+
_pollInterval, UpdateTaskAsync);
65+
}
66+
67+
return _initTask.Task;
68+
}
69+
70+
private async Task UpdateTaskAsync()
71+
{
72+
_log.Debug("Polling LaunchDarkly for feature flag updates");
73+
try
74+
{
75+
var selector = _selectorSource();
76+
var response = await _requestor.PollingRequestAsync(selector);
77+
78+
if (response == null)
79+
{
80+
// This means we got a cached response (304), so we are valid.
81+
// We could have previously been interrupted, so we need to return to valid even
82+
// if there are no changes.
83+
_dataSourceUpdates.UpdateStatus(DataSourceState.Valid, null);
84+
return;
85+
}
86+
87+
if (response.Value.Headers != null)
88+
{
89+
_environmentId = response.Value.Headers.FirstOrDefault(item =>
90+
item.Key.ToLower() == HeaderConstants.EnvironmentId).Value
91+
?.FirstOrDefault();
92+
}
93+
94+
ProcessPollingResponse(response.Value);
95+
}
96+
catch (UnsuccessfulResponseException ex)
97+
{
98+
var errorInfo = DataSourceStatus.ErrorInfo.FromHttpError(ex.StatusCode);
99+
100+
if (HttpErrors.IsRecoverable(ex.StatusCode))
101+
{
102+
_log.Warn(HttpErrors.ErrorMessage(ex.StatusCode, "polling request", "will retry"));
103+
_dataSourceUpdates.UpdateStatus(DataSourceState.Interrupted, errorInfo);
104+
}
105+
else
106+
{
107+
_log.Error(HttpErrors.ErrorMessage(ex.StatusCode, "polling request", ""));
108+
_dataSourceUpdates.UpdateStatus(DataSourceState.Off, errorInfo);
109+
try
110+
{
111+
_initTask.SetResult(true);
112+
}
113+
catch (InvalidOperationException)
114+
{
115+
// the task was already set - nothing more to do
116+
}
117+
118+
((IDisposable)this).Dispose();
119+
}
120+
}
121+
catch (JsonException ex)
122+
{
123+
_log.Error("Polling request received malformed data: {0}", LogValues.ExceptionSummary(ex));
124+
_dataSourceUpdates.UpdateStatus(DataSourceState.Interrupted,
125+
new DataSourceStatus.ErrorInfo
126+
{
127+
Kind = DataSourceStatus.ErrorKind.InvalidData,
128+
Time = DateTime.Now
129+
});
130+
}
131+
catch (Exception ex)
132+
{
133+
var realEx = (ex is AggregateException ae) ? ae.Flatten() : ex;
134+
_log.Warn("Polling for feature flag updates failed: {0}", LogValues.ExceptionSummary(ex));
135+
_log.Debug(LogValues.ExceptionTrace(ex));
136+
_dataSourceUpdates.UpdateStatus(DataSourceState.Interrupted,
137+
DataSourceStatus.ErrorInfo.FromException(realEx));
138+
}
139+
}
140+
141+
private void ProcessPollingResponse(FDv2PollingResponse response)
142+
{
143+
lock (_protocolLock)
144+
{
145+
_protocolHandler.Reset();
146+
147+
foreach (var evt in response.Events)
148+
{
149+
var action = _protocolHandler.HandleEvent(evt);
150+
ProcessProtocolAction(action);
151+
}
152+
}
153+
}
154+
155+
private void HandleJsonError(string message)
156+
{
157+
_log.Error("LaunchDarkly polling request received invalid data: {0}", message);
158+
159+
var errorInfo = new DataSourceStatus.ErrorInfo
160+
{
161+
Kind = DataSourceStatus.ErrorKind.InvalidData,
162+
Message = message,
163+
Time = DateTime.Now
164+
};
165+
_dataSourceUpdates.UpdateStatus(DataSourceState.Interrupted, errorInfo);
166+
}
167+
168+
private void ProcessProtocolAction(IFDv2ProtocolAction action)
169+
{
170+
switch (action)
171+
{
172+
case FDv2ActionChangeset changesetAction:
173+
ProcessChangeSet(changesetAction.Changeset);
174+
break;
175+
case FDv2ActionError errorAction:
176+
_log.Error("FDv2 error event: {0} - {1}", errorAction.Id, errorAction.Reason);
177+
break;
178+
case FDv2ActionGoodbye goodbyeAction:
179+
_log.Info("FDv2 server disconnecting: {0}", goodbyeAction.Reason);
180+
break;
181+
case FDv2ActionInternalError internalErrorAction:
182+
_log.Error("FDv2 protocol error ({0}): {1}", internalErrorAction.ErrorType,
183+
internalErrorAction.Message);
184+
// Handle JsonError by updating status to Interrupted with InvalidData error kind
185+
if (internalErrorAction.ErrorType == FDv2ProtocolErrorType.JsonError)
186+
{
187+
HandleJsonError(internalErrorAction.Message);
188+
}
189+
break;
190+
case FDv2ActionNone _:
191+
// No action needed
192+
break;
193+
default:
194+
// Represents an implementation error. Actions expanded without the handling
195+
// being expanded.
196+
_log.Error("Unhandled FDv2 Protocol Action.");
197+
break;
198+
}
199+
}
200+
201+
private void ProcessChangeSet(FDv2ChangeSet fdv2ChangeSet)
202+
{
203+
if (!(_dataSourceUpdates is ITransactionalDataSourceUpdates transactionalDataSourceUpdates))
204+
throw new InvalidOperationException("Cannot apply updates to non-transactional data source");
205+
206+
var dataStoreChangeSet = FDv2ChangeSetTranslator.ToChangeSet(fdv2ChangeSet, _log, _environmentId);
207+
208+
// If the update fails, then we wait until the next poll and try again.
209+
// This is different from a streaming data source, which will need to re-start to get an initial
210+
// payload.
211+
if (!transactionalDataSourceUpdates.Apply(dataStoreChangeSet)) return;
212+
213+
// Only mark as initialized after successfully applying a changeset
214+
if (_initialized.GetAndSet(true)) return;
215+
_initTask.SetResult(true);
216+
_log.Info("First polling request successful");
217+
}
218+
219+
void IDisposable.Dispose()
220+
{
221+
Dispose(true);
222+
GC.SuppressFinalize(this);
223+
}
224+
225+
private void Dispose(bool disposing)
226+
{
227+
if (!disposing) return;
228+
229+
Shutdown();
230+
}
231+
232+
private void Shutdown()
233+
{
234+
_canceler?.Cancel();
235+
_requestor.Dispose();
236+
}
237+
}
238+
}

0 commit comments

Comments
 (0)