Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add IShardingMessageAdapter #7441

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// -----------------------------------------------------------------------
// <copyright file="IShardingMessageAdapter.cs" company="Akka.NET Project">
// Copyright (C) 2009-2025 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2025 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using Akka.Annotations;

namespace Akka.Cluster.Sharding;

[InternalApi]
public interface IShardingBufferedMessageAdapter
{
public object Adapt(object message);
}

[InternalApi]
internal class EmptyBufferedMessageAdapter: IShardingBufferedMessageAdapter
{
public static EmptyBufferedMessageAdapter Instance { get; } = new ();

private EmptyBufferedMessageAdapter()
{
}

public object Adapt(object message) => message;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method will be used to transform a message before it is stored inside the buffer.

}
9 changes: 7 additions & 2 deletions src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs
Original file line number Diff line number Diff line change
Expand Up @@ -963,6 +963,8 @@ public override string ToString()
private readonly Lease? _lease;
private readonly TimeSpan _leaseRetryInterval = TimeSpan.FromSeconds(5); // won't be used

private readonly IShardingBufferedMessageAdapter _bufferedMessageAdapter;

public ILoggingAdapter Log { get; } = Context.GetLogger();
public IStash Stash { get; set; } = null!;
public ITimerScheduler Timers { get; set; } = null!;
Expand Down Expand Up @@ -1017,6 +1019,9 @@ public Shard(

_leaseRetryInterval = settings.LeaseSettings.LeaseRetryInterval;
}

var setup = Context.System.Settings.Setup.Get<ShardingSetup>();
_bufferedMessageAdapter = setup.HasValue ? setup.Value.BufferedMessageAdapter : EmptyBufferedMessageAdapter.Instance;
}

protected override SupervisorStrategy SupervisorStrategy()
Expand Down Expand Up @@ -1971,7 +1976,7 @@ private void AppendToMessageBuffer(EntityId id, object msg, IActorRef snd)
if (Log.IsDebugEnabled)
Log.Debug("{0}: Message of type [{1}] for entity [{2}] buffered", _typeName, msg.GetType().Name,
id);
_messageBuffers.Append(id, msg, snd);
_messageBuffers.Append(id, _bufferedMessageAdapter.Adapt(msg), snd);
}
}

