Skip to content

Commit

Permalink
Add missing singleton detection
Browse files Browse the repository at this point in the history
  • Loading branch information
Arkatufus committed Oct 23, 2024
1 parent de98c10 commit 710cba7
Show file tree
Hide file tree
Showing 6 changed files with 370 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//-----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
Expand All @@ -14,11 +15,18 @@
using Akka.Event;
using Akka.TestKit;
using Xunit;
using FluentAssertions;
using FluentAssertions.Extensions;
using Xunit.Abstractions;

namespace Akka.Cluster.Tools.Tests.Singleton
{
public class ClusterSingletonProxySpec : TestKit.Xunit2.TestKit
{
/// <summary>
/// Creates the spec and forwards the xUnit output helper to the base TestKit constructor.
/// </summary>
/// <param name="output">xUnit test output helper, passed through unchanged to the base class.</param>
public ClusterSingletonProxySpec(ITestOutputHelper output): base(output: output)
{
}

[Fact]
public void ClusterSingletonProxy_must_correctly_identify_the_singleton()
{
Expand Down Expand Up @@ -67,25 +75,190 @@ await AwaitConditionAsync(
}
}

/// <summary>
/// Starts a seed node and a second node whose singleton proxy is configured for the role "non-existent".
/// Expects no <see cref="ClusterSingletonProxy.IdentifySingletonTimedOut"/> event while the second node is
/// outside the cluster, one event after it joins, and another one after the seed node leaves.
/// </summary>
[Fact(DisplayName = "ClusterSingletonProxy should detect if its associated singleton failed to start after a period")]
public async Task ClusterSingletonProxySingletonTimeoutTest()
{
    // HOCON for the node under test: the proxy waits for a singleton in a role named "non-existent".
    const string proxyHocon = """
    akka.cluster.singleton-proxy {
    role = "non-existent"
    log-singleton-identification-failure = true
    singleton-identification-failure-period = 500ms
    }
    """;

    var eventTimeout = 3.Seconds();

    ActorSys seedNode = null;
    ActorSys proxyNode = null;

    // Both timeout events in this test must carry the same singleton name, role and period.
    void VerifyTimedOut(ClusterSingletonProxy.IdentifySingletonTimedOut timedOut)
    {
        timedOut.SingletonName.Should().Be("singleton");
        timedOut.Role.Should().Be("non-existent");
        timedOut.Duration.Should().Be(TimeSpan.FromMilliseconds(500));
    }

    try
    {
        // The seed node forms a cluster by joining itself.
        seedNode = new ActorSys(output: Output);
        var seedAddress = seedNode.Cluster.SelfAddress;
        seedNode.Cluster.Join(seedAddress);

        proxyNode = new ActorSys(config: proxyHocon, output: Output);
        proxyNode.Sys.EventStream.Subscribe<ClusterSingletonProxy.IdentifySingletonTimedOut>(proxyNode.TestActor);

        // While the node has not joined any cluster, the proxy must stay silent.
        await proxyNode.ExpectNoMsgAsync(1.Seconds());

        proxyNode.Cluster.Join(seedAddress);

        // After joining, the proxy cannot find its singleton within the detection period
        // and publishes the timeout event locally.
        var afterJoin = await proxyNode.ExpectMsgAsync<ClusterSingletonProxy.IdentifySingletonTimedOut>(eventTimeout);
        VerifyTimedOut(afterJoin);

        // Make the seed leave so the cluster topology changes...
        seedNode.Cluster.Leave(seedAddress);

        // ...which must trigger another timeout event.
        var afterLeave = await proxyNode.ExpectMsgAsync<ClusterSingletonProxy.IdentifySingletonTimedOut>(eventTimeout);
        VerifyTimedOut(afterLeave);
    }
    finally
    {
        // Terminate whichever systems were actually created, seed first, then wait for all of them.
        var shutdowns = new[] { seedNode, proxyNode }
            .Where(node => node is not null)
            .Select(node => node.Sys.Terminate())
            .ToList();

        await Task.WhenAll(shutdowns);
    }
}

/// <summary>
/// Builds a three node cluster (two nodes with the "seed" role plus one "proxy" node that starts no singleton)
/// and checks that the proxy node publishes no <see cref="ClusterSingletonProxy.IdentifySingletonTimedOut"/>
/// event after a message has been sent through its proxy, nor after the second seed terminates. The event is
/// expected only after the first seed terminates as well.
/// </summary>
[Fact(DisplayName = "ClusterSingletonProxy should not start singleton identify detection if a singleton reference already found")]
public async Task ClusterSingletonProxySingletonTimeoutTest2()
{
    // Shared HOCON for both seed nodes.
    const string seedHocon = """
    akka.cluster {
    roles = [seed] # only start singletons on seed role
    min-nr-of-members = 1
    singleton.role = seed # only start singletons on seed role
    singleton-proxy.role = seed # only start singletons on seed role
    }
    """;

    // HOCON for the node under test: it carries the "proxy" role and looks for the singleton in the "seed" role.
    const string proxyHocon = """
    akka.cluster {
    roles = [proxy]
    singleton.role = seed # only start singletons on seed role
    singleton-proxy {
    role = seed # only start singletons on seed role
    log-singleton-identification-failure = true
    singleton-identification-failure-period = 1s
    }
    }
    """;

    var quietPeriod = 1.5.Seconds();

    ActorSys firstSeed = null;
    ActorSys secondSeed = null;
    ActorSys proxyNode = null;

    // Polls the cluster state seen by `observer` until exactly `expectedUp` members have status Up (30s limit).
    async Task AwaitUpMembersAsync(ActorSys observer, int expectedUp)
    {
        await AwaitConditionAsync(
            () =>
            {
                var upCount = Cluster.Get(observer.Sys).State.Members.Count(m => m.Status == MemberStatus.Up);
                return Task.FromResult(upCount == expectedUp);
            },
            TimeSpan.FromSeconds(30));
    }

    try
    {
        firstSeed = new ActorSys(config: seedHocon, output: Output);
        var seedAddress = firstSeed.Cluster.SelfAddress;
        firstSeed.Cluster.Join(seedAddress);

        // Member age matters: the first seed has to be Up before anyone else joins so that it is the oldest.
        await AwaitUpMembersAsync(firstSeed, 1);

        secondSeed = new ActorSys(config: seedHocon, output: Output);
        secondSeed.Cluster.Join(seedAddress);

        proxyNode = new ActorSys(
            config: proxyHocon,
            startSingleton: false,
            output: Output);

        proxyNode.Sys.EventStream.Subscribe<ClusterSingletonProxy.IdentifySingletonTimedOut>(proxyNode.TestActor);
        proxyNode.Cluster.Join(seedAddress);

        // Wait until the proxy node sees all three members as Up.
        await AwaitUpMembersAsync(proxyNode, 3);

        proxyNode.TestProxy("hello");

        // The timeout event must not fire.
        await proxyNode.ExpectNoMsgAsync(quietPeriod);

        // The second seed goes away; the singleton is homed in the first seed, so still no timeout is expected.
        await secondSeed.Sys.Terminate();

        // Wait until the removal is visible to the proxy node.
        await AwaitUpMembersAsync(proxyNode, 2);

        // The timeout event must not fire.
        await proxyNode.ExpectNoMsgAsync(quietPeriod);

        // Now the first seed, which homed the singleton, goes away too.
        await firstSeed.Sys.Terminate();

        // Wait until the removal is visible to the proxy node.
        await AwaitUpMembersAsync(proxyNode, 1);

        // The proxy lost its singleton reference and no remaining node can home the singleton,
        // so the timeout event has to be published locally.
        await proxyNode.ExpectMsgAsync<ClusterSingletonProxy.IdentifySingletonTimedOut>(3.Seconds());
    }
    finally
    {
        // Terminate whichever systems were actually created, in creation order, then wait for all of them.
        var shutdowns = new[] { firstSeed, secondSeed, proxyNode }
            .Where(node => node is not null)
            .Select(node => node.Sys.Terminate())
            .ToList();

        await Task.WhenAll(shutdowns);
    }
}

private class ActorSys : TestKit.Xunit2.TestKit
{
public Cluster Cluster { get; }

public ActorSys(string name = "ClusterSingletonProxySystem", Address joinTo = null, int bufferSize = 1000)
: base(ActorSystem.Create(name, ConfigurationFactory.ParseString(_cfg).WithFallback(TestKit.Configs.TestConfigs.DefaultConfig)))
public ActorSys(string name = "ClusterSingletonProxySystem", Address joinTo = null, int bufferSize = 1000, string config = null, bool startSingleton = true, ITestOutputHelper output = null)
: base(ActorSystem.Create(
name: name,
config: config is null
? ConfigurationFactory.ParseString(_cfg).WithFallback(DefaultConfig)
: ConfigurationFactory.ParseString(config).WithFallback(_cfg).WithFallback(DefaultConfig)),
output: output)
{
Cluster = Cluster.Get(Sys);
if (joinTo != null)
{
Cluster.Join(joinTo);
}

Cluster.RegisterOnMemberUp(() =>
if (startSingleton)
{
Sys.ActorOf(ClusterSingletonManager.Props(Props.Create(() => new Singleton()), PoisonPill.Instance,
ClusterSingletonManagerSettings.Create(Sys)
.WithRemovalMargin(TimeSpan.FromSeconds(5))), "singletonmanager");
});
Cluster.RegisterOnMemberUp(() =>
{
Sys.ActorOf(ClusterSingletonManager.Props(Props.Create(() => new Singleton()), PoisonPill.Instance,
ClusterSingletonManagerSettings.Create(Sys)
.WithRemovalMargin(TimeSpan.FromSeconds(5))), "singletonmanager");
});
}

Proxy =
Sys.ActorOf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,35 @@ internal sealed class TryToIdentifySingleton : INoSerializationVerificationNeede
private TryToIdentifySingleton() { }
}

/// <summary>
/// Timer message the proxy schedules to itself; receiving it means the singleton identification
/// failure period has elapsed.
/// </summary>
internal sealed class IdentifySingletonTimeOutTick : INoSerializationVerificationNeeded
{
    private IdentifySingletonTimeOutTick()
    {
    }

