Skip to content

Commit

Permalink
Add ClusterClient sample project (#185)
Browse files Browse the repository at this point in the history
* Add ClusterClient sample project

* Fix central package version management error

---------

Co-authored-by: Aaron Stannard <[email protected]>
  • Loading branch information
Arkatufus and Aaronontheweb authored May 20, 2024
1 parent b1b4b23 commit 293642f
Show file tree
Hide file tree
Showing 18 changed files with 620 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace ClusterClientSample.FrontEnd.Actors;

// This class will only be used as a Type marker to retrieve the `ClusterClient` actor from the `ActorRegistry`.
// It is not meant to be instantiated in any way.
public sealed class GatewayClusterClientActor
{
private GatewayClusterClientActor(){ }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
using Akka.Actor;
using Akka.Cluster.Tools.Client;
using Akka.Event;
using Akka.Hosting;
using ClusterClientSample.Shared;

namespace ClusterClientSample.FrontEnd.Actors;

/// <summary>
/// This actor will:
/// * periodically send a `BatchedWork` request message to the backend app "/user/worker-manager"
/// service via the `ClusterClient` actor
/// * receive the `Result` message for each work performed and logs them
/// </summary>
public class BatchedWorkRequester: ReceiveActor, IWithTimers
{
private const string BatchKey = nameof(BatchKey);

private readonly Random _random = new();

public BatchedWorkRequester(IRequiredActor<GatewayClusterClientActor> clusterClientActor)
{
var log = Context.GetLogger();
var clusterClient = clusterClientActor.ActorRef;

Receive<BatchedWork>(msg =>
{
log.Info("Requesting a batched work to the other cluster. Count: {0}", msg.Size);
clusterClient.Tell(new ClusterClient.Send("/user/worker-manager", msg, true));
Timers.StartSingleTimer(BatchKey, GetBatch(), TimeSpan.FromSeconds(10));
});

Receive<Result>(msg =>
{
log.Info("[ID:{0}] Work result: {1}", msg.Id, msg.Value);
});
}

protected override void PreStart()
{
base.PreStart();
Timers.StartSingleTimer(BatchKey, GetBatch(), TimeSpan.FromSeconds(3));
}

public ITimerScheduler Timers { get; set; } = null!;

private BatchedWork GetBatch() => new (_random.Next(5) + 5);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using Akka.Actor;
using Akka.Cluster.Tools.Client;
using Akka.Event;
using Akka.Hosting;
using ClusterClientSample.Shared;

namespace ClusterClientSample.FrontEnd.Actors;

/// <summary>
/// This actor will:
/// * periodically publish a `SendReport` message to the backend app "report" pub-sub topic via the `ClusterClient` actor
/// * receive the `Report` message from the workload metric actor and logs them
/// </summary>
public class WorkReportCollector: ReceiveActor, IWithTimers
{
private class GetReport
{
public static readonly GetReport Instance = new();
private GetReport() { }
}

private const string ReportKey = nameof(ReportKey);

public WorkReportCollector(IRequiredActor<GatewayClusterClientActor> clusterClientActor)
{
var log = Context.GetLogger();
var clusterClient = clusterClientActor.ActorRef;

Receive<Report>(report =>
{
foreach (var (actor, count) in report.Counts)
{
log.Info("Worker {0} has done {1} works", actor, count);
}
});

Receive<GetReport>(_ =>
{
log.Info("Requesting work report metrics from the other cluster.");
clusterClient.Tell(new ClusterClient.Publish("report", SendReport.Instance));
});
}

public ITimerScheduler Timers { get; set; } = null!;

protected override void PreStart()
{
base.PreStart();
Timers.StartPeriodicTimer(ReportKey, GetReport.Instance, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(10));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Akka.Cluster.Hosting" />
<PackageReference Include="Akka.Serialization.Hyperion" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\ClusterClientSample.Shared\ClusterClientSample.Shared.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
using Akka.Cluster.Hosting;
using Akka.Hosting;
using Akka.Remote.Hosting;
using ClusterClientSample.FrontEnd.Actors;
using Microsoft.Extensions.Hosting;

const string gatewaySystemAddress = "akka.tcp://cluster-system@localhost:12552";
const string systemName = "remote-cluster-system";
const string hostName = "localhost";
const int port = 12553;

Console.Title = "Frontend Gateway Node";

var host = new HostBuilder()
.ConfigureServices((context, services) =>
{
services.AddAkka(systemName, builder =>
{
builder
// Setup remoting
.WithRemoting(configure: options =>
{
options.Port = port;
options.HostName = hostName;
})
// Setup `ClusterClient` actor to connect to another actor system
.WithClusterClient<GatewayClusterClientActor>([ $"{gatewaySystemAddress}/system/receptionist" ])
// Setup required actors and startup code
.WithActors((system, registry, resolver) =>
{
var requesterActor = system.ActorOf(resolver.Props(typeof(BatchedWorkRequester)), "work-batch-requester");
registry.Register<BatchedWorkRequester>(requesterActor);
var reportCollectorActor = system.ActorOf(resolver.Props(typeof(WorkReportCollector)), "work-report-collector");
registry.Register<WorkReportCollector>(reportCollectorActor);
});
});
}).Build();

await host.StartAsync();
await host.WaitForShutdownAsync();
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using Akka.Actor;
using ClusterClientSample.Shared;

namespace ClusterClientSample.Gateway.Actors;

public class MetricCounterActor : ReceiveActor
{
public MetricCounterActor()
{
var counts = new Dictionary<IActorRef, int>();

Receive<WorkComplete>(_ =>
{
if (counts.TryGetValue(Sender, out var count))
counts[Sender] = ++count;
else
counts.Add(Sender, 1);
});

Receive<SendReport>(_ => Sender.Tell(new Report(
counts.ToDictionary(kvp => kvp.Key.Path.ToString(), kvp => kvp.Value))));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
using Akka.Actor;
using Akka.Cluster.Routing;
using Akka.Event;
using Akka.Hosting;
using Akka.Routing;
using ClusterClientSample.Shared;

namespace ClusterClientSample.Gateway.Actors;

public class WorkerManagerActor : ReceiveActor
{
private static int _nextId = 1;

public WorkerManagerActor(IRequiredActor<MetricCounterActor> counterActor)
{
var log = Context.GetLogger();
var counter = counterActor.ActorRef;
var workerRouter = GetWorkerRouter(counter);

Receive<BatchedWork>(batch =>
{
log.Info("Generating a work batch of size {0}", batch.Size);
for (var i = 0; i < batch.Size; i++)
{
// forward the work request as if it was sent by the original sender so that the work result can be
// sent back to the original sender by the worker
workerRouter.Forward(new Work(_nextId++));
}
});
}

private static IActorRef GetWorkerRouter(IActorRef counter)
{
// Creates a cluster router pool of 10 workers for each cluster node with the role "worker"
// that joins the cluster.
//
// The router will use a round-robin strategy to distribute messages amongst the worker actors
var props = new ClusterRouterPool(
local: new RoundRobinPool(10),
settings: new ClusterRouterPoolSettings(30, 10, true, "worker"))
.Props(Props.Create(() => new WorkerActor(counter)));

return Context.ActorOf(props);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Akka.Cluster.Hosting" />
<PackageReference Include="Akka.Serialization.Hyperion" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\ClusterClientSample.Shared\ClusterClientSample.Shared.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
using Akka.Actor;
using Akka.Cluster.Hosting;
using Akka.Cluster.Tools.Client;
using Akka.Hosting;
using Akka.Remote.Hosting;
using ClusterClientSample.Gateway.Actors;
using Microsoft.Extensions.Hosting;

const string systemName = "cluster-system";
const string hostName = "localhost";
const string port = "12552";
const string selfAddress = $"akka.tcp://{systemName}@{hostName}:{port}";

// This node acts as a known contact point for all external actor systems to connect via `ClusterClient` by setting up
// the `ClusterClientReceptionist`
Console.Title = "Backend Gateway Node";

var host = new HostBuilder()
.ConfigureServices((context, services) =>
{
services.AddAkka(systemName, builder =>
{
builder
// Setup remoting and clustering
.WithRemoting(configure: options =>
{
options.Port = int.Parse(port);
options.HostName = hostName;
})
.WithClustering(options: new ClusterOptions
{
// Note that we do not assign the role/tag "worker" to this node, no worker actors will be
// deployed in this node.
//
// If we want the gateway to also work double duty as worker node, we can add the "worker" role/tag
// to the Roles array.
Roles = [ "gateway" ],
SeedNodes = [ selfAddress ]
})
// Setup `ClusterClientReceptionist` to only deploy on nodes with "gateway" role
.WithClusterClientReceptionist(role: "gateway")
// Setup required actors and startup code
.WithActors((system, registry, resolver) =>
{
// The name of this actor ("worker-manager") is required, because its absolute path
// ("/user/worker-manager") will be used as a service path by ClusterClientReceptionist.
//
// This name has to be unique for all actor names living in this actor system.
var workerManagerActor = system.ActorOf(resolver.Props(typeof(WorkerManagerActor)), "worker-manager");
registry.Register<WorkerManagerActor>(workerManagerActor);
// The name of this actor ("metric-workload-counter") is optional as it leverages the
// distributed pub-sub system. External actor systems does not need to know the actor path to
// query the workload metric.
var workLoadCounterActor = system.ActorOf(Props.Create(() => new MetricCounterActor()), "metric-workload-counter");
registry.Register<MetricCounterActor>(workLoadCounterActor);
})
.AddStartup((system, registry) =>
{
var receptionist = ClusterClientReceptionist.Get(system);
// Register the worker manager actor as a service,
// this can be accessed through "user/worker-manager"
var workerManagerActor = registry.Get<WorkerManagerActor>();
receptionist.RegisterService(workerManagerActor);
// Register the workload counter metric actor as a topic listener, it will subscribe to the
// "report" topic
var workloadCounterActor = registry.Get<MetricCounterActor>();
receptionist.RegisterSubscriber("report", workloadCounterActor);
});
});
}).Build();

await host.StartAsync();
await host.WaitForShutdownAsync();
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\ClusterClientSample.Shared\ClusterClientSample.Shared.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Akka.Cluster.Hosting" />
<PackageReference Include="Akka.Serialization.Hyperion" />
</ItemGroup>

</Project>
Loading

0 comments on commit 293642f

Please sign in to comment.