Expand All @@ -1994,7 +1999,7 @@ private void SendMsgBuffer(EntityId entityId)
// and as the child exists, the message will be directly forwarded
foreach (var (message, @ref) in messages)
{
if (message is ShardRegion.StartEntity se)
if (WrappedMessage.Unwrap(message) is ShardRegion.StartEntity se)
StartEntity(se.EntityId, @ref);
else
DeliverMessage(entityId, message, @ref);
Expand Down
8 changes: 6 additions & 2 deletions src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ internal static Props ProxyProps(

private readonly CoordinatedShutdown _coordShutdown = CoordinatedShutdown.Get(Context.System);
private readonly TaskCompletionSource<Done> _gracefulShutdownProgress = new();
private readonly IShardingBufferedMessageAdapter _bufferedMessageAdapter;

/// <summary>
/// TBD
Expand Down Expand Up @@ -464,6 +465,9 @@ public ShardRegion(
_initRegistrationDelay = TimeSpan.FromMilliseconds(100).Max(new TimeSpan(_retryInterval.Ticks / 2 / 2 / 2));
_nextRegistrationDelay = _initRegistrationDelay;

var setup = Context.System.Settings.Setup.Get<ShardingSetup>();
_bufferedMessageAdapter = setup.HasValue ? setup.Value.BufferedMessageAdapter : EmptyBufferedMessageAdapter.Instance;

SetupCoordinatedShutdown();
}

Expand Down Expand Up @@ -812,7 +816,7 @@ private void BufferMessage(ShardId shardId, Msg message, IActorRef sender)
}
else
{
_shardBuffers.Append(shardId, message, sender);
_shardBuffers.Append(shardId, _bufferedMessageAdapter.Adapt(message), sender);

// log some insight to how buffers are filled up every 10% of the buffer capacity
var total = totalBufferSize + 1;
Expand Down Expand Up @@ -1267,7 +1271,7 @@ private void DeliverBufferedMessages(ShardId shardId, IActorRef receiver)

foreach (var (Message, Ref) in buffer)
{
if (Message is RestartShard && !receiver.Equals(Self))
if (WrappedMessage.Unwrap(Message) is RestartShard && !receiver.Equals(Self))
{
_log.Debug(
"{0}: Dropping buffered message {1}, these are only processed by a local ShardRegion.",
Expand Down
26 changes: 26 additions & 0 deletions src/contrib/cluster/Akka.Cluster.Sharding/ShardingSetup.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// -----------------------------------------------------------------------
// <copyright file="ShardingSetup.cs" company="Akka.NET Project">
// Copyright (C) 2009-2025 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2025 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using Akka.Actor.Setup;
using Akka.Annotations;

#nullable enable
namespace Akka.Cluster.Sharding;

[InternalApi]
public class ShardingSetup: Setup
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This setup class will be used to inject a message adapter into both the ShardRegion and Shard actor during initialization

{
public static ShardingSetup Create(IShardingBufferedMessageAdapter bufferedMessageAdapter)
=> new (bufferedMessageAdapter);

internal ShardingSetup(IShardingBufferedMessageAdapter bufferedMessageAdapter)
{
BufferedMessageAdapter = bufferedMessageAdapter;
}

public IShardingBufferedMessageAdapter BufferedMessageAdapter { get; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,11 @@ namespace Akka.Cluster.Sharding
}
public interface IShardRegionCommand { }
public interface IShardRegionQuery { }
[Akka.Annotations.InternalApiAttribute()]
public interface IShardingBufferedMessageAdapter
{
object Adapt(object message);
}
public interface IStartableAllocationStrategy : Akka.Actor.INoSerializationVerificationNeeded, Akka.Cluster.Sharding.IShardAllocationStrategy
{
void Start();
Expand Down Expand Up @@ -335,6 +340,13 @@ namespace Akka.Cluster.Sharding
public override int GetHashCode() { }
public override string ToString() { }
}
[Akka.Annotations.InternalApiAttribute()]
[System.Runtime.CompilerServices.NullableAttribute(0)]
public class ShardingSetup : Akka.Actor.Setup.Setup
{
public Akka.Cluster.Sharding.IShardingBufferedMessageAdapter BufferedMessageAdapter { get; }
public static Akka.Cluster.Sharding.ShardingSetup Create(Akka.Cluster.Sharding.IShardingBufferedMessageAdapter bufferedMessageAdapter) { }
}
public enum StateStoreMode
{
Persistence = 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,11 @@ namespace Akka.Cluster.Sharding
}
public interface IShardRegionCommand { }
public interface IShardRegionQuery { }
[Akka.Annotations.InternalApiAttribute()]
public interface IShardingBufferedMessageAdapter
{
object Adapt(object message);
}
public interface IStartableAllocationStrategy : Akka.Actor.INoSerializationVerificationNeeded, Akka.Cluster.Sharding.IShardAllocationStrategy
{
void Start();
Expand Down Expand Up @@ -335,6 +340,13 @@ namespace Akka.Cluster.Sharding
public override int GetHashCode() { }
public override string ToString() { }
}
[Akka.Annotations.InternalApiAttribute()]
[System.Runtime.CompilerServices.NullableAttribute(0)]
public class ShardingSetup : Akka.Actor.Setup.Setup
{
public Akka.Cluster.Sharding.IShardingBufferedMessageAdapter BufferedMessageAdapter { get; }
public static Akka.Cluster.Sharding.ShardingSetup Create(Akka.Cluster.Sharding.IShardingBufferedMessageAdapter bufferedMessageAdapter) { }
}
public enum StateStoreMode
{
Persistence = 0,
Expand Down
Loading