diff --git a/build/common.props b/build/common.props
index b35fbea512..919c99c4ac 100644
--- a/build/common.props
+++ b/build/common.props
@@ -5,7 +5,7 @@
latest
4
28
- 2
+ 3
0
diff --git a/src/WebJobs.Script/Diagnostics/FileLogger.cs b/src/WebJobs.Script/Diagnostics/FileLogger.cs
index 12a42acc53..b74fee74a6 100644
--- a/src/WebJobs.Script/Diagnostics/FileLogger.cs
+++ b/src/WebJobs.Script/Diagnostics/FileLogger.cs
@@ -93,7 +93,7 @@ public void Log(LogLevel logLevel, EventId eventId, TState state, Except
{
_fileWriter.AppendLine(formattedMessage);
}
- catch (Exception ex) when (!ex.IsFatal())
+ catch (Exception)
{
// Make sure the Logger doesn't throw if there are Exceptions (disk full, etc).
}
diff --git a/src/WebJobs.Script/Workers/Rpc/FunctionRegistration/RpcFunctionInvocationDispatcher.cs b/src/WebJobs.Script/Workers/Rpc/FunctionRegistration/RpcFunctionInvocationDispatcher.cs
index b4c820ecd7..e8de96d00c 100644
--- a/src/WebJobs.Script/Workers/Rpc/FunctionRegistration/RpcFunctionInvocationDispatcher.cs
+++ b/src/WebJobs.Script/Workers/Rpc/FunctionRegistration/RpcFunctionInvocationDispatcher.cs
@@ -187,7 +187,7 @@ internal async void ShutdownWebhostLanguageWorkerChannels()
await _webHostLanguageWorkerChannelManager?.ShutdownChannelsAsync();
}
- private void SetDispatcherStateToInitialized(Dictionary> webhostLanguageWorkerChannel = null)
+ private void SetDispatcherStateToInitialized(IDictionary> webhostLanguageWorkerChannel = null)
{
// RanToCompletion indicates successful process startup
if (State != FunctionInvocationDispatcherState.Initialized
@@ -198,7 +198,7 @@ private void SetDispatcherStateToInitialized(Dictionary, Task> startAction, bool initializeDispatcher = false, Dictionary> webhostLanguageWorkerChannel = null, IEnumerable functionLanguages = null)
+ private void StartWorkerProcesses(int startIndex, Func, Task> startAction, bool initializeDispatcher = false, IDictionary> webhostLanguageWorkerChannel = null, IEnumerable functionLanguages = null)
{
Task.Run(async () =>
{
@@ -309,7 +309,7 @@ public async Task InitializeAsync(IEnumerable functions, Cance
if (Utility.IsSupportedRuntime(_workerRuntime, _workerConfigs) || _environment.IsMultiLanguageRuntimeEnvironment())
{
State = FunctionInvocationDispatcherState.Initializing;
- Dictionary> webhostLanguageWorkerChannels = _webHostLanguageWorkerChannelManager.GetChannels(_workerRuntime);
+ IDictionary> webhostLanguageWorkerChannels = _webHostLanguageWorkerChannelManager.GetChannels(_workerRuntime);
if (webhostLanguageWorkerChannels != null)
{
int workerProcessCount = 0;
diff --git a/src/WebJobs.Script/Workers/Rpc/IWebHostRpcWorkerChannelManager.cs b/src/WebJobs.Script/Workers/Rpc/IWebHostRpcWorkerChannelManager.cs
index e8969ef218..9df04d3d62 100644
--- a/src/WebJobs.Script/Workers/Rpc/IWebHostRpcWorkerChannelManager.cs
+++ b/src/WebJobs.Script/Workers/Rpc/IWebHostRpcWorkerChannelManager.cs
@@ -11,7 +11,7 @@ public interface IWebHostRpcWorkerChannelManager
{
Task InitializeChannelAsync(IEnumerable workerConfigs, string language);
- Dictionary> GetChannels(string language);
+ IDictionary> GetChannels(string language);
Task SpecializeAsync();
diff --git a/src/WebJobs.Script/Workers/Rpc/WebHostRpcWorkerChannelManager.cs b/src/WebJobs.Script/Workers/Rpc/WebHostRpcWorkerChannelManager.cs
index ef2dae92f5..34ba5b8dc1 100644
--- a/src/WebJobs.Script/Workers/Rpc/WebHostRpcWorkerChannelManager.cs
+++ b/src/WebJobs.Script/Workers/Rpc/WebHostRpcWorkerChannelManager.cs
@@ -7,9 +7,7 @@
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;
-using Microsoft.Azure.AppService.Proxy.Common.Extensions;
using Microsoft.Azure.AppService.Proxy.Common.Infra;
-using Microsoft.Azure.AppService.Proxy.Runtime;
using Microsoft.Azure.WebJobs.Script.Config;
using Microsoft.Azure.WebJobs.Script.Diagnostics;
using Microsoft.Azure.WebJobs.Script.Eventing;
@@ -36,7 +34,7 @@ public class WebHostRpcWorkerChannelManager : IWebHostRpcWorkerChannelManager
private Action _shutdownStandbyWorkerChannels;
private IConfiguration _config;
- private ConcurrentDictionary>> _workerChannels = new ConcurrentDictionary>>(StringComparer.OrdinalIgnoreCase);
+ private ConcurrentDictionary>> _workerChannels = new(StringComparer.OrdinalIgnoreCase);
public WebHostRpcWorkerChannelManager(IScriptEventManager eventManager,
IEnvironment environment,
@@ -101,7 +99,7 @@ await rpcWorkerChannel.StartWorkerProcessAsync().ContinueWith(processStartTask =
internal Task GetChannelAsync(string language)
{
- if (!string.IsNullOrEmpty(language) && _workerChannels.TryGetValue(language, out Dictionary> workerChannels))
+ if (!string.IsNullOrEmpty(language) && _workerChannels.TryGetValue(language, out ConcurrentDictionary> workerChannels))
{
if (workerChannels.Count > 0 && workerChannels.TryGetValue(workerChannels.Keys.First(), out TaskCompletionSource valueTask))
{
@@ -111,9 +109,9 @@ internal Task GetChannelAsync(string language)
return Task.FromResult(null);
}
- public Dictionary> GetChannels(string language)
+ public IDictionary> GetChannels(string language)
{
- if (!string.IsNullOrEmpty(language) && _workerChannels.TryGetValue(language, out Dictionary> workerChannels))
+ if (!string.IsNullOrEmpty(language) && _workerChannels.TryGetValue(language, out ConcurrentDictionary> workerChannels))
{
return workerChannels;
}
@@ -237,7 +235,7 @@ public Task ShutdownChannelIfExistsAsync(string language, string workerId,
if (_hostingConfigOptions.Value.RevertWorkerShutdownBehaviour)
{
- if (_workerChannels.TryRemove(language, out Dictionary> rpcWorkerChannels))
+ if (_workerChannels.TryRemove(language, out ConcurrentDictionary> rpcWorkerChannels))
{
if (rpcWorkerChannels.TryGetValue(workerId, out TaskCompletionSource value))
{
@@ -264,7 +262,7 @@ public Task ShutdownChannelIfExistsAsync(string language, string workerId,
}
else
{
- if (_workerChannels.TryGetValue(language, out Dictionary> rpcWorkerChannels)
+ if (_workerChannels.TryGetValue(language, out ConcurrentDictionary> rpcWorkerChannels)
&& rpcWorkerChannels.TryRemove(workerId, out TaskCompletionSource value))
{
value?.Task.ContinueWith(channelTask =>
@@ -304,7 +302,7 @@ internal void ScheduleShutdownStandbyChannels()
using (_metricsLogger.LatencyEvent(string.Format(MetricEventNames.SpecializationShutdownStandbyChannels, runtime.Key)))
{
_logger.LogInformation("Disposing standby channel for runtime:{language}", runtime.Key);
- if (_workerChannels.TryRemove(runtime.Key, out Dictionary> standbyChannels))
+ if (_workerChannels.TryRemove(runtime.Key, out ConcurrentDictionary> standbyChannels))
{
foreach (string workerId in standbyChannels.Keys)
{
@@ -338,7 +336,7 @@ public async Task ShutdownChannelsAsync()
foreach (string runtime in _workerChannels.Keys)
{
_logger.LogInformation("Shutting down language worker channels for runtime:{runtime}", runtime);
- if (_workerChannels.TryRemove(runtime, out Dictionary> standbyChannels))
+ if (_workerChannels.TryRemove(runtime, out ConcurrentDictionary> standbyChannels))
{
foreach (string workerId in standbyChannels.Keys)
{
@@ -378,13 +376,13 @@ internal void AddOrUpdateWorkerChannels(string initializedRuntime, IRpcWorkerCha
_workerChannels.AddOrUpdate(initializedRuntime,
(runtime) =>
{
- Dictionary> newLanguageWorkerChannels = new Dictionary>();
- newLanguageWorkerChannels.Add(initializedLanguageWorkerChannel.Id, new TaskCompletionSource());
+ ConcurrentDictionary> newLanguageWorkerChannels = new(StringComparer.OrdinalIgnoreCase);
+ newLanguageWorkerChannels.TryAdd(initializedLanguageWorkerChannel.Id, new TaskCompletionSource());
return newLanguageWorkerChannels;
},
(runtime, existingLanguageWorkerChannels) =>
{
- existingLanguageWorkerChannels.Add(initializedLanguageWorkerChannel.Id, new TaskCompletionSource());
+ existingLanguageWorkerChannels.TryAdd(initializedLanguageWorkerChannel.Id, new TaskCompletionSource());
return existingLanguageWorkerChannels;
});
}
@@ -392,7 +390,7 @@ internal void AddOrUpdateWorkerChannels(string initializedRuntime, IRpcWorkerCha
internal void SetInitializedWorkerChannel(string initializedRuntime, IRpcWorkerChannel initializedLanguageWorkerChannel)
{
_logger.LogDebug("Adding webhost language worker channel for runtime: {language}. workerId:{id}", initializedRuntime, initializedLanguageWorkerChannel.Id);
- if (_workerChannels.TryGetValue(initializedRuntime, out Dictionary> channel))
+ if (_workerChannels.TryGetValue(initializedRuntime, out ConcurrentDictionary> channel))
{
if (channel.TryGetValue(initializedLanguageWorkerChannel.Id, out TaskCompletionSource value))
{
@@ -404,7 +402,7 @@ internal void SetInitializedWorkerChannel(string initializedRuntime, IRpcWorkerC
internal void SetExceptionOnInitializedWorkerChannel(string initializedRuntime, IRpcWorkerChannel initializedLanguageWorkerChannel, Exception exception)
{
_logger.LogDebug("Failed to initialize webhost language worker channel for runtime: {language}. workerId:{id}", initializedRuntime, initializedLanguageWorkerChannel.Id);
- if (_workerChannels.TryGetValue(initializedRuntime, out Dictionary> channel))
+ if (_workerChannels.TryGetValue(initializedRuntime, out ConcurrentDictionary> channel))
{
if (channel.TryGetValue(initializedLanguageWorkerChannel.Id, out TaskCompletionSource value))
{
diff --git a/test/WebJobs.Script.Tests/Workers/Rpc/TestRpcWorkerChannelManager.cs b/test/WebJobs.Script.Tests/Workers/Rpc/TestRpcWorkerChannelManager.cs
index c7f87df462..fe3a3187e9 100644
--- a/test/WebJobs.Script.Tests/Workers/Rpc/TestRpcWorkerChannelManager.cs
+++ b/test/WebJobs.Script.Tests/Workers/Rpc/TestRpcWorkerChannelManager.cs
@@ -35,7 +35,7 @@ public IRpcWorkerChannel GetChannel(string language)
throw new System.NotImplementedException();
}
- public Dictionary> GetChannels(string language)
+ public IDictionary> GetChannels(string language)
{
if (_workerChannels.TryGetValue(language, out Dictionary> workerChannels))
{
diff --git a/test/WebJobs.Script.Tests/Workers/Rpc/WebHostRpcWorkerChannelManagerTests.cs b/test/WebJobs.Script.Tests/Workers/Rpc/WebHostRpcWorkerChannelManagerTests.cs
index 2ee6adbe38..4382ca4867 100644
--- a/test/WebJobs.Script.Tests/Workers/Rpc/WebHostRpcWorkerChannelManagerTests.cs
+++ b/test/WebJobs.Script.Tests/Workers/Rpc/WebHostRpcWorkerChannelManagerTests.cs
@@ -4,6 +4,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
+using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Script.Config;
using Microsoft.Azure.WebJobs.Script.Description;
@@ -407,6 +408,38 @@ public async Task ShutdownChannelsIfExist_Succeeds()
Assert.Null(initializedChannel);
}
+ [Fact]
+ public void ShutdownChannelsIfExist_Race_Succeeds()
+ {
+ var channel = CreateTestChannel(RpcWorkerConstants.JavaLanguageWorkerName);
+ string id = channel.Id;
+
+ List> tasks = new();
+ List threads = new();
+ for (int i = 0; i < 2; i++)
+ {
+ Thread t = new(static (state) =>
+ {
+ var (channelManager, tasks, id) = ((WebHostRpcWorkerChannelManager, List>, string))state;
+ tasks.Add(channelManager.ShutdownChannelIfExistsAsync(RpcWorkerConstants.JavaLanguageWorkerName, id));
+ });
+ threads.Add(t);
+ }
+
+ foreach (Thread t in threads)
+ {
+ t.Start((_rpcWorkerChannelManager, tasks, id));
+ }
+
+ foreach (Thread t in threads)
+ {
+ t.Join();
+ }
+
+ // only one should successfully shut down
+ Assert.Single(tasks, t => t.Result == true);
+ }
+
[Fact]
public async Task ShutdownChannelsIfExistsAsync_StopsWorkerInvocations()
{