diff --git a/infrastructure/build.all.sh b/infrastructure/build.all.sh
old mode 100644
new mode 100755
diff --git a/infrastructure/mssql-init-container/build.sh b/infrastructure/mssql-init-container/build.sh
old mode 100644
new mode 100755
diff --git a/infrastructure/mssql/build.sh b/infrastructure/mssql/build.sh
old mode 100644
new mode 100755
diff --git a/infrastructure/mssql/deploy.k8s.sh b/infrastructure/mssql/deploy.k8s.sh
old mode 100644
new mode 100755
diff --git a/infrastructure/mssql/src/Dockerfile b/infrastructure/mssql/src/Dockerfile
index 69958257..4c933f21 100644
--- a/infrastructure/mssql/src/Dockerfile
+++ b/infrastructure/mssql/src/Dockerfile
@@ -1,4 +1,4 @@
-FROM mcr.microsoft.com/mssql/server:2019-latest
+FROM mcr.microsoft.com/mssql/server:2022-latest
COPY ./setup.sql .
COPY ./setup.sh .
diff --git a/infrastructure/mssql/src/ready-check.sh b/infrastructure/mssql/src/ready-check.sh
old mode 100644
new mode 100755
diff --git a/infrastructure/mssql/src/setup.sh b/infrastructure/mssql/src/setup.sh
old mode 100644
new mode 100755
diff --git a/src/Directory.Build.props b/src/Directory.Build.props
index 0924ffc9..cd1da7cd 100644
--- a/src/Directory.Build.props
+++ b/src/Directory.Build.props
@@ -19,7 +19,7 @@
1.5.20
1.4.1
1.5.20
- [7.0.0,)
+ [8.0.0,)
diff --git a/src/clustering/sharding-sqlserver/README.md b/src/clustering/sharding-sqlserver/README.md
index 234e9e72..25dbfc77 100644
--- a/src/clustering/sharding-sqlserver/README.md
+++ b/src/clustering/sharding-sqlserver/README.md
@@ -21,8 +21,8 @@ Like all of the samples in this repository, we are using a simple domain since t
This app consists of two Akka.Cluster role types:
-1. `SqlSharding.WebApp` - a web-application that joins the cluster and queries actors hosted on the `SqlSharding.Host` instances via Akka.Cluster.Sharding and Akka.Cluster.Singleton and
-2. `SqlSharding.Host` - a [headless Akka.NET process](https://petabridge.com/blog/akkadotnet-ihostedservice/) that hosts the `ShardRegion` for all `ProductTotalsActor` instances and a `ClusterSingletonManager` for the singular `ProductIndexActor` instance.
+1. `SqlSharding.WebApp` - a web-application that joins the cluster and queries actors hosted on the `SqlSharding.Sql.Host` instances via Akka.Cluster.Sharding and Akka.Cluster.Singleton and
+2. `SqlSharding.Sql.Host` - a [headless Akka.NET process](https://petabridge.com/blog/akkadotnet-ihostedservice/) that hosts the `ShardRegion` for all `ProductTotalsActor` instances and a `ClusterSingletonManager` for the singular `ProductIndexActor` instance.
We have only two actor types in this solution:
@@ -60,8 +60,8 @@ Implementing this with actors instead gives us the following:
Here are the key details for understanding what this sample does and how it works:
-1. **Akka.Cluster.Sharding does all of the heavy lifting** - it guarantees that for every unique `productID` there exists exactly one `ProductTotalsActor` that owns the state of that entity. That guarantee is upheld even across network splits. One `ProductTotalsActor` can be moved from one `SqlSharding.Host` node to another, message traffic to that actor will be paused while this happens, the actor will be recreated on its new home node, and message traffic will be resumed. The `WebApp` and `ProductIndexActor` instances have no idea that this is happening *and they don't need to*. In addition to that, the `ShardRegion` hosting the `ProductIndexActor` will dynamically create new instances of those actors on-demand and will, by default, kill existing instances of those actors if they haven't been sent a message for more than 120 seconds. You can change how the "kill off actors" behavior works, however: `akka.cluster.sharding.remember-entities=off` turns this off and keeps entity actors alive forever, or you can just change the time threshold via `akka.cluster.sharding.passivate-idle-entity-after = 400s`.
-2. **`ShardRegionProxy` is how `SqlSharding.WebApp` communicates with `ProductTotalsActor`s hosted on the `SqlSharding.Host` instances** - again, the Akka.Cluster.Sharding infrastructure does most of the heavy work here; but this time from the perspective of the WebApp - all of the messages sent to the `/product/{productId}` route are ultimately routed to a `ShardRegionProxy` `IActorRef`. This actor knows how to communicate with the `ShardRegion` on the `SqlSharding.Host` role to dynamically instantiate a new `ProductTotalsActor` instance, if necessary, and route messages to it.
+1. **Akka.Cluster.Sharding does all of the heavy lifting** - it guarantees that for every unique `productID` there exists exactly one `ProductTotalsActor` that owns the state of that entity. That guarantee is upheld even across network splits. One `ProductTotalsActor` can be moved from one `SqlSharding.Sql.Host` node to another, message traffic to that actor will be paused while this happens, the actor will be recreated on its new home node, and message traffic will be resumed. The `WebApp` and `ProductIndexActor` instances have no idea that this is happening *and they don't need to*. In addition to that, the `ShardRegion` hosting the `ProductIndexActor` will dynamically create new instances of those actors on-demand and will, by default, kill existing instances of those actors if they haven't been sent a message for more than 120 seconds. You can change how the "kill off actors" behavior works, however: `akka.cluster.sharding.remember-entities=off` turns this off and keeps entity actors alive forever, or you can just change the time threshold via `akka.cluster.sharding.passivate-idle-entity-after = 400s`.
+2. **`ShardRegionProxy` is how `SqlSharding.WebApp` communicates with `ProductTotalsActor`s hosted on the `SqlSharding.Sql.Host` instances** - again, the Akka.Cluster.Sharding infrastructure does most of the heavy work here; but this time from the perspective of the WebApp - all of the messages sent to the `/product/{productId}` route are ultimately routed to a `ShardRegionProxy` `IActorRef`. This actor knows how to communicate with the `ShardRegion` on the `SqlSharding.Sql.Host` role to dynamically instantiate a new `ProductTotalsActor` instance, if necessary, and route messages to it.
3. **Akka.Persistence is what we use to manage entity state, and we use event-sourcing to provide long-term extensibility** - the separation between commands, events, and queries is an important feature of how this application is designed. Cleanly segmenting the "message grammar" helps keep things organized ultimately simplifies the stateful piece of our programming model: the state of any given entity must be derived from the sum of its events. We use Akka.Persistence.SqlServer to store / recover this data and we use Google.Protobuf to serialize events, state, commands, and queries in a highly versionable way.
## Running Sample
@@ -88,9 +88,12 @@ This will build a copy of our [MSSQL image](https://github.com/petabridge/akkado
### Run the Sample
+On initial run, the database can be seeded by setting the `SEED_DB=true` environment variable
+
Load up Rider or Visual Studio and
-1. Launch `SqlSharding.Host`, followed by
+
+1. Launch `SqlSharding.Sql.Host`, followed by
2. Launch `SqlSharding.WebApp`.
-Provided that you don't see any SQL Server connection errors originating from `SqlSharding.Host` - you should have no trouble using the WebApp's UI to add products, submit orders, change inventory levels, and more.
+Provided that you don't see any SQL Server connection errors originating from `SqlSharding.Sql.Host` - you should have no trouble using the WebApp's UI to add products, submit orders, change inventory levels, and more.
diff --git a/src/clustering/sharding-sqlserver/ShardingSqlServer.sln b/src/clustering/sharding-sqlserver/ShardingSqlServer.sln
index c6ce71b8..0b113076 100644
--- a/src/clustering/sharding-sqlserver/ShardingSqlServer.sln
+++ b/src/clustering/sharding-sqlserver/ShardingSqlServer.sln
@@ -5,8 +5,6 @@ VisualStudioVersion = 16.0.30114.105
MinimumVisualStudioVersion = 10.0.40219.1
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SqlSharding.Shared", "SqlSharding.Shared\SqlSharding.Shared.csproj", "{52F92E8D-0E25-41DB-88C7-DB537166E97D}"
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SqlSharding.Host", "SqlSharding.Host\SqlSharding.Host.csproj", "{A8FF833D-47A2-4A7B-97D8-FBE56C973181}"
-EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SqlSharding.WebApp", "SqlSharding.WebApp\SqlSharding.WebApp.csproj", "{2DE443CB-EEA0-4A69-9750-C55638778565}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SqlSharding.Sql.Host", "SqlSharding.Sql.Host\SqlSharding.Sql.Host.csproj", "{915E613A-A6ED-4633-BC50-832A15CF4596}"
@@ -24,10 +22,6 @@ Global
{52F92E8D-0E25-41DB-88C7-DB537166E97D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{52F92E8D-0E25-41DB-88C7-DB537166E97D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{52F92E8D-0E25-41DB-88C7-DB537166E97D}.Release|Any CPU.Build.0 = Release|Any CPU
- {A8FF833D-47A2-4A7B-97D8-FBE56C973181}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {A8FF833D-47A2-4A7B-97D8-FBE56C973181}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {A8FF833D-47A2-4A7B-97D8-FBE56C973181}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {A8FF833D-47A2-4A7B-97D8-FBE56C973181}.Release|Any CPU.Build.0 = Release|Any CPU
{2DE443CB-EEA0-4A69-9750-C55638778565}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{2DE443CB-EEA0-4A69-9750-C55638778565}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2DE443CB-EEA0-4A69-9750-C55638778565}.Release|Any CPU.ActiveCfg = Release|Any CPU
diff --git a/src/clustering/sharding-sqlserver/SqlSharding.Host/Actors/ProductIndexActor.cs b/src/clustering/sharding-sqlserver/SqlSharding.Host/Actors/ProductIndexActor.cs
deleted file mode 100644
index 2374b7b0..00000000
--- a/src/clustering/sharding-sqlserver/SqlSharding.Host/Actors/ProductIndexActor.cs
+++ /dev/null
@@ -1,172 +0,0 @@
-using System.Collections.Immutable;
-using Akka.Actor;
-using Akka.Delivery;
-using Akka.Event;
-using Akka.Hosting;
-using Akka.Persistence.Query;
-using Akka.Persistence.Query.Sql;
-using Akka.Streams;
-using Akka.Streams.Dsl;
-using Akka.Util;
-using SqlSharding.Shared.Queries;
-using SqlSharding.Shared.Sharding;
-using FetchAllProductsResponse = SqlSharding.Shared.Queries.FetchAllProductsResponse;
-using FetchProduct = SqlSharding.Shared.Queries.FetchProduct;
-using ProductData = SqlSharding.Shared.ProductData;
-
-namespace SqlSharding.Host.Actors;
-
-///
-/// Uses Akka.Persistence.Query to index all actively maintained .
-///
-public sealed class ProductIndexActor : ReceiveActor, IWithTimers
-{
- private readonly ILoggingAdapter _logging = Context.GetLogger();
- private const int MaxAttempts = 3;
- private readonly IActorRef _shardRegion;
- private ImmutableDictionary _productIds = ImmutableDictionary.Empty;
- private ImmutableDictionary _producers = ImmutableDictionary.Empty;
-
- private record ConsumerTerminated(string ProducerId);
-
- private record RetryFetchAllProducts(FetchAllProductsImpl OriginalRequest, int Attempts);
-
- public ProductIndexActor(IRequiredActor requiredActor)
- {
- _shardRegion = requiredActor.ActorRef;
- Receive(found =>
- {
- _logging.Info("Found product [{0}]", found);
- _productIds = _productIds.Add(found.ProductId, ProductData.Empty);
- _shardRegion.Tell(new FetchProduct(found.ProductId));
- });
-
- Receive(result =>
- {
- _logging.Info("Received product state for product [{0}]", result.State.ProductId);
- _productIds = _productIds.SetItem(result.State.ProductId, result.State.Data);
- });
-
- Receive(f => { ReceiveFetchProducts(f, 1); });
-
- Receive>(next =>
- {
- // signal that demand is available
- _producers = _producers.SetItem(next.ProducerId, (_producers[next.ProducerId].producerController, true));
- });
-
- Receive(r =>
- {
- ReceiveFetchProducts(r.OriginalRequest, r.Attempts);
- });
-
- Receive(t =>
- {
- if (_producers.TryGetValue(t.ProducerId, out var p))
- {
- Context.Stop(p.producerController);
- _producers = _producers.Remove(t.ProducerId);
- }
- });
-
- Receive(_ =>
- {
- // this should never happen
- throw new InvalidOperationException("SHOULD NOT REACH END OF ID STREAM");
- });
-
- Receive(f =>
- {
- _logging.Error(f.Cause, "Failed to read product from Akka.Persistence.Query");
- throw new InvalidOperationException("SHOULD NOT REACH END OF ID STREAM");
- });
- }
-
- private void ReceiveFetchProducts(FetchAllProductsImpl f, int attemptNo)
- {
- if (_producers.TryGetValue(f.ProducerId, out var t))
- {
- if (t.hasDemand)
- {
- // let the ProducerController perform sequencing
- t.producerController.Tell(new FetchAllProductsResponse(_productIds.Values.ToList()));
-
- // signal that demand has been met (for now)
- _producers = _producers.SetItem(f.ProducerId, (t.producerController, false));
- Timers.Cancel(f.ProducerId); // if there was a scheduled retry pending - client beat us to it
- }
- else
- {
- // retry in 1 second
- RetryRequest(f, attemptNo + 1);
- }
- }
- else
- {
- // need to start ProducerController
- var producerController = CreateProducerController(f.ProducerId, f.ConsumerController);
- _producers = _producers.Add(f.ProducerId, (producerController, false));
- RetryRequest(f, attemptNo + 1);
- }
- }
-
- private void RetryRequest(FetchAllProductsImpl f, int currentAttempt)
- {
- if (currentAttempt > MaxAttempts)
- {
- // request failed
- _logging.Error("Failed to process FetchAllProducts for [{0}] afters [{1}] attempts", f.ProducerId, MaxAttempts);
- }
- else
- {
- Timers.StartSingleTimer(f.ProducerId, new RetryFetchAllProducts(f, currentAttempt), TimeSpan.FromSeconds(1));
- }
- }
-
- private readonly record struct ProductFound(string ProductId);
-
- private sealed class Done
- {
- public static readonly Done Instance = new();
-
- private Done()
- {
- }
- }
-
- protected override void PreStart()
- {
- /*
- * Kicks off an Akka.Persistence.Query instance that will continuously
- */
- var query = Context.System.ReadJournalFor(SqlReadJournal.Identifier);
- query.PersistenceIds()
- .Where(c => c.StartsWith(ProductTotalsActor.TotalsEntityNameConstant))
- .Select(c =>
- {
- var splitPivot = c.IndexOf("-", StringComparison.Ordinal);
- return new ProductFound(c[(splitPivot + 1)..]);
- })
- .To(Sink.ActorRef(Self, Done.Instance, ex => new Status.Failure(ex)))
- .Run(Context.Materializer());
- }
-
- private IActorRef CreateProducerController(string producerId, IActorRef requestor)
- {
- // 64KB chunks
- var producerControllerSettings = ProducerController.Settings.Create(Context.System) with { ChunkLargeMessagesBytes = 1024 };
- var producerControllerProps =
- ProducerController.Create(Context, producerId, Option.None, producerControllerSettings);
- Context.WatchWith(requestor, new ConsumerTerminated(producerId));
- var producerController = Context.ActorOf(producerControllerProps, $"producer-controller-{producerId}");
-
- // register the consumer with the producer
- producerController.Tell(new ProducerController.RegisterConsumer(requestor));
-
- // start production
- producerController.Tell(new ProducerController.Start(Self));
- return producerController;
- }
-
- public ITimerScheduler Timers { get; set; } = null!;
-}
\ No newline at end of file
diff --git a/src/clustering/sharding-sqlserver/SqlSharding.Host/Actors/ProductTotalsActor.cs b/src/clustering/sharding-sqlserver/SqlSharding.Host/Actors/ProductTotalsActor.cs
deleted file mode 100644
index afb9a5b7..00000000
--- a/src/clustering/sharding-sqlserver/SqlSharding.Host/Actors/ProductTotalsActor.cs
+++ /dev/null
@@ -1,111 +0,0 @@
-using Akka.Actor;
-using Akka.Cluster.Sharding;
-using Akka.Event;
-using Akka.Persistence;
-using SqlSharding.Shared;
-using SqlSharding.Shared.Commands;
-using SqlSharding.Shared.Events;
-using SqlSharding.Shared.Queries;
-
-namespace SqlSharding.Host.Actors;
-
-///
-/// Manages the state for a given product.
-///
-public sealed class ProductTotalsActor : ReceivePersistentActor
-{
- public static Props GetProps(string persistenceId)
- {
- return Props.Create(() => new ProductTotalsActor(persistenceId));
- }
-
- ///
- /// Used to help differentiate what type of entity this is inside Akka.Persistence's database
- ///
- public const string TotalsEntityNameConstant = "totals";
-
- private readonly ILoggingAdapter _log = Context.GetLogger();
-
- public ProductTotalsActor(string persistenceId)
- {
- PersistenceId = $"{TotalsEntityNameConstant}-" + persistenceId;
- State = new ProductState();
-
- Recover(offer =>
- {
- if (offer.Snapshot is ProductState state)
- {
- State = state;
- }
- });
-
- Recover(productEvent =>
- {
- State = State.ProcessEvent(productEvent);
- });
-
- Command(cmd =>
- {
- var response = State.ProcessCommand(cmd);
- var sentResponse = false;
-
- if (response.ResponseEvents.Any())
- {
- PersistAll(response.ResponseEvents, productEvent =>
- {
- _log.Info("Processed: {0}", productEvent);
-
- if (productEvent is ProductInventoryWarningEvent warning)
- {
- _log.Warning(warning.ToString());
- }
- State = State.ProcessEvent(productEvent);
-
- if (!sentResponse) // otherwise we'll generate a response-per-event
- {
- sentResponse = true;
-
- async Task ReplyToSender()
- {
- await Task.Delay(1);
- return response;
- }
- var sender = Sender;
- ReplyToSender().PipeTo(sender, failure: ex => new Status.Failure(ex));
-
- }
-
- if(LastSequenceNr % 10 == 0)
- SaveSnapshot(State);
- });
- }
- else
- {
- Sender.Tell(response);
- }
- });
-
-
- Command(success =>
- {
- // purge older snapshots and messages
- DeleteSnapshots(new SnapshotSelectionCriteria(success.Metadata.SequenceNr - 1));
- //DeleteMessages(success.Metadata.SequenceNr);
- });
-
- Command(fetch =>
- {
- Sender.Tell(new FetchResult(State));
-
- if (State.IsEmpty)
- {
- // we don't exist, so don't let `remember-entities` keep us alive
- Context.Parent.Tell(new Passivate(PoisonPill.Instance));
- }
- });
- }
-
- public override string PersistenceId { get; }
-
- public ProductState State { get; set; }
-}
\ No newline at end of file
diff --git a/src/clustering/sharding-sqlserver/SqlSharding.Host/Actors/SoldProductIndexActor.cs b/src/clustering/sharding-sqlserver/SqlSharding.Host/Actors/SoldProductIndexActor.cs
deleted file mode 100644
index 5841c1eb..00000000
--- a/src/clustering/sharding-sqlserver/SqlSharding.Host/Actors/SoldProductIndexActor.cs
+++ /dev/null
@@ -1,174 +0,0 @@
-using System.Collections.Immutable;
-using Akka.Actor;
-using Akka.Delivery;
-using Akka.Event;
-using Akka.Hosting;
-using Akka.Persistence.Query;
-using Akka.Persistence.Query.Sql;
-using Akka.Streams;
-using Akka.Streams.Dsl;
-using Akka.Util;
-using SqlSharding.Shared;
-using SqlSharding.Shared.Events;
-using SqlSharding.Shared.Queries;
-using SqlSharding.Shared.Sharding;
-
-namespace SqlSharding.Host.Actors;
-
-public class SoldProductIndexActor : ReceiveActor, IWithTimers
-{
- private readonly ILoggingAdapter _logging = Context.GetLogger();
- private const int MaxAttempts = 3;
- private readonly IActorRef _shardRegion;
- private ImmutableDictionary _soldProducts = ImmutableDictionary.Empty;
- private ImmutableDictionary _producers = ImmutableDictionary.Empty;
-
- private record ConsumerTerminated(string ProducerId);
-
- private record RetryFetchProducts(FetchSoldProductsImpl OriginalRequest, int Attempts);
-
- public SoldProductIndexActor(IRequiredActor requiredActor)
- {
- _shardRegion = requiredActor.ActorRef;
- Receive(found =>
- {
- _logging.Info("Found sold product [{0}]", found.ProductId);
-
- if (!_soldProducts.TryGetValue(found.ProductId, out var soldData))
- {
- soldData = ProductsSoldData.Empty;
- _shardRegion.Tell(new FetchProduct(found.ProductId));
- }
-
- soldData = soldData with { Invoices = soldData.Invoices.Add(found.Data) };
-
- _soldProducts = _soldProducts.SetItem(found.ProductId, soldData);
- });
-
- Receive(result =>
- {
- var productId = result.State.ProductId;
- _logging.Info("Received product state for sold product [{0}]", productId);
- _soldProducts = _soldProducts.SetItem(productId, _soldProducts[productId] with { ProductData = result.State.Data });
- });
-
- Receive(f => ReceiveFetchSoldProducts(f, 1));
-
- Receive>(next =>
- {
- // signal that demand is available
- _producers = _producers.SetItem(next.ProducerId, (_producers[next.ProducerId].producerController, true));
- });
-
- Receive(r =>
- {
- ReceiveFetchSoldProducts(r.OriginalRequest, r.Attempts);
- });
-
- Receive(t =>
- {
- if (_producers.TryGetValue(t.ProducerId, out var p))
- {
- Context.Stop(p.producerController);
- _producers = _producers.Remove(t.ProducerId);
- }
- });
-
- // this should never happen
- Receive(_ => throw new InvalidOperationException("SHOULD NOT REACH END OF ID STREAM"));
-
- Receive(f =>
- {
- _logging.Error(f.Cause, "Failed to read sold product from Akka.Persistence.Query");
- throw new InvalidOperationException("SHOULD NOT REACH END OF ID STREAM");
- });
- }
-
- private void ReceiveFetchSoldProducts(FetchSoldProductsImpl f, int attemptNo)
- {
- if (_producers.TryGetValue(f.ProducerId, out var t))
- {
- if (t.hasDemand)
- {
- // let the ProducerController perform sequencing
- t.producerController.Tell(new FetchSoldProductsResponse(_soldProducts.Values.ToList()));
-
- // signal that demand has been met (for now)
- _producers = _producers.SetItem(f.ProducerId, (t.producerController, false));
- Timers.Cancel(f.ProducerId); // if there was a scheduled retry pending - client beat us to it
- }
- else
- {
- // retry in 1 second
- RetryRequest(f, attemptNo + 1);
- }
- }
- else
- {
- // need to start ProducerController
- var producerController = CreateProducerController(f.ProducerId, f.ConsumerController);
- _producers = _producers.Add(f.ProducerId, (producerController, false));
- RetryRequest(f, attemptNo + 1);
- }
- }
-
- private void RetryRequest(FetchSoldProductsImpl f, int currentAttempt)
- {
- if (currentAttempt > MaxAttempts)
- {
- // request failed
- _logging.Error("Failed to process FetchSoldProducts for [{0}] afters [{1}] attempts", f.ProducerId, MaxAttempts);
- }
- else
- {
- Timers.StartSingleTimer(f.ProducerId, new RetryFetchProducts(f, currentAttempt), TimeSpan.FromSeconds(1));
- }
- }
-
- private readonly record struct SoldProductFound(string ProductId, ProductSold Data);
-
- private sealed class Done
- {
- public static readonly Done Instance = new();
- private Done()
- {
- }
- }
-
- protected override void PreStart()
- {
- /*
- * Kicks off an Akka.Persistence.Query instance that will continuously
- */
- var query = Context.System.ReadJournalFor(SqlReadJournal.Identifier);
- query.EventsByTag(MessageTagger.SoldEventTag, Offset.Sequence(0))
- .Where(e => e.PersistenceId.StartsWith(ProductTotalsActor.TotalsEntityNameConstant))
- .Select(e =>
- {
- var splitPivot = e.PersistenceId.IndexOf("-", StringComparison.Ordinal);
- return new SoldProductFound(e.PersistenceId[(splitPivot + 1)..], (ProductSold)e.Event);
- })
- .To(Sink.ActorRef(Self, Done.Instance, ex => new Status.Failure(ex)))
- .Run(Context.Materializer());
- }
-
- private IActorRef CreateProducerController(string producerId, IActorRef requestor)
- {
- // 64KB chunks
- var producerControllerSettings = ProducerController.Settings.Create(Context.System) with { ChunkLargeMessagesBytes = 1024 };
- var producerControllerProps =
- ProducerController.Create(Context, producerId, Option.None, producerControllerSettings);
- Context.WatchWith(requestor, new ConsumerTerminated(producerId));
- var producerController = Context.ActorOf(producerControllerProps, $"sold-producer-controller-{producerId}");
-
- // register the consumer with the producer
- producerController.Tell(new ProducerController.RegisterConsumer(requestor));
-
- // start production
- producerController.Tell(new ProducerController.Start(Self));
- return producerController;
- }
-
- public ITimerScheduler Timers { get; set; } = null!;
-
-}
\ No newline at end of file
diff --git a/src/clustering/sharding-sqlserver/SqlSharding.Host/Actors/WarningEventIndexActor.cs b/src/clustering/sharding-sqlserver/SqlSharding.Host/Actors/WarningEventIndexActor.cs
deleted file mode 100644
index 23ca8cdd..00000000
--- a/src/clustering/sharding-sqlserver/SqlSharding.Host/Actors/WarningEventIndexActor.cs
+++ /dev/null
@@ -1,174 +0,0 @@
-using System.Collections.Immutable;
-using Akka.Actor;
-using Akka.Delivery;
-using Akka.Event;
-using Akka.Hosting;
-using Akka.Persistence.Query;
-using Akka.Persistence.Query.Sql;
-using Akka.Streams;
-using Akka.Streams.Dsl;
-using Akka.Util;
-using SqlSharding.Shared;
-using SqlSharding.Shared.Events;
-using SqlSharding.Shared.Queries;
-using SqlSharding.Shared.Sharding;
-
-namespace SqlSharding.Host.Actors;
-
-public class WarningEventIndexActor : ReceiveActor, IWithTimers
-{
- private readonly ILoggingAdapter _logging = Context.GetLogger();
- private const int MaxAttempts = 3;
- private readonly IActorRef _shardRegion;
- private ImmutableDictionary _warningEvents = ImmutableDictionary.Empty;
- private ImmutableDictionary _producers = ImmutableDictionary.Empty;
-
- private record ConsumerTerminated(string ProducerId);
-
- private record RetryFetchWarnings(FetchWarningEventsImpl OriginalRequest, int Attempts);
-
- public WarningEventIndexActor(IRequiredActor requiredActor)
- {
- _shardRegion = requiredActor.ActorRef;
- Receive(found =>
- {
- _logging.Info("Found warning event [{0}]", found.ProductId);
-
- if (!_warningEvents.TryGetValue(found.ProductId, out var alertData))
- {
- alertData = WarningEventData.Empty;
- _shardRegion.Tell(new FetchProduct(found.ProductId));
- }
-
- alertData = alertData with { Warnings = alertData.Warnings.Add(found.Data) };
-
- _warningEvents = _warningEvents.SetItem(found.ProductId, alertData);
- });
-
- Receive(result =>
- {
- var productId = result.State.ProductId;
- _logging.Info("Received product state for sold product [{0}]", productId);
- _warningEvents = _warningEvents.SetItem(productId, _warningEvents[productId] with { ProductData = result.State.Data });
- });
-
- Receive(f => ReceiveFetchWarningEvents(f, 1));
-
- Receive>(next =>
- {
- // signal that demand is available
- _producers = _producers.SetItem(next.ProducerId, (_producers[next.ProducerId].producerController, true));
- });
-
- Receive(r =>
- {
- ReceiveFetchWarningEvents(r.OriginalRequest, r.Attempts);
- });
-
- Receive(t =>
- {
- if (_producers.TryGetValue(t.ProducerId, out var p))
- {
- Context.Stop(p.producerController);
- _producers = _producers.Remove(t.ProducerId);
- }
- });
-
- // this should never happen
- Receive(_ => throw new InvalidOperationException("SHOULD NOT REACH END OF ID STREAM"));
-
- Receive(f =>
- {
- _logging.Error(f.Cause, "Failed to read warning events from Akka.Persistence.Query");
- throw new InvalidOperationException("SHOULD NOT REACH END OF ID STREAM");
- });
- }
-
- private void ReceiveFetchWarningEvents(FetchWarningEventsImpl f, int attemptNo)
- {
- if (_producers.TryGetValue(f.ProducerId, out var t))
- {
- if (t.hasDemand)
- {
- // let the ProducerController perform sequencing
- t.producerController.Tell(new FetchWarningEventsResponse(_warningEvents.Values.ToList()));
-
- // signal that demand has been met (for now)
- _producers = _producers.SetItem(f.ProducerId, (t.producerController, false));
- Timers.Cancel(f.ProducerId); // if there was a scheduled retry pending - client beat us to it
- }
- else
- {
- // retry in 1 second
- RetryRequest(f, attemptNo + 1);
- }
- }
- else
- {
- // need to start ProducerController
- var producerController = CreateProducerController(f.ProducerId, f.ConsumerController);
- _producers = _producers.Add(f.ProducerId, (producerController, false));
- RetryRequest(f, attemptNo + 1);
- }
- }
-
- private void RetryRequest(FetchWarningEventsImpl f, int currentAttempt)
- {
- if (currentAttempt > MaxAttempts)
- {
- // request failed
- _logging.Error("Failed to process FetchWarningEvents for [{0}] afters [{1}] attempts", f.ProducerId, MaxAttempts);
- }
- else
- {
- Timers.StartSingleTimer(f.ProducerId, new RetryFetchWarnings(f, currentAttempt), TimeSpan.FromSeconds(1));
- }
- }
-
- private readonly record struct WarningEventFound(string ProductId, ProductInventoryWarningEvent Data);
-
- private sealed class Done
- {
- public static readonly Done Instance = new();
- private Done()
- {
- }
- }
-
- protected override void PreStart()
- {
- /*
- * Kicks off an Akka.Persistence.Query instance that will continuously
- */
- var query = Context.System.ReadJournalFor(SqlReadJournal.Identifier);
- query.EventsByTag(MessageTagger.WarningEventTag, Offset.Sequence(0))
- .Where(e => e.PersistenceId.StartsWith(ProductTotalsActor.TotalsEntityNameConstant))
- .Select(e =>
- {
- var splitPivot = e.PersistenceId.IndexOf("-", StringComparison.Ordinal);
- return new WarningEventFound(e.PersistenceId[(splitPivot + 1)..], (ProductInventoryWarningEvent)e.Event);
- })
- .To(Sink.ActorRef(Self, Done.Instance, ex => new Status.Failure(ex)))
- .Run(Context.Materializer());
- }
-
- private IActorRef CreateProducerController(string producerId, IActorRef requestor)
- {
- // 64KB chunks
- var producerControllerSettings = ProducerController.Settings.Create(Context.System) with { ChunkLargeMessagesBytes = 1024 };
- var producerControllerProps =
- ProducerController.Create(Context, producerId, Option.None, producerControllerSettings);
- Context.WatchWith(requestor, new ConsumerTerminated(producerId));
- var producerController = Context.ActorOf(producerControllerProps, $"warning-producer-controller-{producerId}");
-
- // register the consumer with the producer
- producerController.Tell(new ProducerController.RegisterConsumer(requestor));
-
- // start production
- producerController.Tell(new ProducerController.Start(Self));
- return producerController;
- }
-
- public ITimerScheduler Timers { get; set; } = null!;
-
-}
\ No newline at end of file
diff --git a/src/clustering/sharding-sqlserver/SqlSharding.Host/Dockerfile b/src/clustering/sharding-sqlserver/SqlSharding.Host/Dockerfile
deleted file mode 100644
index 0246b76c..00000000
--- a/src/clustering/sharding-sqlserver/SqlSharding.Host/Dockerfile
+++ /dev/null
@@ -1,6 +0,0 @@
-FROM mcr.microsoft.com/dotnet/runtime:8.0
-WORKDIR /app
-
-COPY ./bin/Release/net8.0/publish/ /app
-
-CMD ["dotnet", "SqlSharding.Host.dll"]
\ No newline at end of file
diff --git a/src/clustering/sharding-sqlserver/SqlSharding.Host/Program.cs b/src/clustering/sharding-sqlserver/SqlSharding.Host/Program.cs
deleted file mode 100644
index 114d839d..00000000
--- a/src/clustering/sharding-sqlserver/SqlSharding.Host/Program.cs
+++ /dev/null
@@ -1,143 +0,0 @@
-// See https://aka.ms/new-console-template for more information
-
-using Akka.Cluster.Hosting;
-using Akka.Cluster.Sharding;
-using Akka.Hosting;
-using Akka.Persistence.Hosting;
-using Akka.Remote.Hosting;
-using Microsoft.Extensions.Configuration;
-using Microsoft.Extensions.Hosting;
-using Petabridge.Cmd.Cluster;
-using Petabridge.Cmd.Cluster.Sharding;
-using Petabridge.Cmd.Host;
-using SqlSharding.Host.Actors;
-using SqlSharding.Shared;
-using SqlSharding.Shared.Events;
-using SqlSharding.Shared.Serialization;
-using SqlSharding.Shared.Sharding;
-using Akka.Persistence.SqlServer.Hosting;
-
-var environment = Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT") ?? "Development";
-var seedDb = Environment.GetEnvironmentVariable("SEED_DB")?.ToLowerInvariant() is "true" ||
- (args.Length > 0 && args[0] == "seed-db");
-
-var builder = new HostBuilder()
- .ConfigureAppConfiguration(c => c.AddEnvironmentVariables()
- .AddJsonFile("appsettings.json")
- .AddJsonFile($"appsettings.{environment}.json"))
- .ConfigureServices((context, services) =>
- {
- // maps to environment variable ConnectionStrings__AkkaSqlConnection
- var connectionString = context.Configuration.GetConnectionString("AkkaSqlConnection");
- if (connectionString is null)
- throw new Exception("AkkaSqlConnection setting is missing");
-
- var akkaSection = context.Configuration.GetSection("Akka");
-
- // maps to environment variable Akka__ClusterIp
- var hostName = akkaSection.GetValue("ClusterIp", "localhost");
-
- // maps to environment variable Akka__ClusterPort
- var port = akkaSection.GetValue("ClusterPort", 0);
-
- var seeds = akkaSection.GetValue("ClusterSeeds", new[] { "akka.tcp://SqlSharding@localhost:7918" })
- .ToArray();
-
- services.AddAkka("SqlSharding", (configurationBuilder, provider) =>
- {
- #region Custom sharding journal options setup
-
- var shardingJournalOptions = new SqlServerJournalOptions(
- isDefaultPlugin: false,
- identifier: "sharding")
- {
- ConnectionString = connectionString,
- TableName = "ShardingEventJournal",
- MetadataTableName = "ShardingMetadata",
- AutoInitialize = true
- };
- shardingJournalOptions.Adapters.AddWriteEventAdapter("tagger", new[] { typeof(object) });
-
- #endregion
-
- #region Custom sharding snapshot store options setup
-
- var shardingSnapshotOptions = new SqlServerSnapshotOptions(
- isDefaultPlugin: false,
- identifier: "sharding")
- {
- ConnectionString = connectionString,
- TableName = "ShardingSnapshotStore",
- AutoInitialize = true
- };
-
- #endregion
-
- configurationBuilder
- .WithRemoting(hostName, port)
- .AddAppSerialization()
- .WithClustering(new ClusterOptions
- {
- Roles = new[] { ProductActorProps.SingletonActorRole },
- SeedNodes = seeds
- })
- .WithSqlServerPersistence(
- connectionString: connectionString,
- journalBuilder: builder =>
- {
- builder.AddWriteEventAdapter("product-tagger", new[] { typeof(IProductEvent) });
- })
- .WithJournalAndSnapshot(shardingJournalOptions, shardingSnapshotOptions)
- .WithShardRegion(
- typeName: "products",
- entityPropsFactory: ProductTotalsActor.GetProps,
- messageExtractor: new ProductMessageRouter(),
- shardOptions: new ShardOptions
- {
- Role = ProductActorProps.SingletonActorRole,
- RememberEntities = true,
- StateStoreMode = StateStoreMode.DData,
- RememberEntitiesStore = RememberEntitiesStore.Eventsourced,
- JournalOptions = shardingJournalOptions,
- SnapshotOptions = shardingSnapshotOptions,
- FailOnInvalidEntityStateTransition = true
- })
- .WithClusterShardingJournalMigrationAdapter(shardingJournalOptions.PluginId)
- .WithSingleton(
- singletonName: "product-proxy",
- propsFactory: (_, _, resolver) => resolver.Props(),
- options: new ClusterSingletonOptions
- {
- Role = ProductActorProps.SingletonActorRole
- })
- .WithSingleton(
- singletonName: "sold-product-proxy",
- propsFactory: (_, _, resolver) => resolver.Props(),
- options: new ClusterSingletonOptions
- {
- Role = ProductActorProps.SingletonActorRole
- })
- .WithSingleton(
- singletonName: "warning-proxy",
- propsFactory: (_, _, resolver) => resolver.Props(),
- options: new ClusterSingletonOptions
- {
- Role = ProductActorProps.SingletonActorRole
- })
- .AddPetabridgeCmd(cmd =>
- {
- cmd.RegisterCommandPalette(ClusterShardingCommands.Instance);
- cmd.RegisterCommandPalette(ClusterCommands.Instance);
- })
- .AddStartup((system, registry) =>
- {
- if (!seedDb)
- return;
-
- new FakeDataGenerator().Generate(system, registry, 100);
- });
- });
- })
- .Build();
-
-await builder.RunAsync();
\ No newline at end of file
diff --git a/src/clustering/sharding-sqlserver/SqlSharding.Host/SqlSharding.Host.csproj b/src/clustering/sharding-sqlserver/SqlSharding.Host/SqlSharding.Host.csproj
deleted file mode 100644
index 04034348..00000000
--- a/src/clustering/sharding-sqlserver/SqlSharding.Host/SqlSharding.Host.csproj
+++ /dev/null
@@ -1,38 +0,0 @@
-
-
-
- Exe
- $(NetRuntime)
- enable
- enable
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- PreserveNewest
-
-
-
- PreserveNewest
-
-
-
- PreserveNewest
-
-
-
-
diff --git a/src/clustering/sharding-sqlserver/SqlSharding.Host/appsettings.Development.json b/src/clustering/sharding-sqlserver/SqlSharding.Host/appsettings.Development.json
deleted file mode 100644
index f847e8ec..00000000
--- a/src/clustering/sharding-sqlserver/SqlSharding.Host/appsettings.Development.json
+++ /dev/null
@@ -1,12 +0,0 @@
-{
- "Logging": {
- "LogLevel": {
- "Default": "Debug",
- "System": "Information",
- "Microsoft": "Information"
- }
- },
- "ConnectionStrings": {
- "AkkaSqlConnection": "Server=localhost,1533; Database=Akka; User Id=sa; Password=yourStrong(!)Password; TrustServerCertificate=true;"
- }
-}
\ No newline at end of file
diff --git a/src/clustering/sharding-sqlserver/SqlSharding.Host/appsettings.Production.json b/src/clustering/sharding-sqlserver/SqlSharding.Host/appsettings.Production.json
deleted file mode 100644
index b4b1b915..00000000
--- a/src/clustering/sharding-sqlserver/SqlSharding.Host/appsettings.Production.json
+++ /dev/null
@@ -1,9 +0,0 @@
-{
- "Logging": {
- "LogLevel": {
- "Default": "Debug",
- "System": "Information",
- "Microsoft": "Information"
- }
- }
-}
\ No newline at end of file
diff --git a/src/clustering/sharding-sqlserver/SqlSharding.Host/appsettings.json b/src/clustering/sharding-sqlserver/SqlSharding.Host/appsettings.json
deleted file mode 100644
index b4b1b915..00000000
--- a/src/clustering/sharding-sqlserver/SqlSharding.Host/appsettings.json
+++ /dev/null
@@ -1,9 +0,0 @@
-{
- "Logging": {
- "LogLevel": {
- "Default": "Debug",
- "System": "Information",
- "Microsoft": "Information"
- }
- }
-}
\ No newline at end of file
diff --git a/src/clustering/sharding-sqlserver/SqlSharding.Sql.Host/Dockerfile b/src/clustering/sharding-sqlserver/SqlSharding.Sql.Host/Dockerfile
index 0246b76c..0c097a6a 100644
--- a/src/clustering/sharding-sqlserver/SqlSharding.Sql.Host/Dockerfile
+++ b/src/clustering/sharding-sqlserver/SqlSharding.Sql.Host/Dockerfile
@@ -3,4 +3,4 @@ WORKDIR /app
COPY ./bin/Release/net8.0/publish/ /app
-CMD ["dotnet", "SqlSharding.Host.dll"]
\ No newline at end of file
+CMD ["dotnet", "SqlSharding.Sql.Host.dll"]
\ No newline at end of file
diff --git a/src/clustering/sharding-sqlserver/SqlSharding.Sql.Host/Program.cs b/src/clustering/sharding-sqlserver/SqlSharding.Sql.Host/Program.cs
index 3346239b..a6cd451d 100644
--- a/src/clustering/sharding-sqlserver/SqlSharding.Sql.Host/Program.cs
+++ b/src/clustering/sharding-sqlserver/SqlSharding.Sql.Host/Program.cs
@@ -42,7 +42,7 @@
var hostName = akkaSection.GetValue("ClusterIp", "localhost");
// maps to environment variable Akka__ClusterPort
- var port = akkaSection.GetValue("ClusterPort", 0);
+ var port = akkaSection.GetValue("ClusterPort", 7918);
var seeds = akkaSection.GetValue("ClusterSeeds", new[] { "akka.tcp://SqlSharding@localhost:7918" })
.ToArray();
@@ -64,7 +64,7 @@
DatabaseOptions = shardingJournalDbOptions,
TagStorageMode = TagMode.Csv,
DeleteCompatibilityMode = true,
- AutoInitialize = false
+ AutoInitialize = true
};
shardingJournalOptions.Adapters.AddWriteEventAdapter("tagger", new[] { typeof(object) });
@@ -82,7 +82,7 @@
ConnectionString = connectionString,
ProviderName = ProviderName.SqlServer2019,
DatabaseOptions = shardingSnapshotDbOptions,
- AutoInitialize = false
+ AutoInitialize = true
};
#endregion
@@ -101,8 +101,8 @@
databaseMapping: DatabaseMapping.SqlServer,
tagStorageMode: TagMode.Csv,
deleteCompatibilityMode: true,
- useWriterUuidColumn: false,
- autoInitialize: false,
+ useWriterUuidColumn: true,
+ autoInitialize: true,
journalBuilder: builder =>
{
builder.AddWriteEventAdapter("product-tagger", new[] { typeof(IProductEvent) });
diff --git a/src/clustering/sharding-sqlserver/SqlSharding.WebApp/Program.cs b/src/clustering/sharding-sqlserver/SqlSharding.WebApp/Program.cs
index 58f56f00..c75d961a 100644
--- a/src/clustering/sharding-sqlserver/SqlSharding.WebApp/Program.cs
+++ b/src/clustering/sharding-sqlserver/SqlSharding.WebApp/Program.cs
@@ -15,7 +15,7 @@
var hostName = akkaSection.GetValue("ClusterIp", "localhost");
// maps to environment variable Akka__ClusterPort
-var port = akkaSection.GetValue("ClusterPort", 7918);
+var port = akkaSection.GetValue("ClusterPort", 7919);
var seeds = akkaSection.GetValue("ClusterSeeds", new []{ "akka.tcp://SqlSharding@localhost:7918" })
.ToArray();
diff --git a/src/clustering/sharding-sqlserver/build.sh b/src/clustering/sharding-sqlserver/build.sh
old mode 100644
new mode 100755
index 29bab51b..3d8ddfc5
--- a/src/clustering/sharding-sqlserver/build.sh
+++ b/src/clustering/sharding-sqlserver/build.sh
@@ -4,7 +4,7 @@ parent_path=$( cd "$(dirname "${BASH_SOURCE[0]}")" ; pwd -P )
cd "$parent_path"
version="0.1.0"
-hostImageName="sqlsharding.host"
+hostImageName="sqlsharding.sql.host"
webImageName="sqlsharding.web"
if [ -z $1 ]; then
@@ -16,10 +16,10 @@ else
echo "Building [${webImageName}] with tag [${version}]"
fi
-dotnet publish ./SqlSharding.Host/SqlSharding.Host.csproj -c Release -p:Version=${version}
+dotnet publish ./SqlSharding.Sql.Host/SqlSharding.Sql.Host.csproj -c Release -p:Version=${version}
-docker build ./SqlSharding.Host/. -t "${imageName}:${version}"
+docker build ./SqlSharding.Sql.Host/. -t "${hostImageName}:${version}"
dotnet publish ./SqlSharding.WebApp/SqlSharding.WebApp.csproj -c Release -p:Version=${version}
-docker build ./SqlSharding.WebApp/. -t "${imageName}:${version}"
\ No newline at end of file
+docker build ./SqlSharding.WebApp/. -t "${webImageName}:${version}"
\ No newline at end of file
diff --git a/src/clustering/sharding-sqlserver/start-dependencies.sh b/src/clustering/sharding-sqlserver/start-dependencies.sh
old mode 100644
new mode 100755
index 8bf0e973..bb01a533
--- a/src/clustering/sharding-sqlserver/start-dependencies.sh
+++ b/src/clustering/sharding-sqlserver/start-dependencies.sh
@@ -13,11 +13,15 @@ else
fi
if docker run -e "ACCEPT_EULA=Y" -e "SA_PASSWORD=yourStrong(!)Password" -p 1533:1433 --name "sqlsharding-sql" -d "${imageName}:${version}" ; then
+ echo "started akkadotnet.sqlserver"
+
+else
echo "failed to start akkadotnet.sqlserver - building image first then retrying"
if ../../../infrastructure/build.all.sh $1 ; then
- echo "failed to build akkadotnet.sqlserver - aborting"
- return -1
docker run -e "ACCEPT_EULA=Y" -e "SA_PASSWORD=yourStrong(!)Password" -p 1533:1433 --name "sqlsharding-sql" -d "${imageName}:${version}"
+ echo "started akkadotnet.sqlserver"
+ else
+ echo "failed to build akkadotnet.sqlserver - aborting"
fi
fi
\ No newline at end of file
diff --git a/src/cqrs/cqrs-sqlserver/CqrsSqlServer.DataModel/populate-db.sh b/src/cqrs/cqrs-sqlserver/CqrsSqlServer.DataModel/populate-db.sh
old mode 100644
new mode 100755
diff --git a/src/cqrs/cqrs-sqlserver/start-dependencies.sh b/src/cqrs/cqrs-sqlserver/start-dependencies.sh
old mode 100644
new mode 100755