diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Storage/Blobs/Listeners/BlobLogListener.cs b/src/Microsoft.Azure.WebJobs.Extensions.Storage/Blobs/Listeners/BlobLogListener.cs index 0a6ae0489..1ce2c204d 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Storage/Blobs/Listeners/BlobLogListener.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Storage/Blobs/Listeners/BlobLogListener.cs @@ -25,16 +25,14 @@ internal class BlobLogListener private readonly HashSet _scannedBlobNames = new HashSet(); private readonly StorageAnalyticsLogParser _parser; private readonly IWebJobsExceptionHandler _exceptionHandler; + private readonly ILogger _logger; private BlobLogListener(CloudBlobClient blobClient, IWebJobsExceptionHandler exceptionHandler, ILogger logger) { _blobClient = blobClient; _exceptionHandler = exceptionHandler ?? throw new ArgumentNullException(nameof(exceptionHandler)); - if (logger == null) - { - throw new ArgumentNullException(nameof(logger)); - } + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _parser = new StorageAnalyticsLogParser(logger); } @@ -57,7 +55,7 @@ public async Task> GetRecentBlobWritesAsync(Cancellation List blobs = new List(); var time = DateTime.UtcNow; // will scan back 2 hours, which is enough to deal with clock sqew - foreach (var blob in await ListRecentLogFilesAsync(_blobClient, time, cancellationToken, hoursWindow, _exceptionHandler)) + foreach (var blob in await ListRecentLogFilesAsync(_blobClient, time, cancellationToken, hoursWindow, _exceptionHandler, _logger)) { bool isAdded = _scannedBlobNames.Add(blob.Name); if (!isAdded) @@ -144,8 +142,8 @@ private static string GetSearchPrefix(string service, DateTime startTime, DateTi // This lets us use prefix scans. $logs/Blob/YYYY/MM/DD/HH00/nnnnnn.log // Logs are about 6 an hour, so we're only scanning about 12 logs total. // $$$ If logs are large, we can even have a cache of "already scanned" logs that we skip. - private static async Task> ListRecentLogFilesAsync(CloudBlobClient blobClient, - DateTime startTimeForSearch, CancellationToken cancellationToken, int hoursWindow, IWebJobsExceptionHandler exceptionHandler) + private static async Task> ListRecentLogFilesAsync(CloudBlobClient blobClient, DateTime startTimeForSearch, + CancellationToken cancellationToken, int hoursWindow, IWebJobsExceptionHandler exceptionHandler, ILogger logger) { string serviceName = "blob"; @@ -155,7 +153,7 @@ private static async Task> ListRecentLogFilesAsync(CloudBlobCli for (int i = 0; i < hoursWindow; i++) { var prefix = GetSearchPrefix(serviceName, lastHour, lastHour); - await GetLogsWithPrefixAsync(selectedLogs, blobClient, prefix, exceptionHandler, cancellationToken); + await GetLogsWithPrefixAsync(selectedLogs, blobClient, prefix, exceptionHandler, logger, cancellationToken); lastHour = lastHour.AddHours(-1); } @@ -165,12 +163,12 @@ private static async Task> ListRecentLogFilesAsync(CloudBlobCli // Populate the List<> with blob logs for the given prefix. // http://blogs.msdn.com/b/windowsazurestorage/archive/2011/08/03/windows-azure-storage-logging-using-logs-to-track-storage-requests.aspx private static async Task GetLogsWithPrefixAsync(List selectedLogs, CloudBlobClient blobClient, - string prefix, IWebJobsExceptionHandler exceptionHandler, CancellationToken cancellationToken) + string prefix, IWebJobsExceptionHandler exceptionHandler, ILogger logger, CancellationToken cancellationToken) { // List the blobs using the prefix IEnumerable blobs = await blobClient.ListBlobsAsync(prefix, useFlatBlobListing: true, blobListingDetails: BlobListingDetails.Metadata, - operationName: "ScanLogs", exceptionHandler: exceptionHandler, cancellationToken: cancellationToken); + operationName: "ScanLogs", exceptionHandler: exceptionHandler, logger: logger, cancellationToken: cancellationToken); if (blobs == null) { diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Storage/Blobs/Listeners/CloudBlobClientExtensions.cs b/src/Microsoft.Azure.WebJobs.Extensions.Storage/Blobs/Listeners/CloudBlobClientExtensions.cs index ecd017b51..7bc469b03 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Storage/Blobs/Listeners/CloudBlobClientExtensions.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Storage/Blobs/Listeners/CloudBlobClientExtensions.cs @@ -5,10 +5,11 @@ using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; -using Microsoft.Azure.WebJobs.Extensions.Storage; -using Microsoft.Azure.WebJobs.Host.Timers; using Microsoft.Azure.Storage; using Microsoft.Azure.Storage.Blob; +using Microsoft.Azure.WebJobs.Extensions.Storage; +using Microsoft.Azure.WebJobs.Host.Timers; +using Microsoft.Extensions.Logging; namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners { @@ -16,7 +17,7 @@ internal static class CloudBlobClientExtensions { public static async Task> ListBlobsAsync(this CloudBlobClient client, string prefix, bool useFlatBlobListing, BlobListingDetails blobListingDetails, string operationName, - IWebJobsExceptionHandler exceptionHandler, CancellationToken cancellationToken) + IWebJobsExceptionHandler exceptionHandler, ILogger logger, CancellationToken cancellationToken) { if (client == null) { @@ -30,10 +31,11 @@ public static async Task> ListBlobsAsync(this CloudBl do { OperationContext context = new OperationContext { ClientRequestID = Guid.NewGuid().ToString() }; - result = await TimeoutHandler.ExecuteWithTimeout(operationName, context.ClientRequestID, exceptionHandler, () => + result = await TimeoutHandler.ExecuteWithTimeout(operationName, context.ClientRequestID, exceptionHandler, + logger, cancellationToken, () => { - return client.ListBlobsSegmentedAsync(prefix, useFlatBlobListing, blobListingDetails, - maxResults: null, currentToken: continuationToken, options: null, operationContext: context); + return client.ListBlobsSegmentedAsync(prefix, useFlatBlobListing, blobListingDetails, maxResults: null, + currentToken: continuationToken, options: null, operationContext: context, cancellationToken: cancellationToken); }); if (result != null) diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Storage/Blobs/Listeners/ScanBlobScanLogHybridPollingStrategy.cs b/src/Microsoft.Azure.WebJobs.Extensions.Storage/Blobs/Listeners/ScanBlobScanLogHybridPollingStrategy.cs index 35fd753ce..81a9992f0 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Storage/Blobs/Listeners/ScanBlobScanLogHybridPollingStrategy.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Storage/Blobs/Listeners/ScanBlobScanLogHybridPollingStrategy.cs @@ -8,12 +8,12 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; +using Microsoft.Azure.Storage; +using Microsoft.Azure.Storage.Blob; using Microsoft.Azure.WebJobs.Extensions.Storage; using Microsoft.Azure.WebJobs.Host.Executors; using Microsoft.Azure.WebJobs.Host.Listeners; using Microsoft.Azure.WebJobs.Host.Timers; -using Microsoft.Azure.Storage; -using Microsoft.Azure.Storage.Blob; using Microsoft.Extensions.Logging; namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners @@ -33,7 +33,7 @@ internal sealed partial class ScanBlobScanLogHybridPollingStrategy : IBlobListen private int _scanBlobLimitPerPoll = 10000; private PollLogsStrategy _pollLogStrategy; private bool _disposed; - + public ScanBlobScanLogHybridPollingStrategy(IBlobScanInfoManager blobScanInfoManager, IWebJobsExceptionHandler exceptionHandler, ILogger logger) : base() { _blobScanInfoManager = blobScanInfoManager; @@ -221,12 +221,13 @@ public async Task> PollNewBlobsAsync( try { OperationContext operationContext = new OperationContext { ClientRequestID = clientRequestId }; - blobSegment = await TimeoutHandler.ExecuteWithTimeout("ScanContainer", operationContext.ClientRequestID, _exceptionHandler, () => - { - return container.ListBlobsSegmentedAsync(prefix: null, useFlatBlobListing: true, - blobListingDetails: BlobListingDetails.None, maxResults: blobPollLimitPerContainer, currentToken: continuationToken, - options: null, operationContext: operationContext, cancellationToken: cancellationToken); - }); + blobSegment = await TimeoutHandler.ExecuteWithTimeout("ScanContainer", operationContext.ClientRequestID, _exceptionHandler, + _logger, cancellationToken, () => + { + return container.ListBlobsSegmentedAsync(prefix: null, useFlatBlobListing: true, + blobListingDetails: BlobListingDetails.None, maxResults: blobPollLimitPerContainer, currentToken: continuationToken, + options: null, operationContext: operationContext, cancellationToken: cancellationToken); + }); if (blobSegment == null) { diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Storage/Queues/Listeners/QueueListener.cs b/src/Microsoft.Azure.WebJobs.Extensions.Storage/Queues/Listeners/QueueListener.cs index 55d7f4609..5466aac2f 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Storage/Queues/Listeners/QueueListener.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Storage/Queues/Listeners/QueueListener.cs @@ -8,6 +8,8 @@ using System.Runtime.ExceptionServices; using System.Threading; using System.Threading.Tasks; +using Microsoft.Azure.Storage; +using Microsoft.Azure.Storage.Queue; using Microsoft.Azure.WebJobs.Extensions.Storage; using Microsoft.Azure.WebJobs.Host.Executors; using Microsoft.Azure.WebJobs.Host.Listeners; @@ -15,8 +17,6 @@ using Microsoft.Azure.WebJobs.Host.Scale; using Microsoft.Azure.WebJobs.Host.Timers; using Microsoft.Extensions.Logging; -using Microsoft.Azure.Storage; -using Microsoft.Azure.Storage.Queue; namespace Microsoft.Azure.WebJobs.Host.Queues.Listeners { @@ -196,14 +196,15 @@ public async Task ExecuteAsync(CancellationToken cancel sw = Stopwatch.StartNew(); OperationContext context = new OperationContext { ClientRequestID = clientRequestId }; - batch = await TimeoutHandler.ExecuteWithTimeout("GetMessages", context.ClientRequestID, _exceptionHandler, () => - { - return _queue.GetMessagesAsync(_queueProcessor.BatchSize, - _visibilityTimeout, - options: null, - operationContext: context, - cancellationToken: cancellationToken); - }); + batch = await TimeoutHandler.ExecuteWithTimeout(nameof(CloudQueue.GetMessageAsync), context.ClientRequestID, + _exceptionHandler, _logger, cancellationToken, () => + { + return _queue.GetMessagesAsync(_queueProcessor.BatchSize, + _visibilityTimeout, + options: null, + operationContext: context, + cancellationToken: cancellationToken); + }); int count = batch?.Count() ?? -1; Logger.GetMessages(_logger, _functionDescriptor.LogName, _queue.Name, context.ClientRequestID, count, sw.ElapsedMilliseconds); diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Storage/TimeoutHandler.cs b/src/Microsoft.Azure.WebJobs.Extensions.Storage/TimeoutHandler.cs index 5c80912d2..dc7f26f9b 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Storage/TimeoutHandler.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Storage/TimeoutHandler.cs @@ -2,21 +2,27 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using System; +using System.Diagnostics; using System.Runtime.ExceptionServices; using System.Threading; using System.Threading.Tasks; +using Microsoft.Azure.Storage; using Microsoft.Azure.WebJobs.Host.Timers; +using Microsoft.Extensions.Logging; namespace Microsoft.Azure.WebJobs.Extensions.Storage { internal static class TimeoutHandler { - private static readonly TimeSpan DefaultTimeout = TimeSpan.FromMinutes(2); + private static readonly TimeSpan DefaultTimeout = TimeSpan.FromMinutes(3); - public static async Task ExecuteWithTimeout(string operationName, string clientRequestId, IWebJobsExceptionHandler exceptionHandler, Func> operation) + public static async Task ExecuteWithTimeout(string operationName, string clientRequestId, + IWebJobsExceptionHandler exceptionHandler, ILogger logger, CancellationToken cancellationToken, Func> operation) { using (var cts = new CancellationTokenSource()) { + Stopwatch sw = Stopwatch.StartNew(); + Task timeoutTask = Task.Delay(DefaultTimeout, cts.Token); Task operationTask = operation(); @@ -42,7 +48,30 @@ public static async Task ExecuteWithTimeout(string operationName, string c // Cancel the Delay. cts.Cancel(); - return await operationTask; + try + { + // Determine if this was a deadlock. If so, log it and rethrow. Use the passed-in cancellationToken + // to detemrine of this operation was canceled explicitly or was due to an internal network timeout. + + return await operationTask; + } + catch (StorageException ex) when (ex.InnerException is OperationCanceledException && !cancellationToken.IsCancellationRequested) // network timeout + { + Logger.StorageTimeout(logger, operationName, clientRequestId, sw.ElapsedMilliseconds); + throw; + } + } + } + + private static class Logger + { + private static readonly Action _storageDeadlock = + LoggerMessage.Define(Microsoft.Extensions.Logging.LogLevel.Debug, new EventId(600, nameof(StorageTimeout)), + "The operation '{operationName}' with id '{clientRequestId}' was canceled due to a network timeout after '{elapsed}' ms."); + + internal static void StorageTimeout(ILogger logger, string operationName, string clientRequestId, long elapsedMilliseconds) + { + _storageDeadlock(logger, operationName, clientRequestId, elapsedMilliseconds, null); } } } diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Storage/WebJobs.Extensions.Storage.csproj b/src/Microsoft.Azure.WebJobs.Extensions.Storage/WebJobs.Extensions.Storage.csproj index 998226efe..38d1ce240 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Storage/WebJobs.Extensions.Storage.csproj +++ b/src/Microsoft.Azure.WebJobs.Extensions.Storage/WebJobs.Extensions.Storage.csproj @@ -93,9 +93,9 @@ - - - + + + all diff --git a/src/Microsoft.Azure.WebJobs.Host.Storage/WebJobs.Host.Storage.csproj b/src/Microsoft.Azure.WebJobs.Host.Storage/WebJobs.Host.Storage.csproj index 2f667b3cc..3f215c2e2 100644 --- a/src/Microsoft.Azure.WebJobs.Host.Storage/WebJobs.Host.Storage.csproj +++ b/src/Microsoft.Azure.WebJobs.Host.Storage/WebJobs.Host.Storage.csproj @@ -31,7 +31,7 @@ - + all diff --git a/src/Microsoft.Azure.WebJobs.Host/Timers/TaskSeriesTimer.cs b/src/Microsoft.Azure.WebJobs.Host/Timers/TaskSeriesTimer.cs index 6282b8319..a10228f68 100644 --- a/src/Microsoft.Azure.WebJobs.Host/Timers/TaskSeriesTimer.cs +++ b/src/Microsoft.Azure.WebJobs.Host/Timers/TaskSeriesTimer.cs @@ -147,9 +147,9 @@ private async Task RunAsync(CancellationToken cancellationToken) TaskSeriesCommandResult result = await _command.ExecuteAsync(cancellationToken); wait = result.Wait; } - catch (Exception ex) when (ex.InnerException is TaskCanceledException) + catch (Exception ex) when (ex.InnerException is OperationCanceledException) { - // TaskCanceledExceptions coming from storage are wrapped in a StorageException. + // OperationCanceledExceptions coming from storage are wrapped in a StorageException. // We'll handle them all here so they don't have to be managed for every call. } catch (OperationCanceledException) diff --git a/src/Microsoft.Azure.WebJobs.Logging/WebJobs.Logging.csproj b/src/Microsoft.Azure.WebJobs.Logging/WebJobs.Logging.csproj index b0b6965f6..c13354d87 100644 --- a/src/Microsoft.Azure.WebJobs.Logging/WebJobs.Logging.csproj +++ b/src/Microsoft.Azure.WebJobs.Logging/WebJobs.Logging.csproj @@ -25,7 +25,7 @@ - + all diff --git a/test/FakeStorage/FakeAzureStorage.csproj b/test/FakeStorage/FakeAzureStorage.csproj index e1ad4195b..7b72b4f29 100644 --- a/test/FakeStorage/FakeAzureStorage.csproj +++ b/test/FakeStorage/FakeAzureStorage.csproj @@ -8,9 +8,9 @@ - - - + + + diff --git a/test/Microsoft.Azure.WebJobs.Host.TestCommon/WebJobs.Host.TestCommon.csproj b/test/Microsoft.Azure.WebJobs.Host.TestCommon/WebJobs.Host.TestCommon.csproj index 8853aa14b..1c71af5de 100644 --- a/test/Microsoft.Azure.WebJobs.Host.TestCommon/WebJobs.Host.TestCommon.csproj +++ b/test/Microsoft.Azure.WebJobs.Host.TestCommon/WebJobs.Host.TestCommon.csproj @@ -22,8 +22,8 @@ - - + + all diff --git a/test/Microsoft.Azure.Webjobs.Extensions.Storage.UnitTests/Blobs/Listeners/StorageBlobClientExtensionsTests.cs b/test/Microsoft.Azure.Webjobs.Extensions.Storage.UnitTests/Blobs/Listeners/StorageBlobClientExtensionsTests.cs index 02196a684..daa3489f8 100644 --- a/test/Microsoft.Azure.Webjobs.Extensions.Storage.UnitTests/Blobs/Listeners/StorageBlobClientExtensionsTests.cs +++ b/test/Microsoft.Azure.Webjobs.Extensions.Storage.UnitTests/Blobs/Listeners/StorageBlobClientExtensionsTests.cs @@ -6,14 +6,14 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; -using Microsoft.Azure.WebJobs.Host.Blobs.Listeners; -using Microsoft.Azure.WebJobs.Host.TestCommon; using Microsoft.Azure.Storage; using Microsoft.Azure.Storage.Blob; +using Microsoft.Azure.WebJobs.Host.Blobs.Listeners; +using Microsoft.Azure.WebJobs.Host.TestCommon; +using Microsoft.Extensions.Logging.Abstractions; using Moq; using Xunit; - namespace Microsoft.Azure.WebJobs.Host.UnitTests.Blobs.Listeners { public class StorageBlobClientExtensionsTests @@ -61,9 +61,9 @@ public async Task ListBlobsAsync_FollowsContinuationTokensToEnd(int blobCount) // Mock the List function to return the correct blob page HashSet seenTokens = new HashSet(); BlobResultSegment resultSegment = null; - mockClient.Setup(p => p.ListBlobsSegmentedAsync("test", true, BlobListingDetails.None, null, It.IsAny(), null, It.IsAny())) - .Callback( - (prefix, useFlatBlobListing, blobListingDetails, maxResultsValue, currentToken, options, operationContext) => + mockClient.Setup(p => p.ListBlobsSegmentedAsync("test", true, BlobListingDetails.None, null, It.IsAny(), null, It.IsAny(), It.IsAny())) + .Callback( + (prefix, useFlatBlobListing, blobListingDetails, maxResultsValue, currentToken, options, operationContext, cancellationToken) => { // Previously this is where a bug existed - ListBlobsAsync wasn't handling // continuation tokens properly and kept sending the same initial token @@ -82,7 +82,7 @@ public async Task ListBlobsAsync_FollowsContinuationTokensToEnd(int blobCount) }) .Returns(() => { return Task.FromResult(resultSegment); }); - IEnumerable results = await mockClient.Object.ListBlobsAsync("test", true, BlobListingDetails.None, "test", new TestExceptionHandler(), CancellationToken.None); + IEnumerable results = await mockClient.Object.ListBlobsAsync("test", true, BlobListingDetails.None, "test", new TestExceptionHandler(), NullLogger.Instance, CancellationToken.None); Assert.Equal(blobCount, results.Count()); } diff --git a/test/Microsoft.Azure.Webjobs.Extensions.Storage.UnitTests/WebJobs.Extensions.Storage.UnitTests.csproj b/test/Microsoft.Azure.Webjobs.Extensions.Storage.UnitTests/WebJobs.Extensions.Storage.UnitTests.csproj index 7a4f47e0f..90774d74e 100644 --- a/test/Microsoft.Azure.Webjobs.Extensions.Storage.UnitTests/WebJobs.Extensions.Storage.UnitTests.csproj +++ b/test/Microsoft.Azure.Webjobs.Extensions.Storage.UnitTests/WebJobs.Extensions.Storage.UnitTests.csproj @@ -22,9 +22,9 @@ - - - + + + all