    /// <summary>
    /// The shared instance of this message. The constructor is private, so this is the only way to obtain one.
    /// </summary>
    public static IdentifySingletonTimeOutTick Instance { get; } = new IdentifySingletonTimeOutTick();
}

/// <summary>
/// Event the proxy publishes on the actor system's event stream when no singleton has been found
/// after the configured identification failure period.
/// </summary>
public sealed class IdentifySingletonTimedOut : INoSerializationVerificationNeeded
{
    /// <summary>
    /// Creates a new timeout event.
    /// </summary>
    /// <param name="singletonName">Name of the singleton the proxy was looking for.</param>
    /// <param name="role">Role the proxy was configured with.</param>
    /// <param name="duration">The period that elapsed without a singleton being found.</param>
    public IdentifySingletonTimedOut(string singletonName, string role, TimeSpan duration)
        => (SingletonName, Role, Duration) = (singletonName, role, duration);

    /// <summary>Name of the singleton the proxy was looking for.</summary>
    public string SingletonName { get; }

    /// <summary>Role the proxy was configured with.</summary>
    public string Role { get; }

    /// <summary>The period that elapsed without a singleton being found.</summary>
    public TimeSpan Duration { get; }
}

/// <summary>
/// Returns default HOCON configuration for the cluster singleton.
/// </summary>
Expand Down Expand Up @@ -85,6 +114,7 @@ public static Props Props(string singletonManagerPath, ClusterSingletonProxySett
private string _identityId;
private IActorRef _singleton = null;
private ICancelable _identityTimer = null;
private ICancelable _identityTimeoutTimer = null;
private ImmutableSortedSet<Member> _membersByAge;
private ILoggingAdapter _log;

Expand All @@ -110,7 +140,12 @@ public ClusterSingletonProxy(string singletonManagerPath, ClusterSingletonProxyS
if (m.Member.UniqueAddress.Equals(_cluster.SelfUniqueAddress))
Context.Stop(Self);
else
{
Remove(m.Member);
// start or reset identify timeout every time a member is removed (excluding self)
TrackIdentifyTimeout();
}
});
Receive<ClusterEvent.IMemberEvent>(_ =>
{
Expand Down Expand Up @@ -139,6 +174,22 @@ public ClusterSingletonProxy(string singletonManagerPath, ClusterSingletonProxyS
Context.ActorSelection(singletonAddress).Tell(new Identify(_identityId));
}
});
// Identification timeout elapsed: log a warning and publish IdentifySingletonTimedOut on the event stream.
Receive<IdentifySingletonTimeOutTick>(_ =>
{
    // A non-null singleton here means a CancelTimer() was somehow missed and the reference was found
    // while we waited; in that case the tick is ignored.
    if (_singleton is null)
    {
        var failurePeriod = _settings.SingletonIdentificationFailurePeriod;

        Log.Warning(
            "ClusterSingletonProxy failed to find an associated singleton named [{0}] in role [{1}] after {2} seconds.",
            _settings.SingletonName, _settings.Role, failurePeriod.TotalSeconds);

        var timedOut = new IdentifySingletonTimedOut(
            singletonName: _settings.SingletonName,
            role: _settings.Role,
            duration: failurePeriod);
        Context.System.EventStream.Publish(timedOut);
    }
});
Receive<Terminated>(terminated =>
{
if (Equals(_singleton, terminated.ActorRef))
Expand Down Expand Up @@ -191,6 +242,12 @@ private void CancelTimer()
_identityTimer.Cancel();
_identityTimer = null;
}

if (_identityTimeoutTimer is not null)
{
_identityTimeoutTimer.Cancel();
_identityTimeoutTimer = null;
}
}

private bool MatchingRole(Member member)
Expand Down Expand Up @@ -222,6 +279,29 @@ private void IdentifySingleton()
receiver: Self,
message: TryToIdentifySingleton.Instance,
sender: Self);

// reset identify timeout every time we try to identify a new singleton
TrackIdentifyTimeout();
}

