diff --git a/samples/GenericHostSample/appsettings.nacos.configuration.json b/samples/GenericHostSample/appsettings.nacos.configuration.json index 844da0a..5b7cacf 100644 --- a/samples/GenericHostSample/appsettings.nacos.configuration.json +++ b/samples/GenericHostSample/appsettings.nacos.configuration.json @@ -6,8 +6,8 @@ ], "Auth": { //认证信息节点 "User": { //用于Nacos登陆的用户信息 - "Account": "username", - "Password": "password" + "Account": "nacos", + "Password": "nacos" }, "ACS": { //用于阿里云ACS认证信息 "RegionId": "", @@ -31,7 +31,7 @@ } ] }, - "HealthCheckInterval": "00:00:05", //可选,客户端健康检查间隔 - "ClientIPSubnet": "192.168.1.1/24" //可选,获取客户端IP时,将获取此值所在子网IP + "HealthCheckInterval": "00:00:05" //可选,客户端健康检查间隔 + //"ClientIPSubnet": "192.168.1.1/24" //可选,获取客户端IP时,将获取此值所在子网IP //"SpecifyClientIP": "127.0.0.1" //可选,指定客户端IP,优先级高于ClientIPSubnet } \ No newline at end of file diff --git a/src/Nacos.Grpc/NacosConfigurationGrpcClient.cs b/src/Nacos.Grpc/NacosConfigurationGrpcClient.cs index 663a707..50fe1be 100644 --- a/src/Nacos.Grpc/NacosConfigurationGrpcClient.cs +++ b/src/Nacos.Grpc/NacosConfigurationGrpcClient.cs @@ -184,12 +184,13 @@ private async Task UnSubscribeConfigurationChange(NacosConfigurationDescriptor d Logger?.LogInformation("取消配置 {0} 的变更通知订阅", configurationUniqueKey); - _subscriptions.RemoveSubscribe(descriptor, notifyCallback); - - var request = new ConfigBatchListenRequest(false).AddListenContext(descriptor); + if (_subscriptions.RemoveSubscribe(descriptor, notifyCallback)) + { + var request = new ConfigBatchListenRequest(false).AddListenContext(descriptor); - //HACK 此处不做检查。。。 - await RequestAsync(request, RunningToken).ConfigureAwait(false); + //HACK 此处不做检查。。。 + await RequestAsync(request, RunningToken).ConfigureAwait(false); + } } #endregion Private 方法 diff --git a/src/Nacos/ConfigurationSubscriptionCollection.cs b/src/Nacos/ConfigurationSubscriptionCollection.cs index 28f9da3..18c2aad 100644 --- a/src/Nacos/ConfigurationSubscriptionCollection.cs +++ b/src/Nacos/ConfigurationSubscriptionCollection.cs @@ -23,17 +23,20 @@ public sealed class ConfigurationSubscriptionCollection : IDisposable /// /// /// - public void AddSubscribe(NacosConfigurationDescriptor descriptor, ConfigurationChangeNotifyCallback notifyCallback) + /// 是否为新建 + public bool AddSubscribe(NacosConfigurationDescriptor descriptor, ConfigurationChangeNotifyCallback notifyCallback) { lock (_subscribeStates) { if (_subscribeStates.TryGetValue(descriptor, out var existSubscribeState)) { existSubscribeState.NotifyCallback += notifyCallback; + return false; } else { _subscribeStates.Add(descriptor, new(descriptor, notifyCallback)); + return true; } } } @@ -64,7 +67,7 @@ public ConfigurationSubscribeState[] GetAllSubscription() /// /// /// - /// + /// 是否移除了所有订阅 public bool RemoveSubscribe(NacosConfigurationDescriptor descriptor, ConfigurationChangeNotifyCallback notifyCallback) { lock (_subscribeStates) @@ -76,7 +79,8 @@ public bool RemoveSubscribe(NacosConfigurationDescriptor descriptor, Configurati if (existNotifyCallback is null) { - return _subscribeStates.Remove(descriptor); + _subscribeStates.Remove(descriptor); + return true; } else { diff --git a/src/Nacos/Http/HttpConfigurationChangeUnsubscriber.cs b/src/Nacos/Http/HttpConfigurationChangeUnsubscriber.cs index e088d12..5f857f9 100644 --- a/src/Nacos/Http/HttpConfigurationChangeUnsubscriber.cs +++ b/src/Nacos/Http/HttpConfigurationChangeUnsubscriber.cs @@ -11,15 +11,25 @@ internal class HttpConfigurationChangeUnsubscriber : IAsyncDisposable { #region Private 字段 - private CancellationTokenSource? _tokenSource; + private NacosConfigurationDescriptor _descriptor; + + private int _isDisposed = 0; + + private ConfigurationChangeNotifyCallback _notifyCallback; + + private Action _unSubscriberAction; #endregion Private 字段 #region Public 构造函数 - public HttpConfigurationChangeUnsubscriber(CancellationTokenSource tokenSource) + public HttpConfigurationChangeUnsubscriber(NacosConfigurationDescriptor descriptor, + ConfigurationChangeNotifyCallback notifyCallback, + Action unSubscriberAction) { - _tokenSource = tokenSource ?? throw new ArgumentNullException(nameof(tokenSource)); + _descriptor = descriptor ?? throw new ArgumentNullException(nameof(descriptor)); + _notifyCallback = notifyCallback ?? throw new ArgumentNullException(nameof(notifyCallback)); + _unSubscriberAction = unSubscriberAction ?? throw new ArgumentNullException(nameof(unSubscriberAction)); } #endregion Public 构造函数 @@ -28,9 +38,17 @@ public HttpConfigurationChangeUnsubscriber(CancellationTokenSource tokenSource) public ValueTask DisposeAsync() { - if (Interlocked.Exchange(ref _tokenSource, null!) is CancellationTokenSource tokenSource) + if (Interlocked.Increment(ref _isDisposed) == 1) { - tokenSource.SilenceRelease(); + var descriptor = _descriptor; + var notifyCallback = _notifyCallback; + var unSubscriberAction = _unSubscriberAction; + + _descriptor = null!; + _notifyCallback = null!; + _unSubscriberAction = null!; + + unSubscriberAction(descriptor, notifyCallback); } return ValueTask.CompletedTask; } diff --git a/src/Nacos/Http/NacosConfigurationHttpClient.cs b/src/Nacos/Http/NacosConfigurationHttpClient.cs index adbd9dc..0d7db46 100644 --- a/src/Nacos/Http/NacosConfigurationHttpClient.cs +++ b/src/Nacos/Http/NacosConfigurationHttpClient.cs @@ -1,4 +1,6 @@ using System; +using System.Collections.Generic; +using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -21,6 +23,10 @@ public sealed class NacosConfigurationHttpClient : NacosHttpClient, INacosConfig private readonly Func> _getConfigurationDelegate; + private readonly Dictionary _subscribeTokenSources = new(new INacosUniqueConfigurationEqualityComparer()); + + private readonly ConfigurationSubscriptionCollection _subscriptions = new(); + #endregion Private 字段 #region Public 构造函数 @@ -50,14 +56,31 @@ public Task SubscribeConfigurationChangeAsync(NacosConfigurati ConfigurationChangeNotifyCallback notifyCallback, CancellationToken token = default) { - var pollingTokenSource = CancellationTokenSource.CreateLinkedTokenSource(RunningToken); - pollingTokenSource.Token.Register(() => pollingTokenSource.Dispose()); + _subscriptions.AddSubscribe(descriptor, notifyCallback); - _ = PollingListeningConfigurationAsync(descriptor, notifyCallback, pollingTokenSource.Token); + CancellationTokenSource? subscribeTokenSource = null; - var unsubscriber = new HttpConfigurationChangeUnsubscriber(pollingTokenSource); + lock (_subscribeTokenSources) + { + if (_subscribeTokenSources.TryGetValue(descriptor, out subscribeTokenSource)) + { + return Task.FromResult(CreateUnsubscriber()); + } + else + { + subscribeTokenSource = CancellationTokenSource.CreateLinkedTokenSource(RunningToken); + _subscribeTokenSources.Add(descriptor, subscribeTokenSource); + } + } + + _ = PollingListeningConfigurationAsync(descriptor, subscribeTokenSource.Token); - return Task.FromResult(unsubscriber); + return Task.FromResult(CreateUnsubscriber()); + + HttpConfigurationChangeUnsubscriber CreateUnsubscriber() + { + return new HttpConfigurationChangeUnsubscriber(descriptor, notifyCallback, UnSubscribeConfigurationChange); + } } #endregion Public 方法 @@ -67,6 +90,21 @@ public Task SubscribeConfigurationChangeAsync(NacosConfigurati /// protected override void Dispose(bool disposing) { + _subscriptions.Dispose(); + + CancellationTokenSource[] ctss; + + lock (_subscribeTokenSources) + { + ctss = _subscribeTokenSources.Values.ToArray(); + _subscribeTokenSources.Clear(); + } + + foreach (var item in ctss) + { + item.SilenceRelease(); + } + base.Dispose(disposing); } @@ -88,7 +126,7 @@ private async Task InternalGetConfigurationAsync(N } } - private async Task PollingListeningConfigurationAsync(NacosConfigurationDescriptor descriptor, ConfigurationChangeNotifyCallback notifyCallback, CancellationToken token) + private async Task PollingListeningConfigurationAsync(NacosConfigurationDescriptor descriptor, CancellationToken token) { var scaler = new Scaler(0, 10, 60); @@ -104,10 +142,50 @@ private async Task PollingListeningConfigurationAsync(NacosConfigurationDescript if (!string.IsNullOrWhiteSpace(response)) { + var hasSubscribe = _subscriptions.TryGetSubscribe(descriptor, out var state); + + if (!hasSubscribe) + { + Logger?.LogTrace("没有对配置 {0} 的订阅, 监听任务退出.", descriptor); + return; + } + + var notifyCallback = state?.NotifyCallback; + + if (notifyCallback is null) + { + //HACK 理论上不应该走到这里面的逻辑 + + scaler.Add(); + + Logger?.LogInformation("监听到配置 {0} 有变更, 但没有获取到回调委托, 等待 {1} s 后重试", descriptor, scaler.Value); + + await Task.Delay(TimeSpan.FromSeconds(scaler.Value), token).ConfigureAwait(false); + + continue; + } + //HACK 响应值是否有内容? var newDescriptor = await GetConfigurationAsync(descriptor, token).ConfigureAwait(false); - await notifyCallback(newDescriptor, token).ConfigureAwait(false); + if (!string.IsNullOrWhiteSpace(descriptor.Hash)) //hash为空,认为是第一次订阅,不触发回调 + { + //HACK 在此处捕获异常,是否合理 + try + { + var tasks = notifyCallback.GetInvocationList() + .Cast() + .Select(callback => callback(newDescriptor, RunningToken)) + .ToArray(); + + await Task.WhenAll(tasks).ConfigureAwait(false); + } + catch (Exception ex) + { + //HACK 是否需要异常处理 + Logger?.LogError(ex, "配置变更订阅处理异常, 变更信息: {0}", newDescriptor); + } + } descriptor = descriptor.WithContent(newDescriptor.Content, newDescriptor.Hash ?? HashUtil.ComputeMD5(newDescriptor.Content).ToHexString()); } @@ -135,6 +213,29 @@ private async Task PollingListeningConfigurationAsync(NacosConfigurationDescript } } + private void UnSubscribeConfigurationChange(NacosConfigurationDescriptor descriptor, ConfigurationChangeNotifyCallback notifyCallback) + { + if (notifyCallback is null) + { + return; + } + + var configurationUniqueKey = descriptor.GetUniqueKey(); + + Logger?.LogInformation("取消配置 {0} 的变更通知订阅", configurationUniqueKey); + + if (_subscriptions.RemoveSubscribe(descriptor, notifyCallback)) + { + lock (_subscribeTokenSources) + { + if (_subscribeTokenSources.TryGetValue(descriptor, out var cts)) + { + cts.SilenceRelease(); + } + } + } + } + #endregion Private 方法 } } \ No newline at end of file diff --git a/src/extensions.props b/src/extensions.props index d37e171..5949a1e 100644 --- a/src/extensions.props +++ b/src/extensions.props @@ -1,7 +1,7 @@  1.0.0 - alpha-0012 + alpha-0013 true