Skip to content

Commit b58ecee

Browse files
committed
feat(propulsuon sync): Add --requireAll
1 parent 79a0d06 commit b58ecee

File tree

2 files changed

+10
-5
lines changed

2 files changed

+10
-5
lines changed

src/Propulsion.CosmosStore/CosmosStoreSink.fs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,11 +185,11 @@ type CosmosStoreSink =
185185
?maxEvents,
186186
// Default: 256KB (limited by maximum size of a CosmosDB stored procedure invocation)
187187
?maxBytes,
188-
?ingesterStateInterval, ?commitInterval): SinkPipeline =
188+
?ingesterStateInterval, ?commitInterval, ?requireAll): SinkPipeline =
189189
let dispatcher = Internal.Dispatcher.Create(log, eventsContext, maxConcurrentStreams, ?maxEvents = maxEvents, ?maxBytes = maxBytes)
190190
let scheduler =
191191
let dumpStreams logStreamStates _log = logStreamStates Event.storedSize
192192
Scheduling.Engine(dispatcher, stats, dumpStreams, pendingBufferSize = 5, prioritizeStreamsBy = Event.storedSize,
193-
?purgeInterval = purgeInterval, ?wakeForResults = wakeForResults, ?idleDelay = idleDelay)
193+
?purgeInterval = purgeInterval, ?wakeForResults = wakeForResults, ?idleDelay = idleDelay, ?requireAll = requireAll)
194194
Factory.Start(log, scheduler.Pump, maxReadAhead, scheduler,
195195
ingesterStateInterval = defaultArg ingesterStateInterval stats.StateInterval.Period, ?commitInterval = commitInterval)

tools/Propulsion.Tool/Sync.fs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ type [<NoEquality; NoComparison; RequireSubcommand>] Parameters =
1111
| [<AltCommandLine "-w"; Unique>] MaxWriters of int
1212
| [<AltCommandLine "-Z"; Unique>] FromTail
1313
| [<AltCommandLine "-F"; Unique>] Follow
14+
| [<AltCommandLine "-A"; Unique>] RequireAll
1415
| [<AltCommandLine "-C"; Unique>] Categorize
1516
| [<AltCommandLine "-b"; Unique>] MaxItems of int
1617

@@ -32,6 +33,10 @@ type [<NoEquality; NoComparison; RequireSubcommand>] Parameters =
3233
| MaxWriters _ -> "maximum number of concurrent streams on which to process at any time. Default: 8 (Cosmos: 16)."
3334
| FromTail -> "(iff fresh projection) - force starting from present Position. Default: Ensure each and every event is projected from the start."
3435
| Follow -> "Stop when the Tail is reached."
36+
| RequireAll -> "Wait for out of order events to arrive (including waiting for event 0 per stream) before dispatching for any stream. " +
37+
"NOTE normally a large `MaxReadAhead` and `cosmos -b` is required to avoid starving the scheduler. " +
38+
"NOTE This mode does not make sense to apply unless the ProcessorName is fresh; if the consumer group name is not fresh (and hence items are excluded from the feed), there will inevitably be missing events, and processing will stall. " +
39+
"Default: assume events arrive from the changefeed (and/or the input JSON file) without any gaps or out of order deliveries for any stream."
3540
| Categorize -> "Gather handler latency stats by category"
3641
| MaxItems _ -> "Controls checkpointing granularity by adjusting the batch size being loaded from the feed. Default: Unlimited"
3742

@@ -217,7 +222,7 @@ let run appName (c: Args.Configuration, p: ParseResults<Parameters>) = async {
217222
| Some x, _ -> x
218223
| None, Json _ -> System.Guid.NewGuid() |> _.ToString("N")
219224
| None, _ -> p.Raise "ConsumerGroupName is mandatory, unless consuming from a JSON file"
220-
let startFromTail, follow, maxItems = p.Contains FromTail, p.Contains Follow, p.TryGetResult MaxItems
225+
let startFromTail, follow, requireAll, maxItems = p.Contains FromTail, p.Contains Follow, p.Contains RequireAll, p.TryGetResult MaxItems
221226
let producer =
222227
match a.Command with
223228
| SubCommand.Kafka a ->
@@ -243,12 +248,12 @@ let run appName (c: Args.Configuration, p: ParseResults<Parameters>) = async {
243248
let json = Propulsion.Codec.NewtonsoftJson.RenderedSpan.ofStreamSpan stream events |> Propulsion.Codec.NewtonsoftJson.Serdes.Serialize
244249
do! producer.ProduceAsync(FsCodec.StreamName.toString stream, json) |> Async.Ignore
245250
return Propulsion.Sinks.StreamResult.AllProcessed, Outcome.render_ stream ham spam 0 }
246-
Propulsion.Sinks.Factory.StartConcurrent(Log.Logger, maxReadAhead, maxConcurrentProcessors, handle a.Filters.EventFilter, stats)
251+
Propulsion.Sinks.Factory.StartConcurrent(Log.Logger, maxReadAhead, maxConcurrentProcessors, handle a.Filters.EventFilter, stats, requireAll = requireAll)
247252
| SubCommand.Sync sa ->
248253
let eventsContext = sa.ConnectEvents() |> Async.RunSynchronously
249254
let stats = Propulsion.CosmosStore.CosmosStoreSinkStats(Log.Logger, statsInterval, stateInterval, logExternalStats = dumpStoreStats, Categorize = a.Categorize)
250255
Propulsion.CosmosStore.CosmosStoreSink.Start(Metrics.log, maxReadAhead, eventsContext, maxConcurrentProcessors, stats,
251-
purgeInterval = TimeSpan.hours 1, maxBytes = sa.MaxBytes)
256+
purgeInterval = TimeSpan.hours 1, maxBytes = sa.MaxBytes, requireAll = requireAll)
252257
let source =
253258
match a.Command.Source with
254259
| Cosmos sa ->

0 commit comments

Comments
 (0)