/// <summary>
/// Restarts the identification timeout: cancels any timeout timer currently held and, when no singleton
/// reference is known and failure logging is enabled in the settings, schedules a single
/// <see cref="IdentifySingletonTimeOutTick"/> to this actor after the configured failure period.
/// </summary>
private void TrackIdentifyTimeout()
{
    // Always discard the previous timer first so an old tick cannot outlive this reset.
    _identityTimeoutTimer?.Cancel();
    _identityTimeoutTimer = null;

    // Nothing to schedule when a singleton reference already exists or the feature is switched off.
    if (_singleton is not null || !_settings.LogSingletonIdentificationFailure)
    {
        return;
    }

    _identityTimeoutTimer = Context.System.Scheduler.ScheduleTellOnceCancelable(
        delay: _settings.SingletonIdentificationFailurePeriod,
        receiver: Self,
        message: IdentifySingletonTimeOutTick.Instance,
        sender: Self);
}

private void TrackChanges(Action block)
Expand All @@ -242,6 +322,9 @@ private void Add(Member member)
_membersByAge = _membersByAge.Remove(member); //replace
_membersByAge = _membersByAge.Add(member);
});

// start or reset identify timeout every time a new member joined (including self)
TrackIdentifyTimeout();
}

private void Remove(Member member)
Expand Down
Loading

0 comments on commit 710cba7

Please sign in to comment.