Skip to content

Commit

Permalink
Target Propulsion 2.11.0 (#101)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink authored Oct 19, 2021
1 parent cc59f41 commit b3a6991
Show file tree
Hide file tree
Showing 46 changed files with 132 additions and 214 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ The `Unreleased` section name is replaced by the expected version of next releas

- Added defaulting of 1 min for lag reporting frequency to all Cosmos consumers [#95](https://github.com/jet/dotnet-templates/pull/95)
- `eqxPatterns`: replace best effort deduplication pattern with exactly-once ingestion [#94](https://github.com/jet/dotnet-templates/pull/94)
- Target `Propulsion` v `2.11.0`, `Equinox` v `3.0.4` [#101](https://github.com/jet/dotnet-templates/pull/101)
- Target `Destructurama.FSharp` v `1.2.0`, `Serilog.Sinks.Async` v `1.5.0`, `Serilog.Sinks.Console` v `4.0.0` [#101](https://github.com/jet/dotnet-templates/pull/101)

### Removed
### Fixed
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ The following templates focus specifically on the usage of `Propulsion` componen
- [`proConsumer`](propulsion-consumer/README.md) - Boilerplate for an Apache Kafka Consumer using [`Propulsion.Kafka`](https://github.com/jet/propulsion) (typically consuming from an app produced with `dotnet new proProjector -k`).

- [`periodicIngester`](periodic-ingester/) - Boilerplate for a service that regularly walks the content of a source, feeding it into a propulsion projector in order to manage the ingestion process using [`Propulsion.Feed.PeriodicSource`](https://github.com/jet/propulsion)
-

## Producer/Reactor Templates combining usage of Equinox and Propulsion

The bulk of the remaining templates have a consumer aspect, and hence involve usage of `Propulsion`.
Expand Down Expand Up @@ -326,7 +326,7 @@ The `run` function formalizes the overall pattern. It is responsible for:
```
let run args = async {
use consumer = start args
return! consumer.AwaitCompletion()
return! consumer.AwaitWithStopOnCancellation()
}
[<EntryPoint>]
Expand Down
6 changes: 3 additions & 3 deletions equinox-patterns/Domain/Domain.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Equinox.MemoryStore" Version="3.0.1" />
<PackageReference Include="Equinox.CosmosStore" Version="3.0.1" />
<PackageReference Include="Equinox.MemoryStore" Version="3.0.4" />
<PackageReference Include="Equinox.CosmosStore" Version="3.0.4" />
<PackageReference Include="FsCodec.NewtonsoftJson" Version="2.0.1" />
</ItemGroup>

</Project>
1 change: 0 additions & 1 deletion equinox-patterns/Domain/Types.fs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ module ListEpochId =

/// Identifies an Item stored within an Epoch
/// TODO replace `Item` with a Domain term referencing the specific element being managed
type ItemId = string<itemId>
and [<Measure>] itemId
module ItemId =
Expand Down
2 changes: 1 addition & 1 deletion equinox-shipping/Domain.Tests/Domain.Tests.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.2.0" />

<PackageReference Include="Equinox.MemoryStore" Version="3.0.1" />
<PackageReference Include="Equinox.MemoryStore" Version="3.0.4" />
<PackageReference Include="FsCheck.Xunit" Version="2.14.2" />
<PackageReference Include="unquote" Version="5.0.0" />
<PackageReference Include="xunit" Version="2.4.1" />
Expand Down
2 changes: 1 addition & 1 deletion equinox-shipping/Domain/Domain.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Equinox.CosmosStore" Version="3.0.1" />
<PackageReference Include="Equinox.CosmosStore" Version="3.0.4" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type MemoryStoreProjector<'F, 'B> private (log, inner : Propulsion.ProjectorPipe
Observable.subscribe this.Submit source

/// Waits until all <c>Submit</c>ted batches have been fed into the <c>inner</c> Projector
member _.AwaitCompletion
member _.AwaitWithStopOnCancellation
( /// sleep time while awaiting completion
?delay,
/// interval at which to log progress of Projector loop
Expand All @@ -72,7 +72,7 @@ type MemoryStoreProjector<'F, 'B> private (log, inner : Propulsion.ProjectorPipe
inner.Stop()
// trigger termination of GetConsumingEnumerable()-driven pumping loop
queue.CompleteAdding()
return! inner.AwaitCompletion()
return! inner.AwaitWithStopOnCancellation()
}

type TestOutputAdapter(testOutput : Xunit.Abstractions.ITestOutputHelper) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.2.0" />

<PackageReference Include="Equinox.MemoryStore" Version="3.0.1" />
<PackageReference Include="Equinox.MemoryStore" Version="3.0.4" />
<PackageReference Include="FsCheck.Xunit" Version="2.14.2" />
<PackageReference Include="unquote" Version="5.0.0" />
<PackageReference Include="xunit" Version="2.4.1" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type WatchdogIntegrationTests(output) =
with :? TimeoutException -> timeouts <- timeouts + 1

log.Information("Awaiting batches: {counts} ({timeouts}/{total} timeouts)", counts, timeouts, counts.Count)
do! projector.AwaitCompletion()
do! projector.AwaitWithStopOnCancellation()
stats.DumpStats()
}

Expand Down
12 changes: 1 addition & 11 deletions equinox-shipping/Watchdog/Infrastructure.fs
Original file line number Diff line number Diff line change
Expand Up @@ -40,24 +40,14 @@ module CosmosStoreContext =
let maxEvents = 256
Equinox.CosmosStore.CosmosStoreContext(storeClient, tipMaxEvents=maxEvents)

[<System.Runtime.CompilerServices.Extension>]
type LoggerConfigurationExtensions() =

[<System.Runtime.CompilerServices.Extension>]
static member inline ConfigureChangeFeedProcessorLogging(c : LoggerConfiguration, verbose : bool) =
let cfpl = if verbose then Serilog.Events.LogEventLevel.Debug else Serilog.Events.LogEventLevel.Warning
// TODO figure out what CFP v3 requires
c.MinimumLevel.Override("Microsoft.Azure.Documents.ChangeFeedProcessor", cfpl)

[<System.Runtime.CompilerServices.Extension>]
type Logging() =

[<System.Runtime.CompilerServices.Extension>]
static member Configure(configuration : LoggerConfiguration, ?verbose, ?changeFeedProcessorVerbose) =
static member Configure(configuration : LoggerConfiguration, ?verbose) =
configuration
.Destructure.FSharpTypes()
.Enrich.FromLogContext()
|> fun c -> if verbose = Some true then c.MinimumLevel.Debug() else c
|> fun c -> c.ConfigureChangeFeedProcessorLogging(verbose = (changeFeedProcessorVerbose = Some true))
|> fun c -> let t = "[{Timestamp:HH:mm:ss} {Level:u3}] {Message:lj} {NewLine}{Exception}"
c.WriteTo.Console(theme=Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code, outputTemplate=t)
9 changes: 3 additions & 6 deletions equinox-shipping/Watchdog/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ module Args =
member val ProcessingTimeout = a.GetResult(TimeoutS, 10.) |> TimeSpan.FromSeconds
member val Cosmos = CosmosArguments(c, a.GetResult Cosmos)
and [<NoEquality; NoComparison>] CosmosParameters =
| [<AltCommandLine "-V"; Unique>] Verbose
| [<AltCommandLine "-Z"; Unique>] FromTail
| [<AltCommandLine "-mi"; Unique>] MaxItems of int
| [<AltCommandLine "-l"; Unique>] LagFreqM of float
Expand All @@ -66,7 +65,6 @@ module Args =
| [<AltCommandLine "-rt">] RetriesWaitTime of float
interface IArgParserTemplate with
member a.Usage = a |> function
| Verbose -> "request Verbose Change Feed Processor Logging. Default: off."
| FromTail -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event."
| MaxItems _ -> "maximum item count to request from feed. Default: unlimited"
| LagFreqM _ -> "frequency (in minutes) to dump lag stats. Default: 1"
Expand All @@ -92,7 +90,6 @@ module Args =
let fromTail = a.Contains FromTail
let maxItems = a.TryGetResult MaxItems
let lagFrequency = a.GetResult(LagFreqM, 1.) |> TimeSpan.FromMinutes
member val Verbose = a.Contains Verbose
member private _.ConnectLeases() = connector.CreateUninitialized(database, leaseContainerId)
member x.MonitoringParams() =
let leases : Microsoft.Azure.Cosmos.Container = x.ConnectLeases()
Expand Down Expand Up @@ -128,19 +125,19 @@ let build (args : Args.Arguments) =
let pipeline =
use observer = CosmosStoreSource.CreateObserver(Log.Logger, sink.StartIngester, Seq.collect Handler.transformOrFilter)
let leases, startFromTail, maxItems, lagFrequency = args.Cosmos.MonitoringParams()
CosmosStoreSource.Run(Log.Logger, monitored, leases, processorName, observer, startFromTail, ?maxDocuments=maxItems, lagReportFreq=lagFrequency)
CosmosStoreSource.Run(Log.Logger, monitored, leases, processorName, observer, startFromTail, ?maxItems=maxItems, lagReportFreq=lagFrequency)
sink, pipeline

let run args = async {
let sink, pipeline = build args
pipeline |> Async.Start
return! sink.AwaitCompletion()
return! sink.AwaitWithStopOnCancellation()
}

[<EntryPoint>]
let main argv =
try let args = Args.parse EnvVar.tryGet argv
try Log.Logger <- LoggerConfiguration().Configure(verbose=args.Verbose, changeFeedProcessorVerbose=args.Cosmos.Verbose).CreateLogger()
try Log.Logger <- LoggerConfiguration().Configure(verbose=args.Verbose).CreateLogger()
try run args |> Async.RunSynchronously; 0
with e when not (e :? MissingArg) -> Log.Fatal(e, "Exiting"); 2
finally Log.CloseAndFlush()
Expand Down
8 changes: 4 additions & 4 deletions equinox-shipping/Watchdog/Watchdog.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@

<ItemGroup>
<PackageReference Include="Argu" Version="6.1.1" />
<PackageReference Include="Destructurama.FSharp" Version="1.1.1-dev-00035" />
<PackageReference Include="Propulsion.CosmosStore" Version="2.11.0-rc2" />
<PackageReference Include="Serilog.Sinks.Console" Version="3.1.1" />
<PackageReference Include="Destructurama.FSharp" Version="1.2.0" />
<PackageReference Include="Propulsion.CosmosStore" Version="2.11.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="4.0.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Domain\Domain.fsproj" />
</ItemGroup>
</Project>
</Project>
2 changes: 1 addition & 1 deletion equinox-testbed/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,4 @@ This project was generated using:
The commandline `dotnet run -p Testbed -- run cosmos -help` shows the various connection options available when using CosmosDb. Running without any arguments will use the environment variables as above. While it's possible to run against the CosmosDb simulator, there's not much to be learned over what one can learn from using `Equinox.MemoryStore`. In general, if the transactions are small and you don't saturate the network bandwidth, you should be able to achieve 100s of rps even if you're not in the same Region. Running in the same data center as the CosmosDb instance, 1000s of rps should be attainable.

# Run with caching and unfolds against CosmosDb, 100 rps
dotnet run -- run -C - U -d 2 -f 100 cosmos
dotnet run -- run -CU -d 2 -f 100 cosmos
14 changes: 7 additions & 7 deletions equinox-testbed/Testbed.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,18 @@

<ItemGroup>
<PackageReference Include="Argu" Version="6.1.1" />
<PackageReference Include="Destructurama.FSharp" Version="1.1.1-dev-00035" />
<PackageReference Include="Equinox.CosmosStore" Version="3.0.1" />
<PackageReference Include="Equinox.EventStore" Version="3.0.1" />
<PackageReference Include="Equinox.MemoryStore" Version="3.0.1" />
<PackageReference Include="Equinox.Tools.TestHarness" Version="3.0.1" />
<PackageReference Include="Destructurama.FSharp" Version="1.2.0" />
<PackageReference Include="Equinox.CosmosStore" Version="3.0.4" />
<PackageReference Include="Equinox.EventStore" Version="3.0.4" />
<PackageReference Include="Equinox.MemoryStore" Version="3.0.4" />
<PackageReference Include="Equinox.Tools.TestHarness" Version="3.0.4" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.3" />
<PackageReference Include="Serilog.Sinks.Console" Version="3.1.1" />
<PackageReference Include="Serilog.Sinks.Console" Version="4.0.0" />
<PackageReference Include="Serilog.Sinks.Seq" Version="4.0.0" />
</ItemGroup>

<ItemGroup>
<Content Include="README.md" />
</ItemGroup>

</Project>
</Project>
4 changes: 2 additions & 2 deletions equinox-web-csharp/Domain/Domain.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Equinox" Version="3.0.1" />
<PackageReference Include="Equinox" Version="3.0.4" />
<PackageReference Include="FsCodec" Version="2.0.1" />
<PackageReference Include="FSharp.Core" Version="4.5.4" />
<PackageReference Include="Newtonsoft.Json" Version="11.0.2" />
</ItemGroup>

</Project>
</Project>
12 changes: 6 additions & 6 deletions equinox-web-csharp/Web/Web.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Equinox.CosmosStore" Version="3.0.1" />
<PackageReference Include="Equinox.CosmosStore.Prometheus" Version="3.0.1" />
<PackageReference Include="Equinox.EventStore" Version="3.0.1" />
<PackageReference Include="Equinox.MemoryStore" Version="3.0.1" />
<PackageReference Include="Equinox.CosmosStore" Version="3.0.4" />
<PackageReference Include="Equinox.CosmosStore.Prometheus" Version="3.0.4" />
<PackageReference Include="Equinox.EventStore" Version="3.0.4" />
<PackageReference Include="Equinox.MemoryStore" Version="3.0.4" />
<PackageReference Include="prometheus-net.AspNetCore" Version="3.6.0" />
<PackageReference Include="Serilog.AspNetCore" Version="3.2.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="3.1.1" />
<PackageReference Include="Serilog.Sinks.Console" Version="4.0.0" />
<!-- until FsCodec.SystemTextJson is available -->
<PackageReference Include="Microsoft.AspNetCore.Mvc.NewtonsoftJson" Version="3.1.3" />
</ItemGroup>
Expand All @@ -20,4 +20,4 @@
<ProjectReference Include="../Domain/Domain.csproj" />
</ItemGroup>

</Project>
</Project>
4 changes: 2 additions & 2 deletions equinox-web/Domain/Domain.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Equinox" Version="3.0.1" />
<PackageReference Include="Equinox" Version="3.0.4" />
<PackageReference Include="FsCodec.NewtonsoftJson" Version="2.0.1" />
</ItemGroup>

</Project>
</Project>
3 changes: 1 addition & 2 deletions equinox-web/Web/Startup.fs
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,7 @@ type Startup() =
.SetCompatibilityVersion(CompatibilityVersion.Latest)
// While FsCodec.SystemTextJson is available and works well, until FsCodec.SystemTextJson has a UnionConverter https://github.com/jet/FsCodec/pull/59, use JSON.NET
.AddNewtonsoftJson(fun options ->
/// TODO When 2.2.2 released, use Settings.DefaultSettings
FsCodec.NewtonsoftJson.Settings.CreateDefault().Converters
FsCodec.NewtonsoftJson.Serdes.DefaultSettings.Converters
|> Seq.iter options.SerializerSettings.Converters.Add
)|> ignore

Expand Down
16 changes: 9 additions & 7 deletions equinox-web/Web/Web.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,21 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Equinox.CosmosStore" Version="3.0.1" />
<PackageReference Include="Equinox.CosmosStore.Prometheus" Version="3.0.1" />
<PackageReference Include="Equinox.EventStore" Version="3.0.1" />
<PackageReference Include="Equinox.MemoryStore" Version="3.0.1" />
<PackageReference Include="Equinox.CosmosStore" Version="3.0.4" />
<PackageReference Include="Equinox.CosmosStore.Prometheus" Version="3.0.4" />
<PackageReference Include="Equinox.EventStore" Version="3.0.4" />
<PackageReference Include="Equinox.MemoryStore" Version="3.0.4" />
<PackageReference Include="FsCodec.Box" Version="2.2.2" />
<PackageReference Include="prometheus-net.AspNetCore" Version="3.6.0" />
<PackageReference Include="Serilog.AspNetCore" Version="3.2.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="3.1.1" />
<!-- until FsCodec.SystemTextJson is complete -->
<PackageReference Include="Serilog.Sinks.Console" Version="4.0.0" />
<!-- until FsCodec.SystemTextJson is feature complete see https://github.com/jet/FsCodec/pull/59 -->
<PackageReference Include="FsCodec.NewtonsoftJson" Version="2.2.2" />
<PackageReference Include="Microsoft.AspNetCore.Mvc.NewtonsoftJson" Version="3.1.3" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Domain\Domain.fsproj" />
</ItemGroup>

</Project>
</Project>
10 changes: 5 additions & 5 deletions feed-consumer/FeedConsumer.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@

<ItemGroup>
<PackageReference Include="Argu" Version="6.1.1" />
<PackageReference Include="Destructurama.FSharp" Version="1.1.1-dev-00035" />
<PackageReference Include="Propulsion.Feed" Version="2.11.0-rc2" />
<PackageReference Include="Equinox.CosmosStore" Version="3.0.1" />
<PackageReference Include="Propulsion.CosmosStore" Version="2.11.0-rc2" />
<PackageReference Include="Serilog.Sinks.Console" Version="3.1.1" />
<PackageReference Include="Destructurama.FSharp" Version="1.2.0" />
<PackageReference Include="Propulsion.Feed" Version="2.11.0" />
<PackageReference Include="Equinox.CosmosStore" Version="3.0.4" />
<PackageReference Include="Propulsion.CosmosStore" Version="2.11.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="4.0.0" />
</ItemGroup>

</Project>
2 changes: 1 addition & 1 deletion feed-consumer/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ module ConsumerExt =
member sink.AwaitWithStopOnCancellation() = async {
let! ct = Async.CancellationToken
use _ = ct.Register(fun () -> sink.Stop())
return! sink.AwaitCompletion()
return! sink.AwaitWithStopOnCancellation()
}

let run args = async {
Expand Down
4 changes: 2 additions & 2 deletions feed-source/Domain/Domain.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Equinox.CosmosStore" Version="3.0.1" />
<PackageReference Include="Equinox.MemoryStore" Version="3.0.1" />
<PackageReference Include="Equinox.CosmosStore" Version="3.0.4" />
<PackageReference Include="Equinox.MemoryStore" Version="3.0.4" />
</ItemGroup>

</Project>
4 changes: 2 additions & 2 deletions feed-source/FeedApi/FeedApi.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

<ItemGroup>
<PackageReference Include="Argu" Version="6.1.1" />
<PackageReference Include="Destructurama.FSharp" Version="1.1.1-dev-00035" />
<PackageReference Include="Destructurama.FSharp" Version="1.2.0" />
<PackageReference Include="Serilog.AspNetCore" Version="3.2.0" />
</ItemGroup>

</Project>
4 changes: 2 additions & 2 deletions periodic-ingester/PeriodicIngester.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
<PackageReference Include="Destructurama.FSharp" Version="1.2.0" />
<PackageReference Include="Equinox.CosmosStore.Prometheus" Version="3.0.4" />
<PackageReference Include="prometheus-net.AspNetCore" Version="3.6.0" />
<PackageReference Include="Propulsion.CosmosStore" Version="2.11.0-rc3" />
<PackageReference Include="Propulsion.Feed" Version="2.11.0-rc3" />
<PackageReference Include="Propulsion.CosmosStore" Version="2.11.0" />
<PackageReference Include="Propulsion.Feed" Version="2.11.0" />
<PackageReference Include="Serilog.Sinks.Async" Version="1.5.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="4.0.0" />
</ItemGroup>
Expand Down
12 changes: 6 additions & 6 deletions propulsion-archiver/Archiver.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@

<ItemGroup>
<PackageReference Include="Argu" Version="6.1.1" />
<PackageReference Include="Destructurama.FSharp" Version="1.1.1-dev-00035" />
<PackageReference Include="Equinox.CosmosStore.Prometheus" Version="3.0.1" />
<PackageReference Include="Propulsion.CosmosStore" Version="2.11.0-rc2" />
<PackageReference Include="Destructurama.FSharp" Version="1.2.0" />
<PackageReference Include="Equinox.CosmosStore.Prometheus" Version="3.0.4" />
<PackageReference Include="Propulsion.CosmosStore" Version="2.11.0" />
<PackageReference Include="prometheus-net.AspNetCore" Version="3.6.0" />
<PackageReference Include="Serilog.Sinks.Async" Version="1.4.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="3.1.1" />
<PackageReference Include="Serilog.Sinks.Async" Version="1.5.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="4.0.0" />
<PackageReference Include="Serilog.Sinks.Seq" Version="4.0.0" />
</ItemGroup>
</Project>
</Project>
Loading

0 comments on commit b3a6991

Please sign in to comment.