Skip to content

Commit

Permalink
Add IShardingMessageAdapter (akkadotnet#7441)
Browse files Browse the repository at this point in the history
* Add `IShardingMessageAdapter`

* Use clearer name, update API approval list

* Fix API

* Refactor setup class to extension

* Refactor extension, merge the property to ClusterSharding extension

* Add unit test

* Update API approval list

* Fix comment wording
  • Loading branch information
Arkatufus authored Jan 6, 2025
1 parent 08dd5bd commit e425975
Show file tree
Hide file tree
Showing 8 changed files with 260 additions and 12 deletions.
10 changes: 2 additions & 8 deletions src/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<PropertyGroup>
<Copyright>Copyright © 2013-2023 Akka.NET Team</Copyright>
<Authors>Akka.NET Team</Authors>
<VersionPrefix>1.5.27.1</VersionPrefix>
<VersionPrefix>1.5.34</VersionPrefix>
<PackageIcon>akkalogo.png</PackageIcon>
<PackageProjectUrl>https://github.com/akkadotnet/akka.net</PackageProjectUrl>
<PackageLicenseUrl>https://github.com/akkadotnet/akka.net/blob/master/LICENSE</PackageLicenseUrl>
Expand Down Expand Up @@ -47,13 +47,7 @@
<CopyLocalLockFileAssemblies>true</CopyLocalLockFileAssemblies>
</PropertyGroup>
<PropertyGroup>
<PackageReleaseNotes>Akka.NET v1.5.27.1 is a minor patch to fix a race condition between the logging and remoting system.
[Akka: Fix Remoting-Logging DefaultAddress race condition](https://github.com/akkadotnet/akka.net/pull/7305)
To [see the full set of changes in Akka.NET v1.5.27.1, click here](https://github.com/akkadotnet/akka.net/milestone/110).
| COMMITS | LOC+ | LOC- | AUTHOR |
|---------|------|------|---------------------|
| 1 | 4 | 0 | Aaron Stannard |
| 1 | 10 | 3 | Gregorius Soedharmo |</PackageReleaseNotes>
<PackageReleaseNotes>Placeholder for nightlies*</PackageReleaseNotes>
</PropertyGroup>
<ItemGroup Label="Analyzers" Condition="'$(MSBuildProjectName)' != 'Akka'">
<PackageReference Include="Akka.Analyzers" Version="$(AkkaAnalyzerVersion)" PrivateAssets="all" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
// -----------------------------------------------------------------------
// <copyright file="ShardingBufferAdapterSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2025 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2025 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System;
using System.Collections.Immutable;
using Akka.Actor;
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;
using Akka.TestKit;
using Akka.Util.Internal;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;
using static FluentAssertions.FluentActions;

namespace Akka.Cluster.Sharding.Tests;

public class ShardingBufferAdapterSpec: AkkaSpec
{
private sealed class MessageExtractor: IMessageExtractor
{
public string EntityId(object message)
=> message switch
{
int i => i.ToString(),
_ => null
};

public object EntityMessage(object message)
=> message;

public string ShardId(object message)
=> message switch
{
int i => (i % 10).ToString(),
_ => null
};

public string ShardId(string entityId, object messageHint = null)
=> (int.Parse(entityId) % 10).ToString();
}

private class EntityActor : ActorBase
{
protected override bool Receive(object message)
{
Sender.Tell(message);
return true;
}
}

private class TestMessageAdapter: IShardingBufferMessageAdapter
{
private readonly AtomicCounter _counter;

public TestMessageAdapter(AtomicCounter counter)
{
_counter = counter;
}

public object Apply(object message, IActorContext context)
{
_counter.IncrementAndGet();
return message;
}
}

private const string ShardTypeName = "Caat";

private static Config SpecConfig =>
ConfigurationFactory.ParseString(@"
akka.loglevel = DEBUG
akka.actor.provider = cluster
akka.remote.dot-netty.tcp.port = 0
akka.remote.log-remote-lifecycle-events = off
akka.test.single-expect-default = 5 s
akka.cluster.sharding.state-store-mode = ""ddata""
akka.cluster.sharding.verbose-debug-logging = on
akka.cluster.sharding.fail-on-invalid-entity-state-transition = on
akka.cluster.sharding.distributed-data.durable.keys = []")
.WithFallback(ClusterSingletonManager.DefaultConfig()
.WithFallback(ClusterSharding.DefaultConfig()));

private readonly AtomicCounter _counterA = new (0);
private readonly AtomicCounter _counterB = new (0);

private readonly ActorSystem _sysA;
private readonly ActorSystem _sysB;

private readonly TestProbe _pA;
private readonly TestProbe _pB;

private readonly IActorRef _regionA;
private readonly IActorRef _regionB;

public ShardingBufferAdapterSpec(ITestOutputHelper helper) : base(SpecConfig, helper)
{
_sysA = Sys;
_sysB = ActorSystem.Create(Sys.Name, Sys.Settings.Config);

InitializeLogger(_sysB, "[sysB]");

_pA = CreateTestProbe(_sysA);
_pB = CreateTestProbe(_sysB);

ClusterSharding.Get(_sysA).SetShardingBufferMessageAdapter(new TestMessageAdapter(_counterA));
ClusterSharding.Get(_sysB).SetShardingBufferMessageAdapter(new TestMessageAdapter(_counterB));

_regionA = StartShard(_sysA);
_regionB = StartShard(_sysB);
}

protected override void AfterAll()
{
if(_sysA != null)
Shutdown(_sysA);
if(_sysB != null)
Shutdown(_sysB);
base.AfterAll();
}

private IActorRef StartShard(ActorSystem sys)
{
return ClusterSharding.Get(sys).Start(
ShardTypeName,
Props.Create(() => new EntityActor()),
ClusterShardingSettings.Create(Sys).WithRememberEntities(true),
new MessageExtractor());
}

[Fact(DisplayName = "ClusterSharding buffer message adapter must be called when message was buffered")]
public void ClusterSharding_must_initialize_cluster_and_allocate_sharded_actors()
{
Cluster.Get(_sysA).Join(Cluster.Get(_sysA).SelfAddress); // coordinator on A

AwaitAssert(() =>
{
Cluster.Get(_sysA).SelfMember.Status.Should().Be(MemberStatus.Up);
}, TimeSpan.FromSeconds(1));

Cluster.Get(_sysB).Join(Cluster.Get(_sysA).SelfAddress);

Within(TimeSpan.FromSeconds(10), () =>
{
AwaitAssert(() =>
{
foreach (var s in ImmutableHashSet.Create(_sysA, _sysB))
{
Cluster.Get(s).SendCurrentClusterState(TestActor);
ExpectMsg<ClusterEvent.CurrentClusterState>().Members.Count.Should().Be(2);
}
});
});

_regionA.Tell(1, _pA.Ref);
_pA.ExpectMsg(1);

_regionB.Tell(2, _pB.Ref);
_pB.ExpectMsg(2);

_regionB.Tell(3, _pB.Ref);
_pB.ExpectMsg(3);

var counterAValue = _counterA.Current;
var counterBValue = _counterB.Current;

// Each newly instantiated entities should have their messages buffered at least once
// Buffer message adapter should be called everytime a message is buffered
counterAValue.Should().BeGreaterOrEqualTo(1);
counterBValue.Should().BeGreaterOrEqualTo(2);

_regionA.Tell(1, _pA.Ref);
_pA.ExpectMsg(1);

_regionB.Tell(2, _pB.Ref);
_pB.ExpectMsg(2);

_regionB.Tell(3, _pB.Ref);
_pB.ExpectMsg(3);

// Each entity should not have their messages buffered once they were instantiated
_counterA.Current.Should().Be(counterAValue);
_counterB.Current.Should().Be(counterBValue);
}
}
10 changes: 10 additions & 0 deletions src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using System.Threading.Tasks;

using Akka.Actor;
using Akka.Annotations;
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;
using Akka.Dispatch;
Expand Down Expand Up @@ -366,6 +367,9 @@ public ClusterSharding(ExtendedActorSystem system)
/// </summary>
public ClusterShardingSettings Settings { get; }

[InternalApi]
public IShardingBufferMessageAdapter BufferMessageAdapter { get; private set; } = EmptyBufferMessageAdapter.Instance;

/// <summary>
/// Default HOCON settings for cluster sharding.
/// </summary>
Expand All @@ -376,6 +380,12 @@ public static Config DefaultConfig()
.WithFallback(DistributedData.DistributedData.DefaultConfig());
}

[InternalApi]
public void SetShardingBufferMessageAdapter(IShardingBufferMessageAdapter? bufferMessageAdapter)
{
BufferMessageAdapter = bufferMessageAdapter ?? EmptyBufferMessageAdapter.Instance;
}

/// <summary>
/// Register a named entity type by defining the <see cref="Actor.Props"/> of the entity actor
/// and functions to extract entity and shard identifier from messages. The <see cref="Sharding.ShardRegion"/> actor
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// -----------------------------------------------------------------------
// <copyright file="IShardingMessageAdapter.cs" company="Akka.NET Project">
// Copyright (C) 2009-2025 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2025 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using Akka.Actor;
using Akka.Annotations;

namespace Akka.Cluster.Sharding;

[InternalApi]
public interface IShardingBufferMessageAdapter
{
public object Apply(object message, IActorContext context);
}

[InternalApi]
internal class EmptyBufferMessageAdapter: IShardingBufferMessageAdapter
{
public static EmptyBufferMessageAdapter Instance { get; } = new ();

private EmptyBufferMessageAdapter()
{
}

public object Apply(object message, IActorContext context) => message;
}
8 changes: 6 additions & 2 deletions src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs
Original file line number Diff line number Diff line change
Expand Up @@ -963,6 +963,8 @@ public override string ToString()
private readonly Lease? _lease;
private readonly TimeSpan _leaseRetryInterval = TimeSpan.FromSeconds(5); // won't be used

private readonly IShardingBufferMessageAdapter _bufferMessageAdapter;

public ILoggingAdapter Log { get; } = Context.GetLogger();
public IStash Stash { get; set; } = null!;
public ITimerScheduler Timers { get; set; } = null!;
Expand Down Expand Up @@ -1017,6 +1019,8 @@ public Shard(

_leaseRetryInterval = settings.LeaseSettings.LeaseRetryInterval;
}

_bufferMessageAdapter = ClusterSharding.Get(Context.System).BufferMessageAdapter;
}

protected override SupervisorStrategy SupervisorStrategy()
Expand Down Expand Up @@ -1971,7 +1975,7 @@ private void AppendToMessageBuffer(EntityId id, object msg, IActorRef snd)
if (Log.IsDebugEnabled)
Log.Debug("{0}: Message of type [{1}] for entity [{2}] buffered", _typeName, msg.GetType().Name,
id);
_messageBuffers.Append(id, msg, snd);
_messageBuffers.Append(id, _bufferMessageAdapter.Apply(msg, Context), snd);
}
}

