Skip to content

Commit bfa1caa

Browse files
committed
Merge branch 'main' into rlamb/sdk-1584/fdv2-polling-data-source
2 parents ff7c34d + c210d06 commit bfa1caa

16 files changed

+2198
-73
lines changed
Lines changed: 383 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,383 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Threading.Tasks;
4+
using LaunchDarkly.Sdk.Server.Interfaces;
5+
using LaunchDarkly.Sdk.Server.Subsystems;
6+
7+
namespace LaunchDarkly.Sdk.Server.Internal.DataSources
8+
{
9+
/// <summary>
10+
/// A composite source is a source that can dynamically switch between sources with the
11+
/// help of a list of <see cref="ISourceFactory"/> instances and <see cref="IActionApplierFactory"/> instances.
12+
/// The ISourceFactory instances are used to create the data sources, and the IActionApplierFactory creates the action appliers that are used
13+
/// to apply actions to the composite source as updates are received from the data sources.
14+
/// </summary>
15+
internal sealed class CompositeSource : IDataSource, ICompositeSourceActionable
16+
{
17+
// All mutable state and the internal action queue are protected by this lock.
18+
// We also use a small, non-recursive action queue so that any re-entrant calls
19+
// from IActionApplier logic are serialized and processed iteratively instead
20+
// of recursively, avoiding the risk of stack overflows.
21+
private readonly object _lock = new object();
22+
private readonly Queue<Action> _pendingActions = new Queue<Action>();
23+
private bool _isProcessingActions;
24+
private bool _disposed;
25+
26+
private readonly IDataSourceUpdates _originalUpdateSink;
27+
private readonly IDataSourceUpdates _sanitizedUpdateSink;
28+
private readonly SourcesList<(ISourceFactory Factory, IActionApplierFactory ActionApplierFactory)> _sourcesList;
29+
private readonly DisableableDataSourceUpdatesTracker _disableableTracker;
30+
31+
// Tracks the entry from the sources list that was used to create the current
32+
// data source instance. This allows operations such as blacklist to remove
33+
// the correct factory/action-applier-factory tuple from the list.
34+
private (ISourceFactory Factory, IActionApplierFactory ActionApplierFactory) _currentEntry;
35+
private IDataSource _currentDataSource;
36+
37+
/// <summary>
38+
/// Creates a new <see cref="CompositeSource"/>.
39+
/// </summary>
40+
/// <param name="updatesSink">the sink that receives updates from the active source</param>
41+
/// <param name="factoryTuples">the ordered list of source factories and their associated action applier factories</param>
42+
/// <param name="circular">whether to loop off the end of the list back to the start when fallback occurs</param>
43+
public CompositeSource(
44+
IDataSourceUpdates updatesSink,
45+
IList<(ISourceFactory Factory, IActionApplierFactory ActionApplierFactory)> factoryTuples,
46+
bool circular = true)
47+
{
48+
if (updatesSink is null)
49+
{
50+
throw new ArgumentNullException(nameof(updatesSink));
51+
}
52+
if (factoryTuples is null)
53+
{
54+
throw new ArgumentNullException(nameof(factoryTuples));
55+
}
56+
57+
_originalUpdateSink = updatesSink;
58+
_sanitizedUpdateSink = new DataSourceUpdatesSanitizer(updatesSink);
59+
60+
// this tracker is used to disconnect the current source from the updates sink when it is no longer needed.
61+
_disableableTracker = new DisableableDataSourceUpdatesTracker();
62+
63+
_sourcesList = new SourcesList<(ISourceFactory SourceFactory, IActionApplierFactory ActionApplierFactory)>(
64+
circular: circular,
65+
initialList: factoryTuples
66+
);
67+
}
68+
69+
/// <summary>
70+
/// When <see cref="Start"/> is called, the current data source is started. This should only be called once.
71+
/// </summary>
72+
/// <returns>
73+
/// a task that completes when the underlying current source has finished starting
74+
/// </returns>
75+
public Task<bool> Start()
76+
{
77+
return StartCurrent();
78+
}
79+
80+
/// <summary>
81+
/// Returns whether the current underlying data source has finished initializing.
82+
/// </summary>
83+
public bool Initialized => _currentDataSource?.Initialized ?? false;
84+
85+
/// <summary>
86+
/// Disposes of the composite data source.
87+
/// </summary>
88+
public void Dispose()
89+
{
90+
// When disposing the whole composite, we bypass the action queue and tear
91+
// down the current data source immediately while still honoring the same
92+
// state transitions under the shared lock. Any queued actions become no-ops
93+
// because there is no current data source
94+
// been disconnected.
95+
lock (_lock)
96+
{
97+
// cut off all the update proxies that have been handed out first
98+
_disableableTracker.DisablePreviouslyTracked();
99+
100+
// dispose of the current data source
101+
_currentDataSource?.Dispose();
102+
_currentDataSource = null;
103+
104+
// clear any queued actions and reset processing state
105+
_pendingActions.Clear();
106+
_isProcessingActions = false;
107+
_sourcesList.Reset();
108+
_currentEntry = default;
109+
110+
_disposed = true;
111+
}
112+
113+
// report state Off directly to the original sink, bypassing the sanitizer
114+
// which would map Off to Interrupted (that mapping is only for underlying sources)
115+
_originalUpdateSink.UpdateStatus(DataSourceState.Off, null);
116+
}
117+
118+
/// <summary>
119+
/// Enqueue a state-changing operation to be executed under the shared lock.
120+
/// If no other operation is currently running, this will synchronously process
121+
/// the queue in a simple loop on the current thread. Any re-entrant calls from
122+
/// within the operations will only enqueue more work; they will not trigger
123+
/// another processing loop, so the call stack does not grow with the queue length.
124+
/// </summary>
125+
private void EnqueueAction(Action action)
126+
{
127+
bool shouldProcess = false;
128+
lock (_lock)
129+
{
130+
_pendingActions.Enqueue(action);
131+
if (!_isProcessingActions)
132+
{
133+
_isProcessingActions = true;
134+
shouldProcess = true;
135+
}
136+
}
137+
138+
if (shouldProcess)
139+
{
140+
ProcessQueuedActions();
141+
}
142+
}
143+
144+
/// <summary>
145+
/// Processes the queued actions.
146+
/// </summary>
147+
private void ProcessQueuedActions()
148+
{
149+
while (true)
150+
{
151+
Action action;
152+
lock (_lock)
153+
{
154+
if (_pendingActions.Count == 0)
155+
{
156+
_isProcessingActions = false;
157+
return;
158+
}
159+
160+
action = _pendingActions.Dequeue();
161+
}
162+
163+
// Execute outside of the lock so that operations can do more work,
164+
// including calling back into the composite and queuing additional
165+
// actions without blocking the queue.
166+
// If an action throws an exception, catch it and continue processing
167+
// the next action to prevent one failure from stopping all processing.
168+
try
169+
{
170+
action();
171+
}
172+
catch
173+
{
174+
// Continue processing remaining actions even if one fails
175+
// TODO: need to add logging, will add in next PR
176+
}
177+
}
178+
}
179+
180+
private void TryFindNextUnderLock()
181+
{
182+
// This method must only be called while holding _lock.
183+
if (_currentDataSource != null)
184+
{
185+
return;
186+
}
187+
188+
var entry = _sourcesList.Next();
189+
if (entry.Factory == null)
190+
{
191+
return;
192+
}
193+
194+
// Build the list of update sinks conditionally based on whether we have an action applier factory
195+
var updateSinks = new List<IDataSourceUpdates>();
196+
if (entry.ActionApplierFactory != null)
197+
{
198+
var actionApplier = entry.ActionApplierFactory.CreateActionApplier(this);
199+
updateSinks.Add(actionApplier);
200+
}
201+
updateSinks.Add(_sanitizedUpdateSink);
202+
203+
// here we make a fanout so that we can trigger actions as well as forward calls to the sanitized sink (order matters here)
204+
var fanout = new FanOutDataSourceUpdates(updateSinks);
205+
var disableableUpdates = _disableableTracker.WrapAndTrack(fanout);
206+
207+
_currentEntry = entry;
208+
_currentDataSource = entry.Factory.CreateSource(disableableUpdates);
209+
}
210+
211+
#region ICompositeSourceActionable
212+
213+
/// <summary>
214+
/// Starts the current data source.
215+
/// </summary>
216+
public Task<bool> StartCurrent()
217+
{
218+
if (_disposed)
219+
{
220+
return Task.FromResult(false);
221+
}
222+
223+
var tcs = new TaskCompletionSource<bool>();
224+
225+
EnqueueAction(() =>
226+
{
227+
IDataSource dataSourceToStart;
228+
lock (_lock)
229+
{
230+
TryFindNextUnderLock();
231+
dataSourceToStart = _currentDataSource;
232+
}
233+
234+
if (dataSourceToStart is null)
235+
{
236+
// No sources available.
237+
tcs.SetResult(false);
238+
return;
239+
}
240+
241+
// Start the source asynchronously and complete the task when it finishes.
242+
// We do this outside the lock to avoid blocking the queue.
243+
_ = Task.Run(async () =>
244+
{
245+
try
246+
{
247+
var result = await dataSourceToStart.Start().ConfigureAwait(false);
248+
tcs.TrySetResult(result);
249+
}
250+
catch (Exception ex)
251+
{
252+
tcs.TrySetException(ex);
253+
}
254+
});
255+
});
256+
257+
return tcs.Task;
258+
}
259+
260+
/// <summary>
261+
/// Disposes of the current data source. You must call GoToNext or GoToFirst after this to change to a new factory.
262+
/// </summary>
263+
public void DisposeCurrent()
264+
{
265+
if (_disposed)
266+
{
267+
return;
268+
}
269+
270+
EnqueueAction(() =>
271+
{
272+
lock (_lock)
273+
{
274+
// cut off all the update proxies that have been handed out first, this is
275+
// necessary to avoid a cascade of actions leading to callbacks leading to actions, etc.
276+
_disableableTracker.DisablePreviouslyTracked();
277+
278+
// dispose of the current data source
279+
_currentDataSource?.Dispose();
280+
_currentDataSource = null;
281+
282+
// spoof interrupted (if the underlying source reported interrupted, sanitizer will not report it again)
283+
_sanitizedUpdateSink.UpdateStatus(DataSourceState.Interrupted, null);
284+
}
285+
});
286+
}
287+
288+
/// <summary>
289+
/// Switches to the next source in the list. You must still call StartCurrent after this to actually start the new source.
290+
/// </summary>
291+
public void GoToNext()
292+
{
293+
if (_disposed)
294+
{
295+
return;
296+
}
297+
298+
EnqueueAction(() =>
299+
{
300+
lock (_lock)
301+
{
302+
// cut off all the update proxies that have been handed out first, this is
303+
// necessary to avoid a cascade of actions leading to callbacks leading to actions, etc.
304+
_disableableTracker.DisablePreviouslyTracked();
305+
306+
_currentDataSource?.Dispose();
307+
_currentDataSource = null;
308+
309+
// spoof interrupted (if the underlying source reported interrupted, sanitizer will not report it again)
310+
_sanitizedUpdateSink.UpdateStatus(DataSourceState.Interrupted, null);
311+
312+
TryFindNextUnderLock();
313+
314+
// if there is no next source, there's nothing more to do
315+
}
316+
});
317+
}
318+
319+
/// <summary>
320+
/// Switches to the first source in the list. You must still call StartCurrent after this to actually start the new source.
321+
/// </summary>
322+
public void GoToFirst()
323+
{
324+
if (_disposed)
325+
{
326+
return;
327+
}
328+
329+
EnqueueAction(() =>
330+
{
331+
lock (_lock)
332+
{
333+
// moving always disconnects the current source
334+
_disableableTracker.DisablePreviouslyTracked();
335+
336+
_currentDataSource?.Dispose();
337+
_currentDataSource = null;
338+
339+
// spoof interrupted (if the underlying source reported interrupted, sanitizer will not report it again)
340+
_sanitizedUpdateSink.UpdateStatus(DataSourceState.Interrupted, null);
341+
342+
_sourcesList.Reset();
343+
TryFindNextUnderLock();
344+
345+
// if there are no sources, there's nothing more to do
346+
}
347+
});
348+
}
349+
350+
/// <summary>
351+
/// Blacklists the current source. This prevents the current source from being used again.
352+
/// Note that blacklisting does not tear down the current data source, it just prevents it from being used again.
353+
/// </summary>
354+
public void BlacklistCurrent()
355+
{
356+
if (_disposed)
357+
{
358+
return;
359+
}
360+
361+
EnqueueAction(() =>
362+
{
363+
lock (_lock)
364+
{
365+
// If we've never had a current entry, there's nothing to blacklist.
366+
if (_currentEntry == default)
367+
{
368+
return;
369+
}
370+
371+
// remove the factory tuple for our current entry
372+
// note: blacklisting does not tear down the current data source, it just prevents it from being used again
373+
_sourcesList.Remove(_currentEntry);
374+
_currentEntry = default;
375+
}
376+
});
377+
}
378+
379+
#endregion
380+
}
381+
}
382+
383+

0 commit comments

Comments
 (0)