-
Notifications
You must be signed in to change notification settings - Fork 264
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
18 changed files
with
620 additions
and
0 deletions.
There are no files selected for viewing
8 changes: 8 additions & 0 deletions
8
src/clustering/cluster-client/ClusterClientSample.FrontEnd/Actors/ActorKeys.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
namespace ClusterClientSample.FrontEnd.Actors; | ||
|
||
// This class will only be used as a Type marker to retrieve the `ClusterClient` actor from the `ActorRegistry`. | ||
// It is not meant to be instantiated in any way. | ||
public sealed class GatewayClusterClientActor | ||
{ | ||
private GatewayClusterClientActor(){ } | ||
} |
48 changes: 48 additions & 0 deletions
48
src/clustering/cluster-client/ClusterClientSample.FrontEnd/Actors/BatchedWorkRequester.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
using Akka.Actor; | ||
using Akka.Cluster.Tools.Client; | ||
using Akka.Event; | ||
using Akka.Hosting; | ||
using ClusterClientSample.Shared; | ||
|
||
namespace ClusterClientSample.FrontEnd.Actors; | ||
|
||
/// <summary> | ||
/// This actor will: | ||
/// * periodically send a `BatchedWork` request message to the backend app "/user/worker-manager" | ||
/// service via the `ClusterClient` actor | ||
/// * receive the `Result` message for each work performed and logs them | ||
/// </summary> | ||
public class BatchedWorkRequester: ReceiveActor, IWithTimers | ||
{ | ||
private const string BatchKey = nameof(BatchKey); | ||
|
||
private readonly Random _random = new(); | ||
|
||
public BatchedWorkRequester(IRequiredActor<GatewayClusterClientActor> clusterClientActor) | ||
{ | ||
var log = Context.GetLogger(); | ||
var clusterClient = clusterClientActor.ActorRef; | ||
|
||
Receive<BatchedWork>(msg => | ||
{ | ||
log.Info("Requesting a batched work to the other cluster. Count: {0}", msg.Size); | ||
clusterClient.Tell(new ClusterClient.Send("/user/worker-manager", msg, true)); | ||
Timers.StartSingleTimer(BatchKey, GetBatch(), TimeSpan.FromSeconds(10)); | ||
}); | ||
|
||
Receive<Result>(msg => | ||
{ | ||
log.Info("[ID:{0}] Work result: {1}", msg.Id, msg.Value); | ||
}); | ||
} | ||
|
||
protected override void PreStart() | ||
{ | ||
base.PreStart(); | ||
Timers.StartSingleTimer(BatchKey, GetBatch(), TimeSpan.FromSeconds(3)); | ||
} | ||
|
||
public ITimerScheduler Timers { get; set; } = null!; | ||
|
||
private BatchedWork GetBatch() => new (_random.Next(5) + 5); | ||
} |
51 changes: 51 additions & 0 deletions
51
src/clustering/cluster-client/ClusterClientSample.FrontEnd/Actors/WorkReportCollector.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
using Akka.Actor; | ||
using Akka.Cluster.Tools.Client; | ||
using Akka.Event; | ||
using Akka.Hosting; | ||
using ClusterClientSample.Shared; | ||
|
||
namespace ClusterClientSample.FrontEnd.Actors; | ||
|
||
/// <summary> | ||
/// This actor will: | ||
/// * periodically publish a `SendReport` message to the backend app "report" pub-sub topic via the `ClusterClient` actor | ||
/// * receive the `Report` message from the workload metric actor and logs them | ||
/// </summary> | ||
public class WorkReportCollector: ReceiveActor, IWithTimers | ||
{ | ||
private class GetReport | ||
{ | ||
public static readonly GetReport Instance = new(); | ||
private GetReport() { } | ||
} | ||
|
||
private const string ReportKey = nameof(ReportKey); | ||
|
||
public WorkReportCollector(IRequiredActor<GatewayClusterClientActor> clusterClientActor) | ||
{ | ||
var log = Context.GetLogger(); | ||
var clusterClient = clusterClientActor.ActorRef; | ||
|
||
Receive<Report>(report => | ||
{ | ||
foreach (var (actor, count) in report.Counts) | ||
{ | ||
log.Info("Worker {0} has done {1} works", actor, count); | ||
} | ||
}); | ||
|
||
Receive<GetReport>(_ => | ||
{ | ||
log.Info("Requesting work report metrics from the other cluster."); | ||
clusterClient.Tell(new ClusterClient.Publish("report", SendReport.Instance)); | ||
}); | ||
} | ||
|
||
public ITimerScheduler Timers { get; set; } = null!; | ||
|
||
protected override void PreStart() | ||
{ | ||
base.PreStart(); | ||
Timers.StartPeriodicTimer(ReportKey, GetReport.Instance, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(10)); | ||
} | ||
} |
19 changes: 19 additions & 0 deletions
19
...lustering/cluster-client/ClusterClientSample.FrontEnd/ClusterClientSample.FrontEnd.csproj
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
<Project Sdk="Microsoft.NET.Sdk"> | ||
|
||
<PropertyGroup> | ||
<OutputType>Exe</OutputType> | ||
<TargetFramework>net8.0</TargetFramework> | ||
<ImplicitUsings>enable</ImplicitUsings> | ||
<Nullable>enable</Nullable> | ||
</PropertyGroup> | ||
|
||
<ItemGroup> | ||
<PackageReference Include="Akka.Cluster.Hosting" /> | ||
<PackageReference Include="Akka.Serialization.Hyperion" /> | ||
</ItemGroup> | ||
|
||
<ItemGroup> | ||
<ProjectReference Include="..\ClusterClientSample.Shared\ClusterClientSample.Shared.csproj" /> | ||
</ItemGroup> | ||
|
||
</Project> |
43 changes: 43 additions & 0 deletions
43
src/clustering/cluster-client/ClusterClientSample.FrontEnd/Program.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
using Akka.Cluster.Hosting; | ||
using Akka.Hosting; | ||
using Akka.Remote.Hosting; | ||
using ClusterClientSample.FrontEnd.Actors; | ||
using Microsoft.Extensions.Hosting; | ||
|
||
const string gatewaySystemAddress = "akka.tcp://cluster-system@localhost:12552"; | ||
const string systemName = "remote-cluster-system"; | ||
const string hostName = "localhost"; | ||
const int port = 12553; | ||
|
||
Console.Title = "Frontend Gateway Node"; | ||
|
||
var host = new HostBuilder() | ||
.ConfigureServices((context, services) => | ||
{ | ||
services.AddAkka(systemName, builder => | ||
{ | ||
builder | ||
// Setup remoting | ||
.WithRemoting(configure: options => | ||
{ | ||
options.Port = port; | ||
options.HostName = hostName; | ||
}) | ||
// Setup `ClusterClient` actor to connect to another actor system | ||
.WithClusterClient<GatewayClusterClientActor>([ $"{gatewaySystemAddress}/system/receptionist" ]) | ||
// Setup required actors and startup code | ||
.WithActors((system, registry, resolver) => | ||
{ | ||
var requesterActor = system.ActorOf(resolver.Props(typeof(BatchedWorkRequester)), "work-batch-requester"); | ||
registry.Register<BatchedWorkRequester>(requesterActor); | ||
var reportCollectorActor = system.ActorOf(resolver.Props(typeof(WorkReportCollector)), "work-report-collector"); | ||
registry.Register<WorkReportCollector>(reportCollectorActor); | ||
}); | ||
}); | ||
}).Build(); | ||
|
||
await host.StartAsync(); | ||
await host.WaitForShutdownAsync(); |
23 changes: 23 additions & 0 deletions
23
src/clustering/cluster-client/ClusterClientSample.Gateway/Actors/MetricCounterActor.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
using Akka.Actor; | ||
using ClusterClientSample.Shared; | ||
|
||
namespace ClusterClientSample.Gateway.Actors; | ||
|
||
public class MetricCounterActor : ReceiveActor | ||
{ | ||
public MetricCounterActor() | ||
{ | ||
var counts = new Dictionary<IActorRef, int>(); | ||
|
||
Receive<WorkComplete>(_ => | ||
{ | ||
if (counts.TryGetValue(Sender, out var count)) | ||
counts[Sender] = ++count; | ||
else | ||
counts.Add(Sender, 1); | ||
}); | ||
|
||
Receive<SendReport>(_ => Sender.Tell(new Report( | ||
counts.ToDictionary(kvp => kvp.Key.Path.ToString(), kvp => kvp.Value)))); | ||
} | ||
} |
45 changes: 45 additions & 0 deletions
45
src/clustering/cluster-client/ClusterClientSample.Gateway/Actors/WorkerManagerActor.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
using Akka.Actor; | ||
using Akka.Cluster.Routing; | ||
using Akka.Event; | ||
using Akka.Hosting; | ||
using Akka.Routing; | ||
using ClusterClientSample.Shared; | ||
|
||
namespace ClusterClientSample.Gateway.Actors; | ||
|
||
public class WorkerManagerActor : ReceiveActor | ||
{ | ||
private static int _nextId = 1; | ||
|
||
public WorkerManagerActor(IRequiredActor<MetricCounterActor> counterActor) | ||
{ | ||
var log = Context.GetLogger(); | ||
var counter = counterActor.ActorRef; | ||
var workerRouter = GetWorkerRouter(counter); | ||
|
||
Receive<BatchedWork>(batch => | ||
{ | ||
log.Info("Generating a work batch of size {0}", batch.Size); | ||
for (var i = 0; i < batch.Size; i++) | ||
{ | ||
// forward the work request as if it was sent by the original sender so that the work result can be | ||
// sent back to the original sender by the worker | ||
workerRouter.Forward(new Work(_nextId++)); | ||
} | ||
}); | ||
} | ||
|
||
private static IActorRef GetWorkerRouter(IActorRef counter) | ||
{ | ||
// Creates a cluster router pool of 10 workers for each cluster node with the role "worker" | ||
// that joins the cluster. | ||
// | ||
// The router will use a round-robin strategy to distribute messages amongst the worker actors | ||
var props = new ClusterRouterPool( | ||
local: new RoundRobinPool(10), | ||
settings: new ClusterRouterPoolSettings(30, 10, true, "worker")) | ||
.Props(Props.Create(() => new WorkerActor(counter))); | ||
|
||
return Context.ActorOf(props); | ||
} | ||
} |
19 changes: 19 additions & 0 deletions
19
src/clustering/cluster-client/ClusterClientSample.Gateway/ClusterClientSample.Gateway.csproj
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
<Project Sdk="Microsoft.NET.Sdk"> | ||
|
||
<PropertyGroup> | ||
<OutputType>Exe</OutputType> | ||
<TargetFramework>net8.0</TargetFramework> | ||
<ImplicitUsings>enable</ImplicitUsings> | ||
<Nullable>enable</Nullable> | ||
</PropertyGroup> | ||
|
||
<ItemGroup> | ||
<PackageReference Include="Akka.Cluster.Hosting" /> | ||
<PackageReference Include="Akka.Serialization.Hyperion" /> | ||
</ItemGroup> | ||
|
||
<ItemGroup> | ||
<ProjectReference Include="..\ClusterClientSample.Shared\ClusterClientSample.Shared.csproj" /> | ||
</ItemGroup> | ||
|
||
</Project> |
78 changes: 78 additions & 0 deletions
78
src/clustering/cluster-client/ClusterClientSample.Gateway/Program.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
using Akka.Actor; | ||
using Akka.Cluster.Hosting; | ||
using Akka.Cluster.Tools.Client; | ||
using Akka.Hosting; | ||
using Akka.Remote.Hosting; | ||
using ClusterClientSample.Gateway.Actors; | ||
using Microsoft.Extensions.Hosting; | ||
|
||
const string systemName = "cluster-system"; | ||
const string hostName = "localhost"; | ||
const string port = "12552"; | ||
const string selfAddress = $"akka.tcp://{systemName}@{hostName}:{port}"; | ||
|
||
// This node acts as a known contact point for all external actor systems to connect via `ClusterClient` by setting up | ||
// the `ClusterClientReceptionist` | ||
Console.Title = "Backend Gateway Node"; | ||
|
||
var host = new HostBuilder() | ||
.ConfigureServices((context, services) => | ||
{ | ||
services.AddAkka(systemName, builder => | ||
{ | ||
builder | ||
// Setup remoting and clustering | ||
.WithRemoting(configure: options => | ||
{ | ||
options.Port = int.Parse(port); | ||
options.HostName = hostName; | ||
}) | ||
.WithClustering(options: new ClusterOptions | ||
{ | ||
// Note that we do not assign the role/tag "worker" to this node, no worker actors will be | ||
// deployed in this node. | ||
// | ||
// If we want the gateway to also work double duty as worker node, we can add the "worker" role/tag | ||
// to the Roles array. | ||
Roles = [ "gateway" ], | ||
SeedNodes = [ selfAddress ] | ||
}) | ||
// Setup `ClusterClientReceptionist` to only deploy on nodes with "gateway" role | ||
.WithClusterClientReceptionist(role: "gateway") | ||
// Setup required actors and startup code | ||
.WithActors((system, registry, resolver) => | ||
{ | ||
// The name of this actor ("worker-manager") is required, because its absolute path | ||
// ("/user/worker-manager") will be used as a service path by ClusterClientReceptionist. | ||
// | ||
// This name has to be unique for all actor names living in this actor system. | ||
var workerManagerActor = system.ActorOf(resolver.Props(typeof(WorkerManagerActor)), "worker-manager"); | ||
registry.Register<WorkerManagerActor>(workerManagerActor); | ||
// The name of this actor ("metric-workload-counter") is optional as it leverages the | ||
// distributed pub-sub system. External actor systems does not need to know the actor path to | ||
// query the workload metric. | ||
var workLoadCounterActor = system.ActorOf(Props.Create(() => new MetricCounterActor()), "metric-workload-counter"); | ||
registry.Register<MetricCounterActor>(workLoadCounterActor); | ||
}) | ||
.AddStartup((system, registry) => | ||
{ | ||
var receptionist = ClusterClientReceptionist.Get(system); | ||
// Register the worker manager actor as a service, | ||
// this can be accessed through "user/worker-manager" | ||
var workerManagerActor = registry.Get<WorkerManagerActor>(); | ||
receptionist.RegisterService(workerManagerActor); | ||
// Register the workload counter metric actor as a topic listener, it will subscribe to the | ||
// "report" topic | ||
var workloadCounterActor = registry.Get<MetricCounterActor>(); | ||
receptionist.RegisterSubscriber("report", workloadCounterActor); | ||
}); | ||
}); | ||
}).Build(); | ||
|
||
await host.StartAsync(); | ||
await host.WaitForShutdownAsync(); |
19 changes: 19 additions & 0 deletions
19
src/clustering/cluster-client/ClusterClientSample.Node/ClusterClientSample.Node.csproj
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
<Project Sdk="Microsoft.NET.Sdk"> | ||
|
||
<PropertyGroup> | ||
<OutputType>Exe</OutputType> | ||
<TargetFramework>net8.0</TargetFramework> | ||
<ImplicitUsings>enable</ImplicitUsings> | ||
<Nullable>enable</Nullable> | ||
</PropertyGroup> | ||
|
||
<ItemGroup> | ||
<ProjectReference Include="..\ClusterClientSample.Shared\ClusterClientSample.Shared.csproj" /> | ||
</ItemGroup> | ||
|
||
<ItemGroup> | ||
<PackageReference Include="Akka.Cluster.Hosting" /> | ||
<PackageReference Include="Akka.Serialization.Hyperion" /> | ||
</ItemGroup> | ||
|
||
</Project> |
Oops, something went wrong.