Skip to content

Commit

Permalink
NacosConfigurationHttpClient现在将会复用相同的配置订阅任务,修正Grpc客户端多重订阅时的错误取消;
Browse files Browse the repository at this point in the history
  • Loading branch information
stratosblue committed Jul 18, 2021
1 parent 980ef90 commit 4bcba80
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
],
"Auth": { //认证信息节点
"User": { //用于Nacos登陆的用户信息
"Account": "username",
"Password": "password"
"Account": "nacos",
"Password": "nacos"
},
"ACS": { //用于阿里云ACS认证信息
"RegionId": "",
Expand All @@ -31,7 +31,7 @@
}
]
},
"HealthCheckInterval": "00:00:05", //可选,客户端健康检查间隔
"ClientIPSubnet": "192.168.1.1/24" //可选,获取客户端IP时,将获取此值所在子网IP
"HealthCheckInterval": "00:00:05" //可选,客户端健康检查间隔
//"ClientIPSubnet": "192.168.1.1/24" //可选,获取客户端IP时,将获取此值所在子网IP
//"SpecifyClientIP": "127.0.0.1" //可选,指定客户端IP,优先级高于ClientIPSubnet
}
11 changes: 6 additions & 5 deletions src/Nacos.Grpc/NacosConfigurationGrpcClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,13 @@ private async Task UnSubscribeConfigurationChange(NacosConfigurationDescriptor d

Logger?.LogInformation("取消配置 {0} 的变更通知订阅", configurationUniqueKey);

_subscriptions.RemoveSubscribe(descriptor, notifyCallback);

var request = new ConfigBatchListenRequest(false).AddListenContext(descriptor);
if (_subscriptions.RemoveSubscribe(descriptor, notifyCallback))
{
var request = new ConfigBatchListenRequest(false).AddListenContext(descriptor);

//HACK 此处不做检查。。。
await RequestAsync(request, RunningToken).ConfigureAwait(false);
//HACK 此处不做检查。。。
await RequestAsync(request, RunningToken).ConfigureAwait(false);
}
}

