From 02157c3aa69b1790549646f386f61e7b3f81c366 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Mon, 28 Oct 2024 11:08:41 -0500 Subject: [PATCH 1/5] fix potential `ArgumentException` during shard rebalancing (#7367) * fix potential `ArgumentException` during shard rebalancing close #7365 * moved back to original discard design --- .../Internal/AbstractLeastShardAllocationStrategy.cs | 8 +++++++- .../Akka.Cluster.Sharding/ShardAllocationStrategy.cs | 4 ++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/Internal/AbstractLeastShardAllocationStrategy.cs b/src/contrib/cluster/Akka.Cluster.Sharding/Internal/AbstractLeastShardAllocationStrategy.cs index b10a94a8734..ea39eb4f685 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/Internal/AbstractLeastShardAllocationStrategy.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/Internal/AbstractLeastShardAllocationStrategy.cs @@ -126,7 +126,13 @@ protected bool IsAGoodTimeToRebalance(IEnumerable regionEntries) protected ImmutableList RegionEntriesFor(IImmutableDictionary> currentShardAllocations) { - var addressToMember = ClusterState.Members.ToImmutableDictionary(m => m.Address, m => m); + // switched to using `GroupBy` instead of just `ToImmutableDictionary` due to https://github.com/akkadotnet/akka.net/issues/7365 + // it's very rare, but possible, that there can be two members with the same address in the ClusterState. This can happen + // when a node quickly reboots and re-uses its old address, but the old incarnation hasn't been downed yet. 
+ var addressToMember = ClusterState.Members + .GroupBy(m => m.Address) + // using Last or First here is non-deterministic since the UID that appears in the UniqueAddress sort order is random + .ToImmutableDictionary(g => g.Key, g => g.First()); return currentShardAllocations.Select(i => { diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardAllocationStrategy.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardAllocationStrategy.cs index 6b1cc3a6acc..09cc8183f7d 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ShardAllocationStrategy.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardAllocationStrategy.cs @@ -169,11 +169,11 @@ public override Task> Rebalance(IImmutableDictionary i, ShardSuitabilityOrdering.Instance).ToImmutableList(); if (IsAGoodTimeToRebalance(sortedRegionEntries)) { - var (_, Shards) = MostSuitableRegion(sortedRegionEntries); + var (_, shards) = MostSuitableRegion(sortedRegionEntries); // even if it is to another new node. var mostShards = sortedRegionEntries.Select(r => r.ShardIds.Where(s => !rebalanceInProgress.Contains(s))).MaxBy(i => i.Count())?.ToArray() ?? 
Array.Empty(); - var difference = mostShards.Length - Shards.Count; + var difference = mostShards.Length - shards.Count; if (difference >= _rebalanceThreshold) { var n = Math.Min( From 0cbddb50eefd0bd54c8b3a45d3c3fb18921832f9 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 30 Oct 2024 14:16:28 -0500 Subject: [PATCH 2/5] fixed bugs with multiple `Member`s with same `Address` crashing `ClusterDaemon` (#7371) close #7370 --- src/core/Akka.Cluster/ClusterDaemon.cs | 77 +++++++++++++------------- 1 file changed, 40 insertions(+), 37 deletions(-) diff --git a/src/core/Akka.Cluster/ClusterDaemon.cs b/src/core/Akka.Cluster/ClusterDaemon.cs index da7031ae17a..c86d0ef6c0e 100644 --- a/src/core/Akka.Cluster/ClusterDaemon.cs +++ b/src/core/Akka.Cluster/ClusterDaemon.cs @@ -1635,22 +1635,22 @@ public void Welcome(Address joinWith, UniqueAddress from, Gossip gossip) public void Leaving(Address address) { // only try to update if the node is available (in the member ring) - if (LatestGossip.Members.Any(m => m.Address.Equals(address) && m.Status is MemberStatus.Joining or MemberStatus.WeaklyUp or MemberStatus.Up)) + foreach(var mem in LatestGossip.Members.Where(m => m.Address.Equals(address))) { - // mark node as LEAVING - var newMembers = LatestGossip.Members.Select(m => + if (mem.Status is MemberStatus.Joining or MemberStatus.WeaklyUp or MemberStatus.Up) { - if (m.Address == address) return m.Copy(status: MemberStatus.Leaving); - return m; - }).ToImmutableSortedSet(); // mark node as LEAVING - var newGossip = LatestGossip.Copy(members: newMembers); - - UpdateLatestGossip(newGossip); + // mark node as LEAVING + var newMembers = LatestGossip.Members + .Remove(mem).Add(mem.Copy(status: MemberStatus.Leaving)); + var newGossip = LatestGossip.Copy(members: newMembers); + + UpdateLatestGossip(newGossip); - _cluster.LogInfo("Marked address [{0}] as [{1}]", address, MemberStatus.Leaving); - PublishMembershipState(); - // immediate gossip to speed up the leaving process - 
SendGossip(); + _cluster.LogInfo("Marked address [{0}] as [{1}]", address, MemberStatus.Leaving); + PublishMembershipState(); + // immediate gossip to speed up the leaving process + SendGossip(); + } } } @@ -1674,40 +1674,43 @@ public void Downing(Address address) var localGossip = LatestGossip; var localMembers = localGossip.Members; var localOverview = localGossip.Overview; - var localSeen = localOverview.Seen; var localReachability = _membershipState.DcReachability; // check if the node to DOWN is in the 'members' set - var member = localMembers.FirstOrDefault(m => m.Address == address); - if (member != null && member.Status != MemberStatus.Down) + var found = false; + foreach (var member in localMembers.Where(m => m.Address == address)) { - if (localReachability.IsReachable(member.UniqueAddress)) - _cluster.LogInfo("Marking node [{0}] as [{1}]", member.Address, MemberStatus.Down); - else - _cluster.LogInfo("Marking unreachable node [{0}] as [{1}]", member.Address, MemberStatus.Down); + found = true; + if (member.Status != MemberStatus.Down) + { + if (localReachability.IsReachable(member.UniqueAddress)) + _cluster.LogInfo("Marking node [{0}] as [{1}]", member.Address, MemberStatus.Down); + else + _cluster.LogInfo("Marking unreachable node [{0}] as [{1}]", member.Address, MemberStatus.Down); - var newGossip = localGossip.MarkAsDown(member); //update gossip - UpdateLatestGossip(newGossip); + var newGossip = localGossip.MarkAsDown(member); //update gossip + UpdateLatestGossip(newGossip); - PublishMembershipState(); + PublishMembershipState(); - if (address == _cluster.SelfAddress) - { - // spread the word quickly, without waiting for next gossip tick - SendGossipRandom(MaxGossipsBeforeShuttingDownMyself); - } - else - { - // try to gossip immediately to downed node, as a STONITH signal - GossipTo(member.UniqueAddress); + if (address == _cluster.SelfAddress) + { + // spread the word quickly, without waiting for next gossip tick + 
SendGossipRandom(MaxGossipsBeforeShuttingDownMyself); + } + else + { + // try to gossip immediately to downed node, as a STONITH signal + GossipTo(member.UniqueAddress); + } } + + // if the previous statement did not evaluate to true, then this node is already being downed + } - else if (member != null) - { - // already down - } - else + + if (!found) { _cluster.LogInfo("Ignoring down of unknown node [{0}]", address); } From 67fad904eff92f12cf79453c4f06583393d2506b Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Fri, 1 Nov 2024 19:45:34 +0700 Subject: [PATCH 3/5] Fix EventSourced Stash filtering out identical envelopes (#7375) --- src/core/Akka/Actor/Stash/Internal/AbstractStash.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/Akka/Actor/Stash/Internal/AbstractStash.cs b/src/core/Akka/Actor/Stash/Internal/AbstractStash.cs index 7bc74d5dd14..0e925deefd6 100644 --- a/src/core/Akka/Actor/Stash/Internal/AbstractStash.cs +++ b/src/core/Akka/Actor/Stash/Internal/AbstractStash.cs @@ -180,7 +180,7 @@ public void Prepend(IEnumerable envelopes) { // since we want to save the order of messages, but still prepending using AddFirst, // we must enumerate envelopes in reversed order - foreach (var envelope in envelopes.Distinct().Reverse()) + foreach (var envelope in envelopes.Reverse()) { _theStash.AddFirst(envelope); } From 37008091602b341e7830a87c3c2388271046c45e Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Fri, 1 Nov 2024 08:08:49 -0500 Subject: [PATCH 4/5] Added reproduction for missing `Stash.Stash` messages in Akka.Persistence actors (#7374) reproduces #7373 --- .../Akka.Persistence.Tests/Bugfix7373Specs.cs | 81 +++++++++++++++++++ 1 file changed, 81 insertions(+) create mode 100644 src/core/Akka.Persistence.Tests/Bugfix7373Specs.cs diff --git a/src/core/Akka.Persistence.Tests/Bugfix7373Specs.cs b/src/core/Akka.Persistence.Tests/Bugfix7373Specs.cs new file mode 100644 index 00000000000..6817db97bb4 --- /dev/null +++ 
b/src/core/Akka.Persistence.Tests/Bugfix7373Specs.cs @@ -0,0 +1,81 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2024 Lightbend Inc. +// Copyright (C) 2013-2024 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System.Threading.Tasks; +using Akka.Actor; +using Akka.TestKit; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Persistence.Tests; + +public class Bugfix7373Specs : AkkaSpec +{ + public Bugfix7373Specs(ITestOutputHelper output) : base(output) + { + } + + /// + /// Reproduction for https://github.com/akkadotnet/akka.net/issues/7373 + /// + [Fact] + public async Task ShouldDeliverAllStashedMessages() + { + // arrange + var actor = Sys.ActorOf(Props.Create()); + + // act + var msg = new Msg(1); + actor.Tell(msg); + actor.Tell(msg); + + actor.Tell("Initialize"); + + // assert + await ExpectMsgAsync($"Processed: {msg}"); + await ExpectMsgAsync($"Processed: {msg}"); + } + + public sealed record Msg(int Id); + + public class MinimalStashingActor : UntypedPersistentActor, IWithStash + { + public override string PersistenceId => "minimal-stashing-actor"; + + protected override void OnCommand(object message) + { + Sender.Tell($"Processed: {message}"); + } + + private void Ready(object message) + { + switch (message) + { + case "Initialize": + Persist("init", e => + { + Stash.UnstashAll(); // Unstash all stashed messages + Become(OnCommand); // Transition to ready state + }); + break; + default: + Stash.Stash(); // Stash messages until initialized + break; + } + } + + protected override void OnRecover(object message) + { + switch (message) + { + case RecoveryCompleted: + Become(Ready); + break; + } + } + } +} \ No newline at end of file From 699aff2e4c0655054989dc1c3b87bf7822ba12d5 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Sat, 2 Nov 2024 00:26:12 +0700 Subject: [PATCH 5/5] [Streams] Fix ShellRegistered message deadletter log 
(#7376) --- .../Implementation/Fusing/ActorGraphInterpreter.cs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/core/Akka.Streams/Implementation/Fusing/ActorGraphInterpreter.cs b/src/core/Akka.Streams/Implementation/Fusing/ActorGraphInterpreter.cs index c97c51cb558..5b6c72aaf55 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/ActorGraphInterpreter.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/ActorGraphInterpreter.cs @@ -767,7 +767,9 @@ public AsyncInput(GraphInterpreterShell shell, GraphStageLogic logic, object @ev public GraphInterpreterShell Shell { get; } } - private class ShellRegistered + // This is the Resume internal API message in JVM, it is used to prevent/short circuit recursive calls + // inside a stream. Harmless when dead-lettered. + private class ShellRegistered: IDeadLetterSuppression { public static readonly ShellRegistered Instance = new(); private ShellRegistered()