Skip to content

Commit

Permalink
Merge branch 'dev' into akkadotnet#7358-Fix-async-circuit-breaker
Browse files Browse the repository at this point in the history
  • Loading branch information
Arkatufus authored Nov 1, 2024
2 parents 34c0732 + 699aff2 commit ad27e81
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,13 @@ protected bool IsAGoodTimeToRebalance(IEnumerable<RegionEntry> regionEntries)

protected ImmutableList<RegionEntry> RegionEntriesFor(IImmutableDictionary<IActorRef, IImmutableList<string>> currentShardAllocations)
{
var addressToMember = ClusterState.Members.ToImmutableDictionary(m => m.Address, m => m);
// switched to using `GroupBy` instead just ToImmutableDictionary due to https://github.com/akkadotnet/akka.net/issues/7365
// it's very rare, but possible, that there can be two members with the same address in the ClusterState. This can happen
// when a node quickly reboots and re-uses its old address, but the old incarnation hasn't been downed yet.
var addressToMember = ClusterState.Members
.GroupBy(m => m.Address)
// using Last or First here is non-deterministic since the UID that appears in the UniqueAddress sort order is random
.ToImmutableDictionary(g => g.Key, g => g.First());

return currentShardAllocations.Select(i =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,11 @@ public override Task<IImmutableSet<ShardId>> Rebalance(IImmutableDictionary<IAct
var sortedRegionEntries = RegionEntriesFor(currentShardAllocations).OrderBy(i => i, ShardSuitabilityOrdering.Instance).ToImmutableList();
if (IsAGoodTimeToRebalance(sortedRegionEntries))
{
var (_, Shards) = MostSuitableRegion(sortedRegionEntries);
var (_, shards) = MostSuitableRegion(sortedRegionEntries);
// even if it is to another new node.
var mostShards = sortedRegionEntries.Select(r => r.ShardIds.Where(s => !rebalanceInProgress.Contains(s))).MaxBy(i => i.Count())?.ToArray() ?? Array.Empty<string>();

var difference = mostShards.Length - Shards.Count;
var difference = mostShards.Length - shards.Count;
if (difference >= _rebalanceThreshold)
{
var n = Math.Min(
Expand Down
77 changes: 40 additions & 37 deletions src/core/Akka.Cluster/ClusterDaemon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1635,22 +1635,22 @@ public void Welcome(Address joinWith, UniqueAddress from, Gossip gossip)
public void Leaving(Address address)
{
// only try to update if the node is available (in the member ring)
if (LatestGossip.Members.Any(m => m.Address.Equals(address) && m.Status is MemberStatus.Joining or MemberStatus.WeaklyUp or MemberStatus.Up))
foreach(var mem in LatestGossip.Members.Where(m => m.Address.Equals(address)))
{
// mark node as LEAVING
var newMembers = LatestGossip.Members.Select(m =>
if (mem.Status is MemberStatus.Joining or MemberStatus.WeaklyUp or MemberStatus.Up)
{
if (m.Address == address) return m.Copy(status: MemberStatus.Leaving);
return m;
}).ToImmutableSortedSet(); // mark node as LEAVING
var newGossip = LatestGossip.Copy(members: newMembers);

UpdateLatestGossip(newGossip);
// mark node as LEAVING
var newMembers = LatestGossip.Members
.Remove(mem).Add(mem.Copy(status: MemberStatus.Leaving));
var newGossip = LatestGossip.Copy(members: newMembers);
UpdateLatestGossip(newGossip);

_cluster.LogInfo("Marked address [{0}] as [{1}]", address, MemberStatus.Leaving);
PublishMembershipState();
// immediate gossip to speed up the leaving process
SendGossip();
_cluster.LogInfo("Marked address [{0}] as [{1}]", address, MemberStatus.Leaving);
PublishMembershipState();
// immediate gossip to speed up the leaving process
SendGossip();
}
}
}

Expand All @@ -1674,40 +1674,43 @@ public void Downing(Address address)
var localGossip = LatestGossip;
var localMembers = localGossip.Members;
var localOverview = localGossip.Overview;
var localSeen = localOverview.Seen;
var localReachability = _membershipState.DcReachability;

// check if the node to DOWN is in the 'members' set
var member = localMembers.FirstOrDefault(m => m.Address == address);
if (member != null && member.Status != MemberStatus.Down)
var found = false;
foreach (var member in localMembers.Where(m => m.Address == address))
{
if (localReachability.IsReachable(member.UniqueAddress))
_cluster.LogInfo("Marking node [{0}] as [{1}]", member.Address, MemberStatus.Down);
else
_cluster.LogInfo("Marking unreachable node [{0}] as [{1}]", member.Address, MemberStatus.Down);
found = true;
if (member.Status != MemberStatus.Down)
{
if (localReachability.IsReachable(member.UniqueAddress))
_cluster.LogInfo("Marking node [{0}] as [{1}]", member.Address, MemberStatus.Down);
else
_cluster.LogInfo("Marking unreachable node [{0}] as [{1}]", member.Address, MemberStatus.Down);


var newGossip = localGossip.MarkAsDown(member); //update gossip
UpdateLatestGossip(newGossip);
var newGossip = localGossip.MarkAsDown(member); //update gossip
UpdateLatestGossip(newGossip);

PublishMembershipState();
PublishMembershipState();

if (address == _cluster.SelfAddress)
{
// spread the word quickly, without waiting for next gossip tick
SendGossipRandom(MaxGossipsBeforeShuttingDownMyself);
}
else
{
// try to gossip immediately to downed node, as a STONITH signal
GossipTo(member.UniqueAddress);
if (address == _cluster.SelfAddress)
{
// spread the word quickly, without waiting for next gossip tick
SendGossipRandom(MaxGossipsBeforeShuttingDownMyself);
}
else
{
// try to gossip immediately to downed node, as a STONITH signal
GossipTo(member.UniqueAddress);
}
}

// if the previous statement did not evaluate to true, then this node is already being downed

}
else if (member != null)
{
// already down
}
else

if (!found)
{
_cluster.LogInfo("Ignoring down of unknown node [{0}]", address);
}
Expand Down
81 changes: 81 additions & 0 deletions src/core/Akka.Persistence.Tests/Bugfix7373Specs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// -----------------------------------------------------------------------
// <copyright file="Bugfix7373Specs.cs" company="Akka.NET Project">
// Copyright (C) 2009-2024 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2024 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System.Threading.Tasks;
using Akka.Actor;
using Akka.TestKit;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Persistence.Tests;

public class Bugfix7373Specs : AkkaSpec
{
public Bugfix7373Specs(ITestOutputHelper output) : base(output)
{
}

/// <summary>
/// Reproduction for https://github.com/akkadotnet/akka.net/issues/7373
/// </summary>
[Fact]
public async Task ShouldDeliverAllStashedMessages()
{
// arrange
var actor = Sys.ActorOf(Props.Create<MinimalStashingActor>());

// act
var msg = new Msg(1);
actor.Tell(msg);
actor.Tell(msg);

actor.Tell("Initialize");

// assert
await ExpectMsgAsync($"Processed: {msg}");
await ExpectMsgAsync($"Processed: {msg}");
}

public sealed record Msg(int Id);

public class MinimalStashingActor : UntypedPersistentActor, IWithStash
{
public override string PersistenceId => "minimal-stashing-actor";

protected override void OnCommand(object message)
{
Sender.Tell($"Processed: {message}");
}

private void Ready(object message)
{
switch (message)
{
case "Initialize":
Persist("init", e =>
{
Stash.UnstashAll(); // Unstash all stashed messages
Become(OnCommand); // Transition to ready state
});
break;
default:
Stash.Stash(); // Stash messages until initialized
break;
}
}

protected override void OnRecover(object message)
{
switch (message)
{
case RecoveryCompleted:
Become(Ready);
break;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,9 @@ public AsyncInput(GraphInterpreterShell shell, GraphStageLogic logic, object @ev
public GraphInterpreterShell Shell { get; }
}

private class ShellRegistered
// This is the Resume internal API message in JVM, it is used to prevent/short circuit recursive calls
// inside a stream. Harmless when dead-lettered.
private class ShellRegistered: IDeadLetterSuppression
{
public static readonly ShellRegistered Instance = new();
private ShellRegistered()
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka/Actor/Stash/Internal/AbstractStash.cs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public void Prepend(IEnumerable<Envelope> envelopes)
{
// since we want to save the order of messages, but still prepending using AddFirst,
// we must enumerate envelopes in reversed order
foreach (var envelope in envelopes.Distinct().Reverse())
foreach (var envelope in envelopes.Reverse())
{
_theStash.AddFirst(envelope);
}
Expand Down

0 comments on commit ad27e81

Please sign in to comment.