#endregion Private 方法
Expand Down
10 changes: 7 additions & 3 deletions src/Nacos/ConfigurationSubscriptionCollection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,20 @@ public sealed class ConfigurationSubscriptionCollection : IDisposable
/// </summary>
/// <param name="descriptor"></param>
/// <param name="notifyCallback"></param>
public void AddSubscribe(NacosConfigurationDescriptor descriptor, ConfigurationChangeNotifyCallback notifyCallback)
/// <returns>是否为新建</returns>
public bool AddSubscribe(NacosConfigurationDescriptor descriptor, ConfigurationChangeNotifyCallback notifyCallback)
{
lock (_subscribeStates)
{
if (_subscribeStates.TryGetValue(descriptor, out var existSubscribeState))
{
existSubscribeState.NotifyCallback += notifyCallback;
return false;
}
else
{
_subscribeStates.Add(descriptor, new(descriptor, notifyCallback));
return true;
}
}
}
Expand Down Expand Up @@ -64,7 +67,7 @@ public ConfigurationSubscribeState[] GetAllSubscription()
/// </summary>
/// <param name="descriptor"></param>
/// <param name="notifyCallback"></param>
/// <returns></returns>
/// <returns>是否移除了所有订阅</returns>
public bool RemoveSubscribe(NacosConfigurationDescriptor descriptor, ConfigurationChangeNotifyCallback notifyCallback)
{
lock (_subscribeStates)
Expand All @@ -76,7 +79,8 @@ public bool RemoveSubscribe(NacosConfigurationDescriptor descriptor, Configurati

if (existNotifyCallback is null)
{
return _subscribeStates.Remove(descriptor);
_subscribeStates.Remove(descriptor);
return true;
}
else
{
Expand Down
28 changes: 23 additions & 5 deletions src/Nacos/Http/HttpConfigurationChangeUnsubscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,25 @@ internal class HttpConfigurationChangeUnsubscriber : IAsyncDisposable
{
#region Private 字段

private CancellationTokenSource? _tokenSource;
private NacosConfigurationDescriptor _descriptor;

private int _isDisposed = 0;

private ConfigurationChangeNotifyCallback _notifyCallback;

private Action<NacosConfigurationDescriptor, ConfigurationChangeNotifyCallback> _unSubscriberAction;

#endregion Private 字段

#region Public 构造函数

public HttpConfigurationChangeUnsubscriber(CancellationTokenSource tokenSource)
public HttpConfigurationChangeUnsubscriber(NacosConfigurationDescriptor descriptor,
ConfigurationChangeNotifyCallback notifyCallback,
Action<NacosConfigurationDescriptor, ConfigurationChangeNotifyCallback> unSubscriberAction)
{
_tokenSource = tokenSource ?? throw new ArgumentNullException(nameof(tokenSource));
_descriptor = descriptor ?? throw new ArgumentNullException(nameof(descriptor));
_notifyCallback = notifyCallback ?? throw new ArgumentNullException(nameof(notifyCallback));
_unSubscriberAction = unSubscriberAction ?? throw new ArgumentNullException(nameof(unSubscriberAction));
}

#endregion Public 构造函数
Expand All @@ -28,9 +38,17 @@ public HttpConfigurationChangeUnsubscriber(CancellationTokenSource tokenSource)

public ValueTask DisposeAsync()
{
if (Interlocked.Exchange(ref _tokenSource, null!) is CancellationTokenSource tokenSource)
if (Interlocked.Increment(ref _isDisposed) == 1)
{
tokenSource.SilenceRelease();
var descriptor = _descriptor;
var notifyCallback = _notifyCallback;
var unSubscriberAction = _unSubscriberAction;

_descriptor = null!;
_notifyCallback = null!;
_unSubscriberAction = null!;

unSubscriberAction(descriptor, notifyCallback);
}
return ValueTask.CompletedTask;
}
Expand Down
115 changes: 108 additions & 7 deletions src/Nacos/Http/NacosConfigurationHttpClient.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -21,6 +23,10 @@ public sealed class NacosConfigurationHttpClient : NacosHttpClient, INacosConfig

private readonly Func<ConfigurationGetContext, Task<NacosConfigurationDescriptor>> _getConfigurationDelegate;

private readonly Dictionary<INacosUniqueConfiguration, CancellationTokenSource> _subscribeTokenSources = new(new INacosUniqueConfigurationEqualityComparer());

private readonly ConfigurationSubscriptionCollection _subscriptions = new();

#endregion Private 字段

#region Public 构造函数
Expand Down Expand Up @@ -50,14 +56,31 @@ public Task<IAsyncDisposable> SubscribeConfigurationChangeAsync(NacosConfigurati
ConfigurationChangeNotifyCallback notifyCallback,
CancellationToken token = default)
{
var pollingTokenSource = CancellationTokenSource.CreateLinkedTokenSource(RunningToken);
pollingTokenSource.Token.Register(() => pollingTokenSource.Dispose());
_subscriptions.AddSubscribe(descriptor, notifyCallback);

_ = PollingListeningConfigurationAsync(descriptor, notifyCallback, pollingTokenSource.Token);
CancellationTokenSource? subscribeTokenSource = null;

var unsubscriber = new HttpConfigurationChangeUnsubscriber(pollingTokenSource);
lock (_subscribeTokenSources)
{
if (_subscribeTokenSources.TryGetValue(descriptor, out subscribeTokenSource))
{
return Task.FromResult<IAsyncDisposable>(CreateUnsubscriber());
}
else
{
subscribeTokenSource = CancellationTokenSource.CreateLinkedTokenSource(RunningToken);
_subscribeTokenSources.Add(descriptor, subscribeTokenSource);
}
}

_ = PollingListeningConfigurationAsync(descriptor, subscribeTokenSource.Token);

return Task.FromResult<IAsyncDisposable>(unsubscriber);
return Task.FromResult<IAsyncDisposable>(CreateUnsubscriber());

HttpConfigurationChangeUnsubscriber CreateUnsubscriber()
{
return new HttpConfigurationChangeUnsubscriber(descriptor, notifyCallback, UnSubscribeConfigurationChange);
}
}

#endregion Public 方法
Expand All @@ -67,6 +90,21 @@ public Task<IAsyncDisposable> SubscribeConfigurationChangeAsync(NacosConfigurati
/// <inheritdoc/>
protected override void Dispose(bool disposing)
{
_subscriptions.Dispose();

CancellationTokenSource[] ctss;

lock (_subscribeTokenSources)
{
ctss = _subscribeTokenSources.Values.ToArray();
_subscribeTokenSources.Clear();
}

foreach (var item in ctss)
{
item.SilenceRelease();
}

base.Dispose(disposing);
}

Expand All @@ -88,7 +126,7 @@ private async Task<NacosConfigurationDescriptor> InternalGetConfigurationAsync(N
}
}

private async Task PollingListeningConfigurationAsync(NacosConfigurationDescriptor descriptor, ConfigurationChangeNotifyCallback notifyCallback, CancellationToken token)
private async Task PollingListeningConfigurationAsync(NacosConfigurationDescriptor descriptor, CancellationToken token)
{
var scaler = new Scaler(0, 10, 60);

Expand All @@ -104,10 +142,50 @@ private async Task PollingListeningConfigurationAsync(NacosConfigurationDescript

if (!string.IsNullOrWhiteSpace(response))
{
var hasSubscribe = _subscriptions.TryGetSubscribe(descriptor, out var state);

if (!hasSubscribe)
{
Logger?.LogTrace("没有对配置 {0} 的订阅, 监听任务退出.", descriptor);
return;
}

var notifyCallback = state?.NotifyCallback;

if (notifyCallback is null)
{
//HACK 理论上不应该走到这里面的逻辑

scaler.Add();

Logger?.LogInformation("监听到配置 {0} 有变更, 但没有获取到回调委托, 等待 {1} s 后重试", descriptor, scaler.Value);

await Task.Delay(TimeSpan.FromSeconds(scaler.Value), token).ConfigureAwait(false);

continue;
}

//HACK 响应值是否有内容?
var newDescriptor = await GetConfigurationAsync(descriptor, token).ConfigureAwait(false);

await notifyCallback(newDescriptor, token).ConfigureAwait(false);
if (!string.IsNullOrWhiteSpace(descriptor.Hash)) //hash为空,认为是第一次订阅,不触发回调
{
//HACK 在此处捕获异常,是否合理
try
{
var tasks = notifyCallback.GetInvocationList()
.Cast<ConfigurationChangeNotifyCallback>()
.Select(callback => callback(newDescriptor, RunningToken))
.ToArray();

await Task.WhenAll(tasks).ConfigureAwait(false);
}
catch (Exception ex)
{
//HACK 是否需要异常处理
Logger?.LogError(ex, "配置变更订阅处理异常, 变更信息: {0}", newDescriptor);
}
}

descriptor = descriptor.WithContent(newDescriptor.Content, newDescriptor.Hash ?? HashUtil.ComputeMD5(newDescriptor.Content).ToHexString());
}
Expand Down Expand Up @@ -135,6 +213,29 @@ private async Task PollingListeningConfigurationAsync(NacosConfigurationDescript
}
}

private void UnSubscribeConfigurationChange(NacosConfigurationDescriptor descriptor, ConfigurationChangeNotifyCallback notifyCallback)
{
if (notifyCallback is null)
{
return;
}

var configurationUniqueKey = descriptor.GetUniqueKey();

Logger?.LogInformation("取消配置 {0} 的变更通知订阅", configurationUniqueKey);

if (_subscriptions.RemoveSubscribe(descriptor, notifyCallback))
{
lock (_subscribeTokenSources)
{
if (_subscribeTokenSources.TryGetValue(descriptor, out var cts))
{
cts.SilenceRelease();
}
}
}
}

#endregion Private 方法
}
}
2 changes: 1 addition & 1 deletion src/extensions.props
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project>
<PropertyGroup>
<VersionPrefix>1.0.0</VersionPrefix>
<VersionSuffix>alpha-0012</VersionSuffix>
<VersionSuffix>alpha-0013</VersionSuffix>

<IsPackable>true</IsPackable>

Expand Down

0 comments on commit 4bcba80

Please sign in to comment.