Expand All @@ -1994,7 +1998,7 @@ private void SendMsgBuffer(EntityId entityId)
// and as the child exists, the message will be directly forwarded
foreach (var (message, @ref) in messages)
{
if (message is ShardRegion.StartEntity se)
if (WrappedMessage.Unwrap(message) is ShardRegion.StartEntity se)
StartEntity(se.EntityId, @ref);
else
DeliverMessage(entityId, message, @ref);
Expand Down
7 changes: 5 additions & 2 deletions src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ internal static Props ProxyProps(

private readonly CoordinatedShutdown _coordShutdown = CoordinatedShutdown.Get(Context.System);
private readonly TaskCompletionSource<Done> _gracefulShutdownProgress = new();
private readonly IShardingBufferMessageAdapter _bufferMessageAdapter;

/// <summary>
/// TBD
Expand Down Expand Up @@ -464,6 +465,8 @@ public ShardRegion(
_initRegistrationDelay = TimeSpan.FromMilliseconds(100).Max(new TimeSpan(_retryInterval.Ticks / 2 / 2 / 2));
_nextRegistrationDelay = _initRegistrationDelay;

_bufferMessageAdapter = ClusterSharding.Get(Context.System).BufferMessageAdapter;

SetupCoordinatedShutdown();
}

Expand Down Expand Up @@ -812,7 +815,7 @@ private void BufferMessage(ShardId shardId, Msg message, IActorRef sender)
}
else
{
_shardBuffers.Append(shardId, message, sender);
_shardBuffers.Append(shardId, _bufferMessageAdapter.Apply(message, Context), sender);

// log some insight to how buffers are filled up every 10% of the buffer capacity
var total = totalBufferSize + 1;
Expand Down Expand Up @@ -1267,7 +1270,7 @@ private void DeliverBufferedMessages(ShardId shardId, IActorRef receiver)

foreach (var (Message, Ref) in buffer)
{
if (Message is RestartShard && !receiver.Equals(Self))
if (WrappedMessage.Unwrap(Message) is RestartShard && !receiver.Equals(Self))
{
_log.Debug(
"{0}: Dropping buffered message {1}, these are only processed by a local ShardRegion.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@ namespace Akka.Cluster.Sharding
public class ClusterSharding : Akka.Actor.IExtension
{
public ClusterSharding(Akka.Actor.ExtendedActorSystem system) { }
[Akka.Annotations.InternalApiAttribute()]
public Akka.Cluster.Sharding.IShardingBufferMessageAdapter BufferMessageAdapter { get; }
public Akka.Cluster.Sharding.ClusterShardingSettings Settings { get; }
public System.Collections.Immutable.ImmutableHashSet<string> ShardTypeNames { get; }
public static Akka.Configuration.Config DefaultConfig() { }
public Akka.Cluster.Sharding.IShardAllocationStrategy DefaultShardAllocationStrategy(Akka.Cluster.Sharding.ClusterShardingSettings settings) { }
public static Akka.Cluster.Sharding.ClusterSharding Get(Akka.Actor.ActorSystem system) { }
[Akka.Annotations.InternalApiAttribute()]
public void SetShardingBufferMessageAdapter(Akka.Cluster.Sharding.IShardingBufferMessageAdapter bufferMessageAdapter) { }
public Akka.Actor.IActorRef ShardRegion(string typeName) { }
public Akka.Actor.IActorRef ShardRegionProxy(string typeName) { }
[System.ObsoleteAttribute("Use one of the overloads that accepts an IMessageExtractor instead")]
Expand Down Expand Up @@ -215,6 +219,11 @@ namespace Akka.Cluster.Sharding
}
public interface IShardRegionCommand { }
public interface IShardRegionQuery { }
[Akka.Annotations.InternalApiAttribute()]
public interface IShardingBufferMessageAdapter
{
object Apply(object message, Akka.Actor.IActorContext context);
}
public interface IStartableAllocationStrategy : Akka.Actor.INoSerializationVerificationNeeded, Akka.Cluster.Sharding.IShardAllocationStrategy
{
void Start();
Expand Down
Loading

0 comments on commit e425975

Please sign in to comment.