From b6f0edd252e7ff4178e78e5e6e51899ec73f1aa2 Mon Sep 17 00:00:00 2001
From: Ruben Bartelink
Date: Fri, 4 Oct 2019 18:03:38 +0100
Subject: [PATCH] Helper naming consistency

---
 equinox-testbed/Infrastructure.fs             |  4 +--
 equinox-testbed/Services.fs                   |  5 +++-
 equinox-web/Domain/Aggregate.fs               |  8 +++--
 equinox-web/Domain/Infrastructure.fs          |  2 +-
 equinox-web/Domain/Todo.fs                    |  3 +-
 propulsion-summary-consumer/Infrastructure.fs |  2 +-
 .../{SumaryIngester.fs => Ingester.fs}        |  6 ++--
 propulsion-summary-consumer/Program.fs        |  2 +-
 .../SummaryConsumer.fsproj                    |  2 +-
 propulsion-summary-consumer/TodoSummary.fs    | 14 +++++----
 .../Infrastructure.fs                         |  1 -
 propulsion-summary-projector/Producer.fs      |  2 +-
 propulsion-summary-projector/Todo.fs          |  4 ++-
 .../Infrastructure.fs                         | 18 ++++++++++--
 .../{SkuIngester.fs => Ingester.fs}           | 29 +++++--------------
 propulsion-tracking-consumer/Program.fs       |  2 +-
 propulsion-tracking-consumer/SkuSummary.fs    | 16 +++++-----
 .../TrackingConsumer.fsproj                   |  2 +-
 18 files changed, 65 insertions(+), 57 deletions(-)
 rename propulsion-summary-consumer/{SumaryIngester.fs => Ingester.fs} (96%)
 rename propulsion-tracking-consumer/{SkuIngester.fs => Ingester.fs} (64%)

diff --git a/equinox-testbed/Infrastructure.fs b/equinox-testbed/Infrastructure.fs
index 814341c58..bfc12f98b 100644
--- a/equinox-testbed/Infrastructure.fs
+++ b/equinox-testbed/Infrastructure.fs
@@ -16,10 +16,10 @@ module Guid =
 
 /// ClientId strongly typed id; represented internally as a Guid; not used for storage so rendering is not significant
 type ClientId = Guid<clientId> and [<Measure>] clientId
-module ClientId = let toStringN (value : ClientId) : string = Guid.toStringN %value
+module ClientId = let toString (value : ClientId) : string = Guid.toStringN %value
 
 /// SkuId strongly typed id; represented internally as a Guid
 // NB Perf is suboptimal as a key, see Equinox's samples/Store for expanded version
 type SkuId = Guid<skuId> and [<Measure>] skuId
-module SkuId = let toStringN (value : SkuId) : string = Guid.toStringN %value
\ No newline at end of file
+module SkuId = let toString (value : SkuId) : string = Guid.toStringN %value
\ No newline at end of file
diff --git a/equinox-testbed/Services.fs b/equinox-testbed/Services.fs
index 4fe5fa19f..aaf7b5446 100644
--- a/equinox-testbed/Services.fs
+++ b/equinox-testbed/Services.fs
@@ -7,6 +7,7 @@ module Domain =
 
     // NB - these schemas reflect the actual storage formats and hence need to be versioned with care
     module Events =
+
         type Favorited = { date: System.DateTimeOffset; skuId: SkuId }
         type Unfavorited = { skuId: SkuId }
         module Compaction =
@@ -20,6 +21,7 @@ module Domain =
         let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()
 
     module Folds =
+
         type State = Events.Favorited []
         type private InternalState(input: State) =
@@ -61,7 +63,8 @@ module Domain =
             [ Events.Unfavorited { skuId = skuId } ]
 
     type Service(log, resolveStream, ?maxAttempts) =
-        let (|AggregateId|) (id: ClientId) = Equinox.AggregateId("Favorites", ClientId.toStringN id)
+
+        let (|AggregateId|) (id: ClientId) = Equinox.AggregateId("Favorites", ClientId.toString id)
         let (|Stream|) (AggregateId id) = Equinox.Stream(log, resolveStream id, defaultArg maxAttempts 2)
         let execute (Stream stream) command : Async<unit> = stream.Transact(Commands.interpret command)
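Note - the ids above follow the FSharp.UMX units-of-measure pattern, so the renamed toString
helpers are thin untag-and-render wrappers. A minimal standalone sketch (assumes an FSharp.UMX
package reference; measure and type names mirror the file above):

    open System
    open FSharp.UMX

    [<Measure>] type clientId
    type ClientId = Guid<clientId>

    module ClientId =
        // untag the measure, then render as 32 hex digits without dashes (as Guid.toStringN does)
        let toString (value : ClientId) : string = let g : Guid = %value in g.ToString "N"

    let cid : ClientId = % Guid.NewGuid()
    printfn "%s" (ClientId.toString cid)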
diff --git a/equinox-web/Domain/Aggregate.fs b/equinox-web/Domain/Aggregate.fs
index 5ede845f2..cc06883cc 100644
--- a/equinox-web/Domain/Aggregate.fs
+++ b/equinox-web/Domain/Aggregate.fs
@@ -2,13 +2,14 @@
 
 // NB - these types and names reflect the actual storage formats and hence need to be versioned with care
 module Events =
-    type Compacted = { happened: bool }
+
+    type CompactedData = { happened: bool }
 
     type Event =
         | Happened
-        | Compacted of Compacted
+        | Compacted of CompactedData
         interface TypeShape.UnionContract.IUnionContract
-    let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()
+    let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>(rejectNullaryCases=false)
 
 module Folds =
 
@@ -33,6 +34,7 @@ module Commands =
 type View = { sorted : bool }
 
 type Service(handlerLog, resolve, ?maxAttempts) =
+
     let (|AggregateId|) (id: string) = Equinox.AggregateId("Aggregate", id)
     let (|Stream|) (AggregateId id) = Equinox.Stream(handlerLog, resolve id, maxAttempts = defaultArg maxAttempts 2)
diff --git a/equinox-web/Domain/Infrastructure.fs b/equinox-web/Domain/Infrastructure.fs
index 02c5697e5..0c0c15dde 100644
--- a/equinox-web/Domain/Infrastructure.fs
+++ b/equinox-web/Domain/Infrastructure.fs
@@ -10,4 +10,4 @@ module Guid =
 
 type ClientId = Guid<clientId> and [<Measure>] clientId
 module ClientId =
-    let toStringN (value : ClientId) : string = Guid.toStringN %value
\ No newline at end of file
+    let toString (value : ClientId) : string = Guid.toStringN %value
\ No newline at end of file
diff --git a/equinox-web/Domain/Todo.fs b/equinox-web/Domain/Todo.fs
index 2fa453323..c311707ae 100644
--- a/equinox-web/Domain/Todo.fs
+++ b/equinox-web/Domain/Todo.fs
@@ -72,8 +72,9 @@ type View = { id: int; order: int; title: string; completed: bool }
 
 /// Defines operations that a Controller can perform on a Todo List
 type Service(handlerLog, resolve, ?maxAttempts) =
+
     /// Maps a ClientId to the AggregateId that specifies the Stream in which the data for that client will be held
-    let (|AggregateId|) (clientId: ClientId) = Equinox.AggregateId("Todos", ClientId.toStringN clientId)
+    let (|AggregateId|) (clientId: ClientId) = Equinox.AggregateId("Todos", ClientId.toString clientId)
 
     /// Maps a ClientId to Handler for the relevant stream
     let (|Stream|) (AggregateId id) = Equinox.Stream(handlerLog, resolve id, maxAttempts = defaultArg maxAttempts 2)
diff --git a/propulsion-summary-consumer/Infrastructure.fs b/propulsion-summary-consumer/Infrastructure.fs
index 84e42c046..a1fe8455e 100644
--- a/propulsion-summary-consumer/Infrastructure.fs
+++ b/propulsion-summary-consumer/Infrastructure.fs
@@ -32,5 +32,5 @@ module Guid =
 
 type ClientId = Guid<clientId> and [<Measure>] clientId
 module ClientId =
-    let toStringN (value : ClientId) : string = Guid.toStringN %value
+    let toString (value : ClientId) : string = Guid.toStringN %value
     let parse (value : string) : ClientId = let raw = Guid.Parse value in % raw
\ No newline at end of file
diff --git a/propulsion-summary-consumer/SumaryIngester.fs b/propulsion-summary-consumer/Ingester.fs
similarity index 96%
rename from propulsion-summary-consumer/SumaryIngester.fs
rename to propulsion-summary-consumer/Ingester.fs
index a17873d2c..cb3abea33 100644
--- a/propulsion-summary-consumer/SumaryIngester.fs
+++ b/propulsion-summary-consumer/Ingester.fs
@@ -1,6 +1,6 @@
 /// Follows a feed of updates, holding the most recently observed one; each update received is intended to completely supersede all previous updates
 /// Due to this, we should ensure that writes only happen where the update is not redundant and/or a replay of a previous message
-module ConsumerTemplate.SummaryIngester
+module ConsumerTemplate.Ingester
 
 open System
@@ -31,7 +31,7 @@ type Outcome = NoRelevantEvents of count : int | Ok of count : int | Skipped of
 type Stats(log, ?statsInterval, ?stateInterval) =
     inherit Propulsion.Kafka.StreamsConsumerStats<Outcome>(log, defaultArg statsInterval (TimeSpan.FromMinutes 1.), defaultArg stateInterval (TimeSpan.FromMinutes 5.))
 
-    let mutable (ok, na, redundant) = 0, 0, 0
+    let mutable ok, na, redundant = 0, 0, 0
     override __.HandleOk res = res |> function
         | Outcome.Ok count -> ok <- ok + 1; redundant <- redundant + count - 1
@@ -51,7 +51,7 @@ let startConsumer (config : Jet.ConfluentKafka.FSharp.KafkaConsumerConfig) (log
             { items = [| for x in x.items -> { id = x.id; order = x.order; title = x.title; completed = x.completed } |]}
 
-    let (|ClientId|) (value : string) = ClientId.parse value
+    let (|ClientId|) = ClientId.parse
     let (|DecodeNewest|_|) (codec : FsCodec.IUnionEncoder<_,_>) (stream, span : Propulsion.Streams.StreamSpan<_>) : 'summary option =
         span.events |> Seq.rev |> Seq.tryPick (StreamCodec.tryDecode codec log stream)
     let ingestIncomingSummaryMessage (stream, span : Propulsion.Streams.StreamSpan<_>) : Async<Outcome> = async {
diff --git a/propulsion-summary-consumer/Program.fs b/propulsion-summary-consumer/Program.fs
index f840030d6..83f93eb31 100644
--- a/propulsion-summary-consumer/Program.fs
+++ b/propulsion-summary-consumer/Program.fs
@@ -112,7 +112,7 @@ let start (args : CmdParser.Arguments) =
         Jet.ConfluentKafka.FSharp.KafkaConsumerConfig.Create(
             appName, args.Broker, [args.Topic], args.Group,
             maxInFlightBytes = args.MaxInFlightBytes, ?statisticsInterval = args.LagFrequency)
-    SummaryIngester.startConsumer config Log.Logger service args.MaxDop
+    Ingester.startConsumer config Log.Logger service args.MaxDop
 
 /// Handles command line parsing and running the program loop
 // NOTE Any custom logic should go in main
diff --git a/propulsion-summary-consumer/SummaryConsumer.fsproj b/propulsion-summary-consumer/SummaryConsumer.fsproj
index b82a04bec..ce6fbef04 100644
--- a/propulsion-summary-consumer/SummaryConsumer.fsproj
+++ b/propulsion-summary-consumer/SummaryConsumer.fsproj
@@ -10,7 +10,7 @@
   <ItemGroup>
     <Compile Include="Infrastructure.fs" />
     <Compile Include="TodoSummary.fs" />
-    <Compile Include="SumaryIngester.fs" />
+    <Compile Include="Ingester.fs" />
     <Compile Include="Program.fs" />
   </ItemGroup>
diff --git a/propulsion-summary-consumer/TodoSummary.fs b/propulsion-summary-consumer/TodoSummary.fs
index f1e6868aa..7e4a8c4bb 100644
--- a/propulsion-summary-consumer/TodoSummary.fs
+++ b/propulsion-summary-consumer/TodoSummary.fs
@@ -5,9 +5,9 @@ module Events =
 
     type ItemData = { id: int; order: int; title: string; completed: bool }
     type SummaryData = { items : ItemData[] }
-
+    type IngestedData = { version : int64; value : SummaryData }
     type Event =
-        | Ingested of {| version: int64; value : SummaryData |}
+        | Ingested of IngestedData
         interface TypeShape.UnionContract.IUnionContract
     let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()
@@ -36,13 +36,13 @@ module Folds =
 module Commands =
 
     type Command =
-        | Consume of version: int64 * Events.SummaryData
+        | Consume of version : int64 * value : Events.SummaryData
 
     let decide command (state : Folds.State) =
         match command with
         | Consume (version,value) ->
             if state.version <= version then false,[] else
-            true,[Events.Ingested {| version = version; value = value |}]
+            true,[Events.Ingested { version = version; value = value }]
 
 type Item = { id: int; order: int; title: string; completed: bool }
 let render : Folds.State -> Item[] = function
@@ -54,11 +54,12 @@ let render : Folds.State -> Item[] = function
             completed = x.completed } |]
     | _ -> [||]
 
-let [<Literal>]categoryId = "TodoSummary"
+let [<Literal>] categoryId = "TodoSummary"
 
 /// Defines the operations that the Read side of a Controller and/or the Ingester can perform on the 'aggregate'
 type Service(log, resolve, ?maxAttempts) =
-    let (|AggregateId|) (clientId: ClientId) = Equinox.AggregateId(categoryId, ClientId.toStringN clientId)
+
+    let (|AggregateId|) (clientId: ClientId) = Equinox.AggregateId(categoryId, ClientId.toString clientId)
     let (|Stream|) (AggregateId id) = Equinox.Stream(log, resolve id, maxAttempts = defaultArg maxAttempts 2)
     let execute (Stream stream) command : Async<bool> =
@@ -74,6 +75,7 @@ type Service(log, resolve, ?maxAttempts) =
         query clientId render
 
 module Repository =
+
     open Equinox.Cosmos // Everything until now is independent of a concrete store
     let private resolve cache context =
         // We don't want to write any events, so here we supply the `transmute` function to teach it how to treat our events as snapshots
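Note on the Consume handling above: it is what makes the consumer idempotent - a summary should
only be written when the incoming version supersedes the held one, so replays and stale deliveries
degenerate to no-ops. A condensed sketch of that gate (simplified State; shown with the
'only write newer' comparison that the Ingester's doc comments call for):

    type State = { version : int64 }
    type Event = Ingested of version : int64

    // returns (ingested, eventsToWrite); an empty event list means no write round-trip
    let decide version (state : State) =
        if version <= state.version then false, []      // redundant replay or stale -> skip
        else true, [ Ingested version ]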
diff --git a/propulsion-summary-projector/Infrastructure.fs b/propulsion-summary-projector/Infrastructure.fs
index e5ccb3ade..e91564f90 100644
--- a/propulsion-summary-projector/Infrastructure.fs
+++ b/propulsion-summary-projector/Infrastructure.fs
@@ -27,7 +27,6 @@ module StreamNameParser =
         | [| category; id |] -> Category (category, id)
         | _ -> Unknown streamName
 
-
 module Guid =
     let inline toStringN (x : Guid) = x.ToString "N"
diff --git a/propulsion-summary-projector/Producer.fs b/propulsion-summary-projector/Producer.fs
index 1c0f19cb9..cf8ffe720 100644
--- a/propulsion-summary-projector/Producer.fs
+++ b/propulsion-summary-projector/Producer.fs
@@ -23,7 +23,7 @@ module Contract =
     let ofState (state : Todo.Folds.State) : SummaryEvent =
         Summary { items = [| for x in state.items -> render x |]}
 
-let (|ClientId|) (value : string) = ClientId.parse value
+let (|ClientId|) = ClientId.parse
 let (|Decode|) (codec : FsCodec.IUnionEncoder<_,_>) stream (span : Propulsion.Streams.StreamSpan<_>) =
     span.events |> Seq.choose (StreamCodec.tryDecodeSpan codec Serilog.Log.Logger stream)
diff --git a/propulsion-summary-projector/Todo.fs b/propulsion-summary-projector/Todo.fs
index 6fba6e327..521293fbf 100644
--- a/propulsion-summary-projector/Todo.fs
+++ b/propulsion-summary-projector/Todo.fs
@@ -41,10 +41,11 @@ module Folds =
 /// Allows us to skip producing summaries for events that we know won't result in an externally discernable change to the summary output
 let impliesStateChange = function Events.Snapshot _ -> false | _ -> true
 
-let [<Literal>]categoryId = "Todos"
+let [<Literal>] categoryId = "Todos"
 
 /// Defines operations that a Controller or Projector can perform on a Todo List
 type Service(log, resolve, ?maxAttempts) =
+
     /// Maps a ClientId to the AggregateId that specifies the Stream in which the data for that client will be held
     let (|AggregateId|) (clientId: ClientId) = Equinox.AggregateId(categoryId, ClientId.toString clientId)
@@ -60,6 +61,7 @@ type Service(log, resolve, ?maxAttempts) =
         queryEx clientId render
 
 module Repository =
+
     open Equinox.Cosmos // Everything until now is independent of a concrete store
     let private resolve cache context =
         let accessStrategy = AccessStrategy.Snapshot (Folds.isOrigin,Folds.snapshot)
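Aside on the (|ClientId|)/(|SkuId|) tightening in Producer.fs and the Ingesters: a single-case
active pattern is just a named function, so the explicit and point-free spellings are
interchangeable. An illustrative pair (hypothetical (|AsInt|) names, for demonstration only):

    let (|AsInt|) (value : string) = int value                // explicit parameter
    let (|AsIntPF|) = fun (value : string) -> int value       // point-free equivalent

    let succ (AsInt n) = n + 1                                // succ "41" evaluates to 42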
diff --git a/propulsion-tracking-consumer/Infrastructure.fs b/propulsion-tracking-consumer/Infrastructure.fs
index 8967933a5..1cc82899d 100644
--- a/propulsion-tracking-consumer/Infrastructure.fs
+++ b/propulsion-tracking-consumer/Infrastructure.fs
@@ -15,8 +15,22 @@ module StreamCodec =
             None
         | x -> x
 
-module Guid =
-    let inline toStringN (x : Guid) = x.ToString "N"
+// TODO use one included in Propulsion.Kafka.Core
+/// StreamsConsumer buffers and deduplicates messages from a contiguous stream with each message bearing an index.
+/// The messages we consume don't have such characteristics, so we generate a fake `index` by keeping an int per stream in a dictionary
+type StreamKeyEventSequencer() =
+    // we synthesize a monotonically increasing index to render the deduplication facility inert
+    let indices = System.Collections.Generic.Dictionary<string,int>()
+    let genIndex streamName =
+        match indices.TryGetValue streamName with
+        | true, v -> let x = v + 1 in indices.[streamName] <- x; int64 x
+        | false, _ -> let x = 0 in indices.[streamName] <- x; int64 x
+
+    // Stuff the full content of the message into an Event record - we'll parse it when it comes out the other end in a span
+    member __.ToStreamEvent(KeyValue (k,v : string), ?eventType) : Propulsion.Streams.StreamEvent<_> seq =
+        let eventType = defaultArg eventType String.Empty
+        let e = FsCodec.Core.IndexedEventData(genIndex k,false,eventType,System.Text.Encoding.UTF8.GetBytes v,null,DateTimeOffset.UtcNow)
+        Seq.singleton { stream=k; event=e }
 
 /// SkuId strongly typed id; represented internally as a string
 type SkuId = string<skuId>
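The sequencer above exists purely to neutralise StreamsConsumer's index-based deduplication for
messages that carry no index of their own. A rough usage sketch (key and value are illustrative;
StreamKeyEventSequencer as defined above):

    let sequencer = StreamKeyEventSequencer()
    let kv = System.Collections.Generic.KeyValuePair<string,string>("sku-123", """{"messageIndex":1}""")

    // successive messages for a given key are assigned indices 0, 1, 2, ... so none are dropped as duplicates
    let events = sequencer.ToStreamEvent kv |> List.ofSeq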
diff --git a/propulsion-tracking-consumer/SkuIngester.fs b/propulsion-tracking-consumer/Ingester.fs
similarity index 64%
rename from propulsion-tracking-consumer/SkuIngester.fs
rename to propulsion-tracking-consumer/Ingester.fs
index c3ad5a883..8a3afa9a6 100644
--- a/propulsion-tracking-consumer/SkuIngester.fs
+++ b/propulsion-tracking-consumer/Ingester.fs
@@ -1,12 +1,12 @@
 /// Follows a feed of messages representing items being added/updated on an aggregate that maintains a list of child items
-/// Compared to the SummaryIngester in the `summaryProjector` template, each event is potentially relevant
-module ConsumerTemplate.SkuIngester
+/// Compared to the Ingester in the `summaryProjector` template, each event is potentially relevant
+module ConsumerTemplate.Ingester
 
 open ConsumerTemplate.SkuSummary.Events
 open System
 
 /// Defines the shape of input messages on the topic we're consuming
-module SkuUpdates =
+module Contract =
 
     type OrderInfo = { poNumber : string; reservedUnitQuantity : int }
     type Message =
@@ -36,29 +36,14 @@ type Stats(log, ?statsInterval, ?stateInterval) =
         log.Information(" Used {ok} Ignored {skipped}", ok, skipped)
         ok <- 0; skipped <- 0
 
-/// StreamsConsumer buffers and deduplicates messages from a contiguous stream with each message bearing an index.
-/// The messages we consume don't have such characteristics, so we generate a fake `index` by keeping an int per stream in a dictionary
-type MessagesByArrivalOrder() =
-    // we synthesize a monotonically increasing index to render the deduplication facility inert
-    let indices = System.Collections.Generic.Dictionary<string,int>()
-    let genIndex streamName =
-        match indices.TryGetValue streamName with
-        | true, v -> let x = v + 1 in indices.[streamName] <- x; int64 x
-        | false, _ -> let x = 0 in indices.[streamName] <- x; int64 x
-
-    // Stuff the full content of the message into an Event record - we'll parse it when it comes out the other end in a span
-    member __.ToStreamEvent (KeyValue (k,v : string)) : Propulsion.Streams.StreamEvent<_> seq =
-        let e = FsCodec.Core.IndexedEventData(genIndex k,false,String.Empty,System.Text.Encoding.UTF8.GetBytes v,null,DateTimeOffset.UtcNow)
-        Seq.singleton { stream=k; event=e }
-
-let (|SkuId|) (value : string) = SkuId.parse value
+let (|SkuId|) = SkuId.parse
 
 /// Starts a processing loop accumulating messages by stream - each time we handle all the incoming updates for a given Sku as a single transaction
 let startConsumer (config : Jet.ConfluentKafka.FSharp.KafkaConsumerConfig) (log : Serilog.ILogger) (service : SkuSummary.Service) maxDop =
     let ingestIncomingSummaryMessage(SkuId skuId, span : Propulsion.Streams.StreamSpan<_>) : Async<Outcome> = async {
         let items =
             [ for e in span.events do
-                let x = SkuUpdates.parse e.Data
+                let x = Contract.parse e.Data
                 for o in x.purchaseOrderInfo do
                     yield { locationId = x.locationId
                             messageIndex = x.messageIndex
@@ -69,7 +54,7 @@ let startConsumer (config : Jet.ConfluentKafka.FSharp.KafkaConsumerConfig) (log
         return Outcome.Completed(used,List.length items) }
     let stats = Stats(log)
-    // No categorization required, out inputs are all one big family defying categorization
+    // No categorization required, our inputs are all one big family defying categorization
     let category _streamName = "Sku"
-    let sequencer = MessagesByArrivalOrder()
+    let sequencer = StreamKeyEventSequencer()
     Propulsion.Kafka.StreamsConsumer.Start(log, config, sequencer.ToStreamEvent, ingestIncomingSummaryMessage, maxDop, stats, category)
\ No newline at end of file
diff --git a/propulsion-tracking-consumer/Program.fs b/propulsion-tracking-consumer/Program.fs
index bf6b63900..da84d49fb 100644
--- a/propulsion-tracking-consumer/Program.fs
+++ b/propulsion-tracking-consumer/Program.fs
@@ -112,7 +112,7 @@ let start (args : CmdParser.Arguments) =
         Jet.ConfluentKafka.FSharp.KafkaConsumerConfig.Create(
             appName, args.Broker, [args.Topic], args.Group,
             maxInFlightBytes = args.MaxInFlightBytes, ?statisticsInterval = args.LagFrequency)
-    SkuIngester.startConsumer config Log.Logger service args.MaxDop
+    Ingester.startConsumer config Log.Logger service args.MaxDop
 
 /// Handles command line parsing and running the program loop
 // NOTE Any custom logic should go in main
diff --git a/propulsion-tracking-consumer/SkuSummary.fs b/propulsion-tracking-consumer/SkuSummary.fs
index 1f04ed845..e2f7491c8 100644
--- a/propulsion-tracking-consumer/SkuSummary.fs
+++ b/propulsion-tracking-consumer/SkuSummary.fs
@@ -17,9 +17,7 @@ module Events =
 
 module Folds =
 
-    open Events
-
-    type State = ItemData list
+    type State = Events.ItemData list
 
     module State =
         let equals (x : Events.ItemData) (y : Events.ItemData) =
             x.locationId = y.locationId
@@ -36,10 +34,10 @@ module Folds =
         | Events.Snapshotted _ -> true // Yes, a snapshot is enough info
         | Events.Ingested _ -> false
     let evolve state = function
-        | Ingested e -> e :: state
-        | Snapshotted items -> List.ofArray items
-    let fold (state : State) : Event seq -> State = Seq.fold evolve state
-    let snapshot (x : State) : Event = Snapshotted (Array.ofList x)
+        | Events.Ingested e -> e :: state
+        | Events.Snapshotted items -> List.ofArray items
+    let fold (state : State) : Events.Event seq -> State = Seq.fold evolve state
+    let snapshot (x : State) : Events.Event = Events.Snapshotted (Array.ofList x)
@@ -51,9 +49,10 @@ module Commands =
         | Consume updates ->
             [for x in updates do if x |> Folds.State.isNewOrUpdated state then yield Events.Ingested x]
 
-let [<Literal>]categoryId = "SkuSummary"
+let [<Literal>] categoryId = "SkuSummary"
 
 type Service(log, resolve, ?maxAttempts) =
+
     let (|AggregateId|) (id : SkuId) = Equinox.AggregateId(categoryId, SkuId.toString id)
     let (|Stream|) (AggregateId id) = Equinox.Stream(log, resolve id, maxAttempts = defaultArg maxAttempts 2)
@@ -74,6 +73,7 @@ type Service(log, resolve, ?maxAttempts) =
         query skuId id
 
 module Repository =
+
     open Equinox.Cosmos // Everything until now is independent of a concrete store
     let private resolve cache context =
         // We don't want to write any events, so here we supply the `transmute` function to teach it how to treat our events as snapshots
diff --git a/propulsion-tracking-consumer/TrackingConsumer.fsproj b/propulsion-tracking-consumer/TrackingConsumer.fsproj
index 507f99eec..4391c219d 100644
--- a/propulsion-tracking-consumer/TrackingConsumer.fsproj
+++ b/propulsion-tracking-consumer/TrackingConsumer.fsproj
@@ -10,7 +10,7 @@
   <ItemGroup>
    <Compile Include="Infrastructure.fs" />
    <Compile Include="SkuSummary.fs" />
-    <Compile Include="SkuIngester.fs" />
+    <Compile Include="Ingester.fs" />
    <Compile Include="Program.fs" />
  </ItemGroup>
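Taken together, the renames leave each template's public shape intact - a host wires the consumer
up exactly as before. A hedged end-to-end sketch (mirrors the Program.fs fragments above;
config/service/maxDop construction elided):

    let runConsumer (config : Jet.ConfluentKafka.FSharp.KafkaConsumerConfig) service maxDop =
        // startConsumer yields a Propulsion pipeline; AwaitCompletion blocks until it shuts down
        use consumer = Ingester.startConsumer config Serilog.Log.Logger service maxDop
        consumer.AwaitCompletion() |> Async.RunSynchronously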