Skip to content

Commit

Permalink
Merge branch 'dev' into Fix_ClusterSingletonProxy_failed_to_reacquire
Browse files Browse the repository at this point in the history
  • Loading branch information
Arkatufus authored Aug 7, 2024
2 parents 67c88d5 + da3ded3 commit 4feeda3
Show file tree
Hide file tree
Showing 9 changed files with 178 additions and 33 deletions.
166 changes: 157 additions & 9 deletions docs/articles/clustering/cluster-client.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,159 @@ It is possible to make the cluster client stop entirely if it cannot find a rece
### Contact Auto-Discovery Using Akka.Discovery

> [!NOTE]
> This feature is currently considered as an advanced feature and is not currently compatible with Akka.Discovery Akka.Hosting extensions.
> This feature can only be used with:
>
> This feature should not be used until Akka.Management 1.5.26 is released.
> * Akka.Management v1.5.27 or later.
> * Akka.Cluster.Hosting v1.5.27 or later
> * Akka.Cluster.Tools v1.5.27 or later
This feature is added in Akka.NET 1.5.26. Instead of watching for actor termination manually, you can leverage [Akka.Discovery](../discovery/index.md) to discover cluster client contact points inside a dynamic environment such as [Kubernetes](https://github.com/akkadotnet/Akka.Management/blob/dev/docs/articles/discovery/kubernetes.md), [AWS](https://github.com/akkadotnet/Akka.Management/blob/dev/docs/articles/discovery/aws.md), or anywhere else with [Azure Table](https://github.com/akkadotnet/Akka.Management/blob/dev/src/discovery/azure/Akka.Discovery.Azure/README.md)
This feature is added in Akka.NET 1.5.27. Instead of watching for actor termination manually, you can leverage [Akka.Discovery](../discovery/index.md) to discover cluster client contact points inside a dynamic environment such as [Kubernetes](https://github.com/akkadotnet/Akka.Management/blob/dev/docs/articles/discovery/kubernetes.md), [AWS](https://github.com/akkadotnet/Akka.Management/blob/dev/docs/articles/discovery/aws.md), or anywhere else with [Azure Table](https://github.com/akkadotnet/Akka.Management/blob/dev/src/discovery/azure/Akka.Discovery.Azure/README.md)

### Contact Auto-Discovery Setup Using Akka.Hosting

Cluster client discovery API has been added in `Akka.Cluster.Hosting` v1.5.27. You can use the `.WithClusterClientDiscovery()` extension method to use the cluster client initial contact auto discovery feature.

#### Example: Setting Up Contact Auto-Discovery With Akka.Discovery.KubernetesApi

On your cluster client node side, these are the code you'll need to implement:

```csharp
services.AddAkka("ClusterClientSys", (builder, provider) => {
builder
// This code sets up ClusterClient that works using Akka.Discovery
.WithClusterClientDiscovery<MyClusterClientActorKey>(options => {
// This is the Discovery plugin that will be used with ClusterClientDiscovery.
options.DiscoveryOptions = new KubernetesDiscoveryOptions {
// IMPORTANT:
// This signals Akka.Hosting that this plugin **should not** be used for ClusterBootstrap
IsDefaultPlugin = false,

// IMPORTANT:
// The ConfigPath property has to be different than the default discovery ConfigPath.
// The actual name does not matter, but it has to be different than the default name "kubernetes-api"
ConfigPath = "kubernetes-api-cluster-client",

// IMPORTANT:
// The PodLabelSelector property has to be different than the default k8s discovery
// PodLabelSelector, which defaults to "app={0}". The "{0}" is important because
// it will be used inside a String.Format()
PodLabelSelector = "discovery={0}";
};

// This has to match the Kubernetes metadata label that we'll set in YAML
options.ServiceName = "initial-contact";

// his has to match the Kubernetes port name for the Akka.Management port
options.PortName = "management";
});
```

On the YAML side, you will need to change the Receptionist YAML and add a new metadata label to tag the pods:

```yaml
spec:
template:
metadata:
labels:
# Note that "discovery" matches the `PodLabelSelector` in `.WithClusterClientDiscovery()`
# Note that "initial-contact" matches the `ServiceName` in `.WithClusterClientDiscovery()`
discovery: initial-contact
```

If you're not using ClusterBootstrap on the Receptionist side, you have to start Akka.Management. Skip this step if you're using ClusterBootstrap:

```csharp
services.AddAkka("ReceptionistSys", (builder, provider) => {
builder.AddStartup(async (system, registry) => {
await AkkaManagement.Get(system).Start();
});
});
```

#### Example: Setting Up Contact Auto-Discovery With Akka.Discovery.Azure

On your cluster client node side, these are the code you'll need to implement:

```csharp
services.AddAkka("ClusterClientSys", (builder, provider) => {
builder
// This code sets up ClusterClient that works using Akka.Discovery
.WithClusterClientDiscovery<MyClusterClientActorKey>(options => {
// This is the Discovery plugin that will be used with ClusterClientDiscovery.
options.DiscoveryOptions = new AkkaDiscoveryOptions {
// IMPORTANT:
// This signals Akka.Hosting that this plugin **should not** be used for ClusterBootstrap
IsDefaultPlugin = false,

// IMPORTANT:
// The ConfigPath property has to be different than the default discovery ConfigPath.
// The actual name does not matter, but it has to be different than the default name "azure"
ConfigPath = "azure-cluster-client",

// IMPORTANT:
// This discovery plugin **should not** participate in updating the Azure cluster member table
ReadOnly = true,

// IMPORTANT:
// All service names for cluster client discovery should be the same.
// If you're also using ClusterBootstrap, make sure that this name does not collide.
ServiceName = "cluster-client",

// IMPORTANT:
// All table names for cluster client discovery should be the same.
// If you're also using ClusterBootstrap, make sure that this table name does not collide.
TableName = "akkaclusterreceptionists",
};

// This has to match the name we set inside the discovery options
options.ServiceName = "cluster-client";
})

// If you're not using ClusterBootstrap in the cluster client side, you will need to add
// these code
.AddStartup(async (system, registry) => {
await AkkaManagement.Get(system).Start();
});
```

On the cluster client receptionist side, you will need to implement these code:

```csharp
services.AddAkka("ReceptionistSys", (builder, provider) => {

builder
// This is the Discovery plugin that will be used with ClusterClientDiscovery.
.WithAzureDiscovery(options => {
// IMPORTANT:
// This signals Akka.Hosting that this plugin **should not** be used for ClusterBootstrap
options.IsDefaultPlugin = false,

// IMPORTANT:
// The ConfigPath property has to be different than the default discovery ConfigPath.
// The actual name does not matter, but it has to be different than the default name "azure"
options.ConfigPath = "azure-cluster-client",

// IMPORTANT:
// All service names for cluster client discovery should be the same.
// If you're also using ClusterBootstrap, make sure that this name does not collide.
options.ServiceName = "cluster-client",

// IMPORTANT:
// All table names for cluster client discovery should be the same.
// If you're also using ClusterBootstrap, make sure that this table name does not collide.
options.TableName = "akkaclusterreceptionists",
}

// If you're not using ClusterBootstrap in the cluster client side, you will need to add
// these code
.AddStartup(async (system, registry) => {
await AkkaManagement.Get(system).Start();
});
});

```

### Contact Auto-Discovery Setup Using Hocon Configuration

The HOCON configuration to set these are:

Expand Down Expand Up @@ -176,7 +324,7 @@ To enable contact auto-discovery, you will need to:

### Using Akka.Discovery For Both Akka.Cluster.Tools.Client And Akka.Management.Cluster.Bootstrap

If you need to use Akka.Discovery with both ClusterClient AND ClusterBootstrap, you will have to **make sure** that you have **TWO** different Akka.Discovery settings living side-by-side under the `akka.discovery` HOCON setting object.
If you need to use Akka.Discovery with both ClusterClient AND ClusterBootstrap, you will have to **make sure** that you have **TWO** different Akka.Discovery settings living side-by-side under the `akka.discovery` HOCON setting section.

#### Akka.Discovery.KubernetesApi Example

Expand All @@ -191,17 +339,17 @@ In your YAML file:
contact: cluster-client
```

* Make sure you name the Akka remoting port
* Make sure you name the Akka.Management port

```yaml
spec:
template:
spec:
containers:
ports:
- containerPort: 2552 # This is the remoting port, change this to match yours
- containerPort: 8558 # This is the remoting port, change this to match yours
protocol: TCP
name: akka-remote # this is important
name: management # this is important
```

In your cluster client Akka.NET node HOCON settings:
Expand All @@ -210,7 +358,7 @@ In your cluster client Akka.NET node HOCON settings:
* Rename the HOCON section to `akka.discovery.kubernetes-api-cluster-client`. The key name does not matter, what matters is that the name does not collide with any other setting section name under `akka.discovery`.
* Make sure you change `akka.discovery.kubernetes-api-cluster-client.pod-label-selector` to "contact={0}" to match what we have in the YAML file.
* Make sure you change `akka.cluster.client.discovery.service-name` to "cluster-client" to match what we have in the YAML file.
* Make sure you change `akka.cluster.client.discovery.port-name` value to "akka-remote" to match what we have in the YAML file.
* Make sure you change `akka.cluster.client.discovery.port-name` value to "management" to match what we have in the YAML file.
* Keep the `akka.discovery.method` HOCON value to "kubernetes-api", this is the discovery extension that will be used by ClusterBootstrap.
* Change the `akka.cluster.client.discovery.method` value from "\<method>" to "kubernetes-api-cluster-client", this is the discovery extension that will be used by ClusterClient. If not set, this will default to the value set in `akka.discovery.method`, which is **NOT** what we want.

Expand All @@ -220,7 +368,7 @@ In your cluster receptionist Akka.NET node HOCON settings:

* Copy the `akka.discovery.azure` HOCON section and paste it above or under the original settings. You can also copy the value from [here](https://github.com/akkadotnet/Akka.Management/blob/dev/src/discovery/azure/Akka.Discovery.Azure/reference.conf)
* Rename the HOCON section to `akka.discovery.azure-cluster-client`. The key name does not matter, what matters is that the name does not collide with any other setting section name under `akka.discovery`.
* Change `akka.discovery.azure-cluster-client.public-port` to the remoting port of the Akka.NET node.
* Change `akka.discovery.azure-cluster-client.public-port` to the management port of the Akka.NET node.
* Change `akka.discovery.azure-cluster-client.service-name` to "cluster-client". The name does not matter, what matters is that this name **HAS** to match the service name we'll be using in `akka.cluster.client.discovery.service-name`.
* **[OPTIONAL]** change `akka.discovery.azure-cluster-client.table-name` to `akkaclusterreceptionists` to separate the discovery table from ClusterBootstrap entries.
* Make sure that you start the discovery extension in the receptionist side. This needs to be done because the extension is responsible for updating the Azure table.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public ShardingProducerController(string producerId, IActorRef shardRegion, Opti
ShardRegion = shardRegion;
_durableQueueProps = durableQueueProps;
Settings = settings;
_timeProvider = timeProvider ?? DateTimeOffsetNowTimeProvider.Instance;
_timeProvider = timeProvider ?? Context.System.Scheduler;

WaitingForStart(Option<IActorRef>.None, CreateInitialState(_durableQueueProps.HasValue));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,7 @@ namespace Akka.Actor
public CoordinatedShutdownExtension() { }
public override Akka.Actor.CoordinatedShutdown CreateExtension(Akka.Actor.ExtendedActorSystem system) { }
}
[System.ObsoleteAttribute("This class will be removed in Akka.NET v1.6.0 - use the IScheduler instead.")]
public class DateTimeOffsetNowTimeProvider : Akka.Actor.IDateTimeOffsetNowTimeProvider, Akka.Actor.ITimeProvider
{
public System.TimeSpan HighResMonotonicClock { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,7 @@ namespace Akka.Actor
public CoordinatedShutdownExtension() { }
public override Akka.Actor.CoordinatedShutdown CreateExtension(Akka.Actor.ExtendedActorSystem system) { }
}
[System.ObsoleteAttribute("This class will be removed in Akka.NET v1.6.0 - use the IScheduler instead.")]
public class DateTimeOffsetNowTimeProvider : Akka.Actor.IDateTimeOffsetNowTimeProvider, Akka.Actor.ITimeProvider
{
public System.TimeSpan HighResMonotonicClock { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public EventSourcedProducerQueue(string persistenceId, EventSourcedProducerQueue
{
PersistenceId = persistenceId;
Settings = settings ?? EventSourcedProducerQueue.Settings.Create(Context.System);
_timeProvider = timeProvider ?? DateTimeOffsetNowTimeProvider.Instance;
_timeProvider = timeProvider ?? Context.System.Scheduler;
JournalPluginId = Settings.JournalPluginId;
SnapshotPluginId = Settings.SnapshotPluginId;
Self.Tell(EventSourcedProducerQueue.CleanupTick.Instance);
Expand Down
24 changes: 7 additions & 17 deletions src/core/Akka/Actor/Scheduler/DateTimeNowTimeProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,19 @@
namespace Akka.Actor
{
/// <summary>
/// TBD
/// The default <see cref="ITimeProvider"/> implementation for Akka.NET when not testing.
/// </summary>
public class DateTimeOffsetNowTimeProvider : ITimeProvider, IDateTimeOffsetNowTimeProvider
[Obsolete("This class will be removed in Akka.NET v1.6.0 - use the IScheduler instead.")]
public class DateTimeOffsetNowTimeProvider : IDateTimeOffsetNowTimeProvider
{
private DateTimeOffsetNowTimeProvider() { }
/// <summary>
/// TBD
/// </summary>

public DateTimeOffset Now { get { return DateTimeOffset.UtcNow; } }

/// <summary>
/// TBD
/// </summary>

public TimeSpan MonotonicClock {get { return Util.MonotonicClock.Elapsed; }}

/// <summary>
/// TBD
/// </summary>

public TimeSpan HighResMonotonicClock{get { return Util.MonotonicClock.ElapsedHighRes; }}

/// <summary>
/// TBD
/// </summary>

public static DateTimeOffsetNowTimeProvider Instance { get; } = new();
}
}
Expand Down
7 changes: 6 additions & 1 deletion src/core/Akka/Actor/Scheduler/ITimeProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,13 @@
namespace Akka.Actor
{
/// <summary>
/// TBD
/// Time provider used by the scheduler to obtain the current time.
/// </summary>
/// <remarks>
/// Intended to be customizable to we can virtualize time for testing purposes.
///
/// In the future we will drop this in favor of the time provider built into .NET 8 and later.
/// </remarks>
public interface ITimeProvider
{
/// <summary>
Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka/Delivery/Internal/ProducerControllerImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public ProducerController(string producerId,
ProducerId = producerId;
Settings = settings ?? ProducerController.Settings.Create(Context.System);
_durableProducerQueueProps = durableProducerQueue;
_timeProvider = timeProvider ?? DateTimeOffsetNowTimeProvider.Instance;
_timeProvider = timeProvider ?? Context.System.Scheduler;
_fuzzingControl = fuzzingControl;

// this state gets overridden during the loading sequence, so it's not used at all really
Expand Down Expand Up @@ -86,7 +86,7 @@ public ProducerController(string producerId,
ProducerId = producerId;
Settings = settings ?? ProducerController.Settings.Create(Context.System);
_durableProducerQueueProps = durableProducerQueue;
_timeProvider = timeProvider ?? DateTimeOffsetNowTimeProvider.Instance;
_timeProvider = timeProvider ?? Context.System.Scheduler;
_fuzzingControl = fuzzingControl;

// this state gets overridden during the loading sequence, so it's not used at all really
Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka/Delivery/ProducerController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ private static Props ProducerControllerProps<T>(ActorSystem actorSystem, string
{
if (sendAdapter == null)
return Props.Create(() => new ProducerController<T>(producerId, durableProducerQueue, settings,
DateTimeOffsetNowTimeProvider.Instance, fuzzing));
actorSystem.Scheduler, fuzzing));
return Props.Create(() => new ProducerController<T>(producerId, durableProducerQueue, sendAdapter, settings,
DateTimeOffsetNowTimeProvider.Instance, fuzzing));
actorSystem.Scheduler, fuzzing));
}

public sealed record Settings
Expand Down

0 comments on commit 4feeda3

Please sign in to comment.