From 0dc9662a5290df314946362640ced3ee694a42c7 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Tue, 17 Sep 2019 14:31:50 +0100 Subject: [PATCH] Preparatory refactoring of summary-projector for ES --- propulsion-summary-projector/Program.fs | 247 +++++++++--------- .../SummaryProjector.fsproj | 2 +- 2 files changed, 125 insertions(+), 124 deletions(-) diff --git a/propulsion-summary-projector/Program.fs b/propulsion-summary-projector/Program.fs index df1da1a7f..f2ce9b3ca 100644 --- a/propulsion-summary-projector/Program.fs +++ b/propulsion-summary-projector/Program.fs @@ -14,107 +14,114 @@ module CmdParser = | null -> raise <| MissingArg (sprintf "Please provide a %s, either as an argument or via the %s environment variable" msg key) | x -> x - module Cosmos = - type [] Parameters = - | [] Connection of string - | [] ConnectionMode of ConnectionMode - | [] Database of string - | [] Container of string - | [] Timeout of float - | [] Retries of int - | [] RetriesWaitTime of int - interface IArgParserTemplate with - member a.Usage = - match a with - | Connection _ -> "specify a connection string for a Cosmos account (defaults: envvar:EQUINOX_COSMOS_CONNECTION, Cosmos Emulator)." - | ConnectionMode _ -> "override the connection mode (default: DirectTcp)." - | Database _ -> "specify a database name for Cosmos store (defaults: envvar:EQUINOX_COSMOS_DATABASE)." - | Container _ -> "specify a container name for Cosmos store (defaults: envvar:EQUINOX_COSMOS_CONTAINER)." - | Timeout _ -> "specify operation timeout in seconds (default: 5)." - | Retries _ -> "specify operation retries (default: 1)." - | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds (default: 5)" - type Arguments(a : ParseResults) = - member __.Mode = a.GetResult(ConnectionMode,ConnectionMode.Direct) - member __.Connection = match a.TryGetResult Connection with Some x -> x | None -> envBackstop "Connection" "EQUINOX_COSMOS_CONNECTION" - member __.Database = match a.TryGetResult Database with Some x -> x | None -> envBackstop "Database" "EQUINOX_COSMOS_DATABASE" - member __.Container = match a.TryGetResult Container with Some x -> x | None -> envBackstop "Container" "EQUINOX_COSMOS_CONTAINER" - - member __.Timeout = a.GetResult(Timeout,5.) |> TimeSpan.FromSeconds - member __.Retries = a.GetResult(Retries, 1) - member __.MaxRetryWaitTime = a.GetResult(RetriesWaitTime, 5) - - member x.BuildConnectionDetails() = - let (Discovery.UriAndKey (endpointUri,_) as discovery) = Discovery.FromConnectionString x.Connection - Log.Information("CosmosDb {mode} {endpointUri} Database {database} Container {container}.", - x.Mode, endpointUri, x.Database, x.Container) - Log.Information("CosmosDb timeout {timeout}s; Throttling retries {retries}, max wait {maxRetryWaitTime}s", - (let t = x.Timeout in t.TotalSeconds), x.Retries, x.MaxRetryWaitTime) - let connector = Connector(x.Timeout, x.Retries, x.MaxRetryWaitTime, Log.Logger, mode=x.Mode) - discovery, connector, { database = x.Database; container = x.Container } - [] type Parameters = - (* ChangeFeed Args*) - | [] ConsumerGroupName of string - | [] LeaseContainerSuffix of string - | [] FromTail - | [] MaxDocuments of int - | [] MaxPendingBatches of int - | [] MaxWriters of int - | [] LagFreqM of float - | [] Verbose - | [] ChangeFeedVerbose - (* Kafka Args *) - | [] Broker of string - | [] Topic of string - | [] Producers of int - (* Cosmos Source Args *) - | [] Cosmos of ParseResults + | [] ConsumerGroupName of string + | [] MaxReadAhead of int + | [] MaxWriters of int + | [] Verbose + | [] VerboseConsole + | [] SrcCosmos of ParseResults interface IArgParserTemplate with member a.Usage = match a with | ConsumerGroupName _ -> "Projector consumer group name." - | LeaseContainerSuffix _ -> "specify Container Name suffix for Leases container (default: `-aux`)." - | FromTail _ -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event." - | MaxDocuments _ -> "maximum document count to supply for the Change Feed query. Default: use response size limit" - | MaxPendingBatches _ -> "maximum number of batches to let processing get ahead of completion. Default: 64" - | MaxWriters _ -> "maximum number of concurrent streams on which to process at any time. Default: 1024" - | LagFreqM _ -> "specify frequency (minutes) to dump lag stats. Default: off" - | Verbose -> "request Verbose Logging. Default: off" - | ChangeFeedVerbose -> "request Verbose Logging from ChangeFeedProcessor. Default: off" - | Broker _ -> "specify Kafka Broker, in host:port format. (default: use environment variable PROPULSION_KAFKA_BROKER, if specified)" - | Topic _ -> "specify Kafka Topic Id. (default: use environment variable PROPULSION_KAFKA_TOPIC, if specified)" - | Producers _ -> "specify number of Kafka Producer instances to use. Default: 1" - | Cosmos _ -> "specify CosmosDb input parameters" - and Arguments(args : ParseResults) = - member val Cosmos = Cosmos.Arguments(args.GetResult Cosmos) - member val Target = TargetInfo args - member __.LeaseId = args.GetResult ConsumerGroupName - member __.Suffix = args.GetResult(LeaseContainerSuffix,"-aux") - member __.Verbose = args.Contains Verbose - member __.ChangeFeedVerbose = args.Contains ChangeFeedVerbose - member __.MaxDocuments = args.TryGetResult MaxDocuments - member __.MaxReadAhead = args.GetResult(MaxPendingBatches,64) - member __.ConcurrentStreamProcessors = args.GetResult(MaxWriters,1024) - member __.LagFrequency = args.TryGetResult LagFreqM |> Option.map TimeSpan.FromMinutes - member __.AuxContainerName = __.Cosmos.Container + __.Suffix - member x.BuildChangeFeedParams() = - match x.MaxDocuments with - | None -> - Log.Information("Processing {leaseId} in {auxContainerName} without document count limit (<= {maxPending} pending) using {dop} processors", - x.LeaseId, x.AuxContainerName, x.MaxReadAhead, x.ConcurrentStreamProcessors) - | Some lim -> - Log.Information("Processing {leaseId} in {auxContainerName} with max {changeFeedMaxDocuments} documents (<= {maxPending} pending) using {dop} processors", - x.LeaseId, x.AuxContainerName, lim, x.MaxReadAhead, x.ConcurrentStreamProcessors) - if args.Contains FromTail then Log.Warning("(If new projector group) Skipping projection of all existing events.") - x.LagFrequency |> Option.iter (fun s -> Log.Information("Dumping lag stats at {lagS:n0}s intervals", s.TotalSeconds)) - { database = x.Cosmos.Database; container = x.AuxContainerName}, x.LeaseId, args.Contains FromTail, x.MaxDocuments, x.LagFrequency, - (x.MaxReadAhead, x.ConcurrentStreamProcessors) - and TargetInfo(args : ParseResults) = - member __.Broker = Uri(match args.TryGetResult Broker with Some x -> x | None -> envBackstop "Broker" "PROPULSION_KAFKA_BROKER") - member __.Topic = match args.TryGetResult Topic with Some x -> x | None -> envBackstop "Topic" "PROPULSION_KAFKA_TOPIC" - member __.Producers = args.GetResult(Producers,1) - member x.BuildTargetParams() = x.Broker, x.Topic, x.Producers + | MaxReadAhead _ -> "maximum number of batches to let processing get ahead of completion. Default: 64." + | MaxWriters _ -> "maximum number of concurrent streams on which to process at any time. Default: 1024." + | Verbose -> "request Verbose Logging. Default: off." + | VerboseConsole -> "request Verbose Console Logging. Default: off." + | SrcCosmos _ -> "specify CosmosDB input parameters." + and Arguments(a : ParseResults) = + member __.ConsumerGroupName = a.GetResult ConsumerGroupName + member __.Verbose = a.Contains Parameters.Verbose + member __.VerboseConsole = a.Contains VerboseConsole + member __.MaxReadAhead = a.GetResult(MaxReadAhead,64) + member __.MaxConcurrentStreams = a.GetResult(MaxWriters,1024) + member __.StatsInterval = TimeSpan.FromMinutes 1. + member val Source : CosmosSourceArguments = + match a.TryGetSubCommand() with + | Some (SrcCosmos cosmos) -> CosmosSourceArguments cosmos + | _ -> raise (MissingArg "Must specify one of cosmos for Src") + member x.SourceParams() = + let srcC = x.Source + let disco, auxColl = + match srcC.LeaseContainer with + | None -> srcC.Discovery, { database = srcC.Database; container = srcC.Container + "-aux" } + | Some sc -> srcC.Discovery, { database = srcC.Database; container = sc } + Log.Information("Max read backlog: {maxReadAhead}", x.MaxReadAhead) + Log.Information("Processing Lease {leaseId} in Database {db} Container {container} with maximum document count limited to {maxDocuments}", + x.ConsumerGroupName, auxColl.database, auxColl.container, srcC.MaxDocuments) + if srcC.FromTail then Log.Warning("(If new projector group) Skipping projection of all existing events.") + srcC.LagFrequency |> Option.iter (fun s -> Log.Information("Dumping lag stats at {lagS:n0}s intervals", s.TotalSeconds)) + srcC,(disco, auxColl, x.ConsumerGroupName, srcC.FromTail, srcC.MaxDocuments, srcC.LagFrequency) + and [] CosmosSourceParameters = + | [] FromTail + | [] MaxDocuments of int + | [] LagFreqM of float + | [] LeaseContainer of string + + | [] ConnectionMode of Equinox.Cosmos.ConnectionMode + | [] Connection of string + | [] Database of string + | [] Container of string // Actually Mandatory, but stating that is not supported + | [] Timeout of float + | [] Retries of int + | [] RetriesWaitTime of int + + | [] Kafka of ParseResults + interface IArgParserTemplate with + member a.Usage = a |> function + | FromTail -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event." + | MaxDocuments _ -> "maximum item count to request from feed. Default: unlimited." + | LagFreqM _ -> "frequency (in minutes) to dump lag stats. Default: off." + | LeaseContainer _ -> "specify Container Name for Leases container. Default: `sourceContainer` + `-aux`." + + | ConnectionMode _ -> "override the connection mode. Default: Direct." + | Connection _ -> "specify a connection string for a Cosmos account. Default: envvar:EQUINOX_COSMOS_CONNECTION." + | Database _ -> "specify a database name for Cosmos account. Default: envvar:EQUINOX_COSMOS_DATABASE." + | Container _ -> "specify a container name within `Database`." + | Timeout _ -> "specify operation timeout in seconds. Default: 5." + | Retries _ -> "specify operation retries. Default: 1." + | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds. Default: 5." + + | Kafka _ -> "Kafka Sink parameters." + and CosmosSourceArguments(a : ParseResults) = + member __.FromTail = a.Contains CosmosSourceParameters.FromTail + member __.MaxDocuments = a.TryGetResult MaxDocuments + member __.LagFrequency = a.TryGetResult LagFreqM |> Option.map TimeSpan.FromMinutes + member __.LeaseContainer = a.TryGetResult CosmosSourceParameters.LeaseContainer + + member __.Mode = a.GetResult(CosmosSourceParameters.ConnectionMode, Equinox.Cosmos.ConnectionMode.Direct) + member __.Discovery = Discovery.FromConnectionString __.Connection + member __.Connection = match a.TryGetResult CosmosSourceParameters.Connection with Some x -> x | None -> envBackstop "Connection" "EQUINOX_COSMOS_CONNECTION" + member __.Database = match a.TryGetResult CosmosSourceParameters.Database with Some x -> x | None -> envBackstop "Database" "EQUINOX_COSMOS_DATABASE" + member __.Container = a.GetResult CosmosSourceParameters.Container + member __.Timeout = a.GetResult(CosmosSourceParameters.Timeout, 5.) |> TimeSpan.FromSeconds + member __.Retries = a.GetResult(CosmosSourceParameters.Retries, 1) + member __.MaxRetryWaitTime = a.GetResult(CosmosSourceParameters.RetriesWaitTime, 5) + member x.BuildConnectionDetails() = + let (Discovery.UriAndKey (endpointUri,_)) as discovery = x.Discovery + Log.Information("Source CosmosDb {mode} {endpointUri} Database {database} Container {container}", + x.Mode, endpointUri, x.Database, x.Container) + Log.Information("Source CosmosDb timeout {timeout}s; Throttling retries {retries}, max wait {maxRetryWaitTime}s", + (let t = x.Timeout in t.TotalSeconds), x.Retries, x.MaxRetryWaitTime) + let connector = Equinox.Cosmos.Connector(x.Timeout, x.Retries, x.MaxRetryWaitTime, Log.Logger, mode=x.Mode) + discovery, { database = x.Database; container = x.Container }, connector + member val Sink = + match a.TryGetSubCommand() with + | Some (CosmosSourceParameters.Kafka cosmos) -> KafkaSinkArguments cosmos + | _ -> raise (MissingArg "Must specify `kafka` arguments") + and [] KafkaSinkParameters = + | [] Broker of string + | [] Topic of string + interface IArgParserTemplate with + member a.Usage = a |> function + | Broker _ -> "specify Kafka Broker, in host:port format. Default: use environment variable PROPULSION_KAFKA_BROKER." + | Topic _ -> "specify Kafka Topic Id. Default: use environment variable PROPULSION_KAFKA_TOPIC" + and KafkaSinkArguments(a : ParseResults) = + member __.Broker = Uri(match a.TryGetResult Broker with Some x -> x | None -> envBackstop "Broker" "PROPULSION_KAFKA_BROKER") + member __.Topic = match a.TryGetResult Topic with Some x -> x | None -> envBackstop "Topic" "PROPULSION_KAFKA_TOPIC" + member x.BuildTargetParams() = x.Broker, x.Topic /// Parse the commandline; can throw exceptions in response to missing arguments and/or `-h`/`--help` args let parse argv = @@ -139,47 +146,41 @@ module Logging = |> fun c -> let t = "[{Timestamp:HH:mm:ss} {Level:u3}] {partitionKeyRangeId,2} {Message:lj} {NewLine}{Exception}" c.WriteTo.Console(theme=Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code, outputTemplate=t) |> fun c -> c.CreateLogger() - -let mapToStreamItems (docs : Microsoft.Azure.Documents.Document seq) : Propulsion.Streams.StreamEvent<_> seq = - docs - |> Seq.collect EquinoxCosmosParser.enumStreamEvents + Log.Logger let [] appName = "ProjectorTemplate" let start (args : CmdParser.Arguments) = - Logging.initialize args.Verbose args.ChangeFeedVerbose - let (broker,topic, producers) = args.Target.BuildTargetParams() - let producer = Propulsion.Kafka.Producer(Log.Logger, appName, broker, topic, degreeOfParallelism = producers) + let log = Logging.initialize args.Verbose args.VerboseConsole + let (srcC,(auxDiscovery, aux, leaseId, startFromTail, maxDocuments, lagFrequency)) = args.SourceParams() + let (discovery,cosmos,connector),(broker,topic) = + srcC.BuildConnectionDetails(),srcC.Sink.BuildTargetParams() + let producer = Propulsion.Kafka.Producer(Log.Logger, appName, broker, topic) let produce (x : Propulsion.Codec.NewtonsoftJson.RenderedSummary) = producer.ProduceAsync(x.s, Propulsion.Codec.NewtonsoftJson.Serdes.Serialize x) - - let discovery, connector, source = args.Cosmos.BuildConnectionDetails() - let handleStreamEvents = - let cache = Caching.Cache (appName, 10) - let connection = Async.RunSynchronously <| connector.Connect(appName, discovery) - let context = Context(connection, source.database, source.container) + let cache = Equinox.Cosmos.Caching.Cache(appName, sizeMb = 10) + let connection = connector.Connect(appName, discovery) |> Async.RunSynchronously + let context = Equinox.Cosmos.Context(connection, cosmos.database, cosmos.container) + let handleStreamEvents : (string*Propulsion.Streams.StreamSpan<_>) -> Async = let service = Todo.Repository.createService cache context Producer.handleAccumulatedEvents service produce - - let aux, leaseId, startFromTail, maxDocuments, lagFrequency, (maxReadAhead, maxConcurrentStreams) = args.BuildChangeFeedParams() - let projector = - Propulsion.Streams.Sync.StreamsSync.Start( - Log.Logger, maxReadAhead, maxConcurrentStreams, handleStreamEvents, category, + let mapToStreamItems (docs : Microsoft.Azure.Documents.Document seq) : Propulsion.Streams.StreamEvent<_> seq = + docs |> Seq.collect EquinoxCosmosParser.enumStreamEvents + let sink = + Propulsion.Streams.Sync.StreamsSync.Start( + log, args.MaxReadAhead, args.MaxConcurrentStreams, handleStreamEvents, category, statsInterval=TimeSpan.FromMinutes 1., dumpExternalStats=producer.DumpStats) - let createObserver () = CosmosSource.CreateObserver(Log.Logger, projector.StartIngester, mapToStreamItems) - let runSourcePipeline = - CosmosSource.Run( - Log.Logger, discovery, connector.ClientOptions, source, - aux, leaseId, startFromTail, createObserver, - ?maxDocuments=maxDocuments, ?lagReportFreq=lagFrequency) - runSourcePipeline, projector + let createObserver () = CosmosSource.CreateObserver(log, sink.StartIngester, mapToStreamItems) + sink,CosmosSource.Run(log, discovery, connector.ClientOptions, cosmos, + aux, leaseId, startFromTail, createObserver, + ?maxDocuments=maxDocuments, ?lagReportFreq=lagFrequency, auxDiscovery=auxDiscovery) [] let main argv = try try let args = CmdParser.parse argv - let runSourcePipeline, projector = start args - Async.Start <| runSourcePipeline - Async.RunSynchronously <| projector.AwaitCompletion() + let projector,runSourcePipeline = start args + runSourcePipeline |> Async.Start + projector.AwaitCompletion() |> Async.RunSynchronously if projector.RanToCompletion then 0 else 2 with :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1 | CmdParser.MissingArg msg -> eprintfn "%s" msg; 1 diff --git a/propulsion-summary-projector/SummaryProjector.fsproj b/propulsion-summary-projector/SummaryProjector.fsproj index 94849beb2..a89867f53 100644 --- a/propulsion-summary-projector/SummaryProjector.fsproj +++ b/propulsion-summary-projector/SummaryProjector.fsproj @@ -23,4 +23,4 @@ - + \ No newline at end of file