Skip to content

Commit

Permalink
[Storage] updating to 11.1.4; handling storage network timeouts (#2486)
Browse files Browse the repository at this point in the history
  • Loading branch information
brettsam authored May 4, 2020
1 parent 5e410d7 commit c9d92b2
Show file tree
Hide file tree
Showing 13 changed files with 91 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,14 @@ internal class BlobLogListener
private readonly HashSet<string> _scannedBlobNames = new HashSet<string>();
private readonly StorageAnalyticsLogParser _parser;
private readonly IWebJobsExceptionHandler _exceptionHandler;
private readonly ILogger<BlobListener> _logger;

private BlobLogListener(CloudBlobClient blobClient, IWebJobsExceptionHandler exceptionHandler, ILogger<BlobListener> logger)
{
_blobClient = blobClient;
_exceptionHandler = exceptionHandler ?? throw new ArgumentNullException(nameof(exceptionHandler));

if (logger == null)
{
throw new ArgumentNullException(nameof(logger));
}
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_parser = new StorageAnalyticsLogParser(logger);
}

Expand All @@ -57,7 +55,7 @@ public async Task<IEnumerable<ICloudBlob>> GetRecentBlobWritesAsync(Cancellation
List<ICloudBlob> blobs = new List<ICloudBlob>();

var time = DateTime.UtcNow; // will scan back 2 hours, which is enough to deal with clock skew
foreach (var blob in await ListRecentLogFilesAsync(_blobClient, time, cancellationToken, hoursWindow, _exceptionHandler))
foreach (var blob in await ListRecentLogFilesAsync(_blobClient, time, cancellationToken, hoursWindow, _exceptionHandler, _logger))
{
bool isAdded = _scannedBlobNames.Add(blob.Name);
if (!isAdded)
Expand Down Expand Up @@ -144,8 +142,8 @@ private static string GetSearchPrefix(string service, DateTime startTime, DateTi
// This lets us use prefix scans. $logs/Blob/YYYY/MM/DD/HH00/nnnnnn.log
// Logs are about 6 an hour, so we're only scanning about 12 logs total.
// $$$ If logs are large, we can even have a cache of "already scanned" logs that we skip.
private static async Task<List<ICloudBlob>> ListRecentLogFilesAsync(CloudBlobClient blobClient,
DateTime startTimeForSearch, CancellationToken cancellationToken, int hoursWindow, IWebJobsExceptionHandler exceptionHandler)
private static async Task<List<ICloudBlob>> ListRecentLogFilesAsync(CloudBlobClient blobClient, DateTime startTimeForSearch,
CancellationToken cancellationToken, int hoursWindow, IWebJobsExceptionHandler exceptionHandler, ILogger<BlobListener> logger)
{
string serviceName = "blob";

Expand All @@ -155,7 +153,7 @@ private static async Task<List<ICloudBlob>> ListRecentLogFilesAsync(CloudBlobCli
for (int i = 0; i < hoursWindow; i++)
{
var prefix = GetSearchPrefix(serviceName, lastHour, lastHour);
await GetLogsWithPrefixAsync(selectedLogs, blobClient, prefix, exceptionHandler, cancellationToken);
await GetLogsWithPrefixAsync(selectedLogs, blobClient, prefix, exceptionHandler, logger, cancellationToken);
lastHour = lastHour.AddHours(-1);
}

Expand All @@ -165,12 +163,12 @@ private static async Task<List<ICloudBlob>> ListRecentLogFilesAsync(CloudBlobCli
// Populate the List<> with blob logs for the given prefix.
// http://blogs.msdn.com/b/windowsazurestorage/archive/2011/08/03/windows-azure-storage-logging-using-logs-to-track-storage-requests.aspx
private static async Task GetLogsWithPrefixAsync(List<ICloudBlob> selectedLogs, CloudBlobClient blobClient,
string prefix, IWebJobsExceptionHandler exceptionHandler, CancellationToken cancellationToken)
string prefix, IWebJobsExceptionHandler exceptionHandler, ILogger<BlobListener> logger, CancellationToken cancellationToken)
{
// List the blobs using the prefix
IEnumerable<IListBlobItem> blobs = await blobClient.ListBlobsAsync(prefix,
useFlatBlobListing: true, blobListingDetails: BlobListingDetails.Metadata,
operationName: "ScanLogs", exceptionHandler: exceptionHandler, cancellationToken: cancellationToken);
operationName: "ScanLogs", exceptionHandler: exceptionHandler, logger: logger, cancellationToken: cancellationToken);

if (blobs == null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,19 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Extensions.Storage;
using Microsoft.Azure.WebJobs.Host.Timers;
using Microsoft.Azure.Storage;
using Microsoft.Azure.Storage.Blob;
using Microsoft.Azure.WebJobs.Extensions.Storage;
using Microsoft.Azure.WebJobs.Host.Timers;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
{
internal static class CloudBlobClientExtensions
{
public static async Task<IEnumerable<IListBlobItem>> ListBlobsAsync(this CloudBlobClient client,
string prefix, bool useFlatBlobListing, BlobListingDetails blobListingDetails, string operationName,
IWebJobsExceptionHandler exceptionHandler, CancellationToken cancellationToken)
IWebJobsExceptionHandler exceptionHandler, ILogger logger, CancellationToken cancellationToken)
{
if (client == null)
{
Expand All @@ -30,10 +31,11 @@ public static async Task<IEnumerable<IListBlobItem>> ListBlobsAsync(this CloudBl
do
{
OperationContext context = new OperationContext { ClientRequestID = Guid.NewGuid().ToString() };
result = await TimeoutHandler.ExecuteWithTimeout(operationName, context.ClientRequestID, exceptionHandler, () =>
result = await TimeoutHandler.ExecuteWithTimeout(operationName, context.ClientRequestID, exceptionHandler,
logger, cancellationToken, () =>
{
return client.ListBlobsSegmentedAsync(prefix, useFlatBlobListing, blobListingDetails,
maxResults: null, currentToken: continuationToken, options: null, operationContext: context);
return client.ListBlobsSegmentedAsync(prefix, useFlatBlobListing, blobListingDetails, maxResults: null,
currentToken: continuationToken, options: null, operationContext: context, cancellationToken: cancellationToken);
});

if (result != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Storage;
using Microsoft.Azure.Storage.Blob;
using Microsoft.Azure.WebJobs.Extensions.Storage;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Azure.WebJobs.Host.Timers;
using Microsoft.Azure.Storage;
using Microsoft.Azure.Storage.Blob;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
Expand All @@ -33,7 +33,7 @@ internal sealed partial class ScanBlobScanLogHybridPollingStrategy : IBlobListen
private int _scanBlobLimitPerPoll = 10000;
private PollLogsStrategy _pollLogStrategy;
private bool _disposed;

public ScanBlobScanLogHybridPollingStrategy(IBlobScanInfoManager blobScanInfoManager, IWebJobsExceptionHandler exceptionHandler, ILogger<BlobListener> logger) : base()
{
_blobScanInfoManager = blobScanInfoManager;
Expand Down Expand Up @@ -221,12 +221,13 @@ public async Task<IEnumerable<ICloudBlob>> PollNewBlobsAsync(
try
{
OperationContext operationContext = new OperationContext { ClientRequestID = clientRequestId };
blobSegment = await TimeoutHandler.ExecuteWithTimeout("ScanContainer", operationContext.ClientRequestID, _exceptionHandler, () =>
{
return container.ListBlobsSegmentedAsync(prefix: null, useFlatBlobListing: true,
blobListingDetails: BlobListingDetails.None, maxResults: blobPollLimitPerContainer, currentToken: continuationToken,
options: null, operationContext: operationContext, cancellationToken: cancellationToken);
});
blobSegment = await TimeoutHandler.ExecuteWithTimeout("ScanContainer", operationContext.ClientRequestID, _exceptionHandler,
_logger, cancellationToken, () =>
{
return container.ListBlobsSegmentedAsync(prefix: null, useFlatBlobListing: true,
blobListingDetails: BlobListingDetails.None, maxResults: blobPollLimitPerContainer, currentToken: continuationToken,
options: null, operationContext: operationContext, cancellationToken: cancellationToken);
});

if (blobSegment == null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Storage;
using Microsoft.Azure.Storage.Queue;
using Microsoft.Azure.WebJobs.Extensions.Storage;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Azure.WebJobs.Host.Protocols;
using Microsoft.Azure.WebJobs.Host.Scale;
using Microsoft.Azure.WebJobs.Host.Timers;
using Microsoft.Extensions.Logging;
using Microsoft.Azure.Storage;
using Microsoft.Azure.Storage.Queue;

namespace Microsoft.Azure.WebJobs.Host.Queues.Listeners
{
Expand Down Expand Up @@ -196,14 +196,15 @@ public async Task<TaskSeriesCommandResult> ExecuteAsync(CancellationToken cancel
sw = Stopwatch.StartNew();
OperationContext context = new OperationContext { ClientRequestID = clientRequestId };

batch = await TimeoutHandler.ExecuteWithTimeout("GetMessages", context.ClientRequestID, _exceptionHandler, () =>
{
return _queue.GetMessagesAsync(_queueProcessor.BatchSize,
_visibilityTimeout,
options: null,
operationContext: context,
cancellationToken: cancellationToken);
});
batch = await TimeoutHandler.ExecuteWithTimeout(nameof(CloudQueue.GetMessageAsync), context.ClientRequestID,
_exceptionHandler, _logger, cancellationToken, () =>
{
return _queue.GetMessagesAsync(_queueProcessor.BatchSize,
_visibilityTimeout,
options: null,
operationContext: context,
cancellationToken: cancellationToken);
});

int count = batch?.Count() ?? -1;
Logger.GetMessages(_logger, _functionDescriptor.LogName, _queue.Name, context.ClientRequestID, count, sw.ElapsedMilliseconds);
Expand Down
35 changes: 32 additions & 3 deletions src/Microsoft.Azure.WebJobs.Extensions.Storage/TimeoutHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,27 @@
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Diagnostics;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Storage;
using Microsoft.Azure.WebJobs.Host.Timers;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.WebJobs.Extensions.Storage
{
internal static class TimeoutHandler
{
private static readonly TimeSpan DefaultTimeout = TimeSpan.FromMinutes(2);
private static readonly TimeSpan DefaultTimeout = TimeSpan.FromMinutes(3);

public static async Task<T> ExecuteWithTimeout<T>(string operationName, string clientRequestId, IWebJobsExceptionHandler exceptionHandler, Func<Task<T>> operation)
public static async Task<T> ExecuteWithTimeout<T>(string operationName, string clientRequestId,
IWebJobsExceptionHandler exceptionHandler, ILogger logger, CancellationToken cancellationToken, Func<Task<T>> operation)
{
using (var cts = new CancellationTokenSource())
{
Stopwatch sw = Stopwatch.StartNew();

Task timeoutTask = Task.Delay(DefaultTimeout, cts.Token);
Task<T> operationTask = operation();

Expand All @@ -42,7 +48,30 @@ public static async Task<T> ExecuteWithTimeout<T>(string operationName, string c
// Cancel the Delay.
cts.Cancel();

return await operationTask;
try
{
// Determine if this was a network timeout. If so, log it and rethrow. Use the passed-in cancellationToken
// to determine if this operation was canceled explicitly or was due to an internal network timeout.

return await operationTask;
}
catch (StorageException ex) when (ex.InnerException is OperationCanceledException && !cancellationToken.IsCancellationRequested) // network timeout
{
Logger.StorageTimeout(logger, operationName, clientRequestId, sw.ElapsedMilliseconds);
throw;
}
}
}

private static class Logger
{
    // Delegate built once via LoggerMessage.Define and reused for every call.
    // Emits a Debug-level event (id 600, named "StorageTimeout") carrying the
    // operation name, the client request id and the elapsed milliseconds.
    private static readonly Action<ILogger, string, string, long, Exception> _storageTimeout =
        LoggerMessage.Define<string, string, long>(Microsoft.Extensions.Logging.LogLevel.Debug, new EventId(600, nameof(StorageTimeout)),
            "The operation '{operationName}' with id '{clientRequestId}' was canceled due to a network timeout after '{elapsed}' ms.");

    /// <summary>
    /// Logs that a storage operation was canceled because of a network timeout.
    /// </summary>
    /// <param name="logger">The logger that receives the event.</param>
    /// <param name="operationName">Name of the storage operation that timed out.</param>
    /// <param name="clientRequestId">Client request id associated with the operation.</param>
    /// <param name="elapsedMilliseconds">Elapsed time, in milliseconds, reported in the message.</param>
    internal static void StorageTimeout(ILogger logger, string operationName, string clientRequestId, long elapsedMilliseconds)
    {
        // No exception is attached to the log entry; the caller rethrows the original one.
        _storageTimeout(logger, operationName, clientRequestId, elapsedMilliseconds, null);
    }
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Azure.Cosmos.Table" Version="1.0.5" />
<PackageReference Include="Microsoft.Azure.Storage.Blob" Version="11.1.0" />
<PackageReference Include="Microsoft.Azure.Storage.Queue" Version="11.1.0" />
<PackageReference Include="Microsoft.Azure.Cosmos.Table" Version="1.0.7" />
<PackageReference Include="Microsoft.Azure.Storage.Blob" Version="11.1.4" />
<PackageReference Include="Microsoft.Azure.Storage.Queue" Version="11.1.4" />
<PackageReference Include="Microsoft.Azure.WebJobs" Version="3.0.14" />
<PackageReference Include="StyleCop.Analyzers" Version="1.1.0-beta004">
<PrivateAssets>all</PrivateAssets>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Azure.Storage.Blob" Version="11.1.0" />
<PackageReference Include="Microsoft.Azure.Storage.Blob" Version="11.1.4" />
<PackageReference Include="StyleCop.Analyzers" Version="1.1.0-beta004">
<PrivateAssets>all</PrivateAssets>
</PackageReference>
Expand Down
4 changes: 2 additions & 2 deletions src/Microsoft.Azure.WebJobs.Host/Timers/TaskSeriesTimer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,9 @@ private async Task RunAsync(CancellationToken cancellationToken)
TaskSeriesCommandResult result = await _command.ExecuteAsync(cancellationToken);
wait = result.Wait;
}
catch (Exception ex) when (ex.InnerException is TaskCanceledException)
catch (Exception ex) when (ex.InnerException is OperationCanceledException)
{
// TaskCanceledExceptions coming from storage are wrapped in a StorageException.
// OperationCanceledExceptions coming from storage are wrapped in a StorageException.
// We'll handle them all here so they don't have to be managed for every call.
}
catch (OperationCanceledException)
Expand Down
2 changes: 1 addition & 1 deletion src/Microsoft.Azure.WebJobs.Logging/WebJobs.Logging.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Azure.Cosmos.Table" Version="1.0.5" />
<PackageReference Include="Microsoft.Azure.Cosmos.Table" Version="1.0.7" />
<PackageReference Include="StyleCop.Analyzers" Version="1.1.0-beta004">
<PrivateAssets>all</PrivateAssets>
</PackageReference>
Expand Down
6 changes: 3 additions & 3 deletions test/FakeStorage/FakeAzureStorage.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Azure.Cosmos.Table" Version="1.0.5" />
<PackageReference Include="Microsoft.Azure.Storage.Blob" Version="11.1.0" />
<PackageReference Include="Microsoft.Azure.Storage.Queue" Version="11.1.0" />
<PackageReference Include="Microsoft.Azure.Cosmos.Table" Version="1.0.7" />
<PackageReference Include="Microsoft.Azure.Storage.Blob" Version="11.1.4" />
<PackageReference Include="Microsoft.Azure.Storage.Queue" Version="11.1.4" />
<PackageReference Include="StyleCop.Analyzers" Version="1.1.0-beta004" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Azure.Storage.Blob" Version="11.1.0" />
<PackageReference Include="Microsoft.Azure.Storage.Queue" Version="11.1.0" />
<PackageReference Include="Microsoft.Azure.Storage.Blob" Version="11.1.4" />
<PackageReference Include="Microsoft.Azure.Storage.Queue" Version="11.1.4" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.1.0" />
<PackageReference Include="StyleCop.Analyzers" Version="1.1.0-beta004">
<PrivateAssets>all</PrivateAssets>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Blobs.Listeners;
using Microsoft.Azure.WebJobs.Host.TestCommon;
using Microsoft.Azure.Storage;
using Microsoft.Azure.Storage.Blob;
using Microsoft.Azure.WebJobs.Host.Blobs.Listeners;
using Microsoft.Azure.WebJobs.Host.TestCommon;
using Microsoft.Extensions.Logging.Abstractions;
using Moq;
using Xunit;


namespace Microsoft.Azure.WebJobs.Host.UnitTests.Blobs.Listeners
{
public class StorageBlobClientExtensionsTests
Expand Down Expand Up @@ -61,9 +61,9 @@ public async Task ListBlobsAsync_FollowsContinuationTokensToEnd(int blobCount)
// Mock the List function to return the correct blob page
HashSet<BlobContinuationToken> seenTokens = new HashSet<BlobContinuationToken>();
BlobResultSegment resultSegment = null;
mockClient.Setup(p => p.ListBlobsSegmentedAsync("test", true, BlobListingDetails.None, null, It.IsAny<BlobContinuationToken>(), null, It.IsAny<OperationContext>()))
.Callback<string, bool, BlobListingDetails, int?, BlobContinuationToken, BlobRequestOptions, OperationContext>(
(prefix, useFlatBlobListing, blobListingDetails, maxResultsValue, currentToken, options, operationContext) =>
mockClient.Setup(p => p.ListBlobsSegmentedAsync("test", true, BlobListingDetails.None, null, It.IsAny<BlobContinuationToken>(), null, It.IsAny<OperationContext>(), It.IsAny<CancellationToken>()))
.Callback<string, bool, BlobListingDetails, int?, BlobContinuationToken, BlobRequestOptions, OperationContext, CancellationToken>(
(prefix, useFlatBlobListing, blobListingDetails, maxResultsValue, currentToken, options, operationContext, cancellationToken) =>
{
// Previously this is where a bug existed - ListBlobsAsync wasn't handling
// continuation tokens properly and kept sending the same initial token
Expand All @@ -82,7 +82,7 @@ public async Task ListBlobsAsync_FollowsContinuationTokensToEnd(int blobCount)
})
.Returns(() => { return Task.FromResult(resultSegment); });

IEnumerable<IListBlobItem> results = await mockClient.Object.ListBlobsAsync("test", true, BlobListingDetails.None, "test", new TestExceptionHandler(), CancellationToken.None);
IEnumerable<IListBlobItem> results = await mockClient.Object.ListBlobsAsync("test", true, BlobListingDetails.None, "test", new TestExceptionHandler(), NullLogger<BlobListener>.Instance, CancellationToken.None);

Assert.Equal(blobCount, results.Count());
}
Expand Down
Loading

0 comments on commit c9d92b2

Please sign in to comment.