diff --git a/extensions/Worker.Extensions.ServiceBus/src/Properties/AssemblyInfo.cs b/extensions/Worker.Extensions.ServiceBus/src/Properties/AssemblyInfo.cs index 09b2497ca..25c129db3 100644 --- a/extensions/Worker.Extensions.ServiceBus/src/Properties/AssemblyInfo.cs +++ b/extensions/Worker.Extensions.ServiceBus/src/Properties/AssemblyInfo.cs @@ -4,5 +4,5 @@ using System.Runtime.CompilerServices; using Microsoft.Azure.Functions.Worker.Extensions.Abstractions; -[assembly: ExtensionInformation("Microsoft.Azure.WebJobs.Extensions.ServiceBus", "5.15.1")] +[assembly: ExtensionInformation("Microsoft.Azure.WebJobs.Extensions.ServiceBus", "5.16.0")] [assembly: InternalsVisibleTo("Microsoft.Azure.Functions.Worker.Extensions.Tests, PublicKey=00240000048000009400000006020000002400005253413100040000010001005148be37ac1d9f58bd40a2e472c9d380d635b6048278f7d47480b08c928858f0f7fe17a6e4ce98da0e7a7f0b8c308aecd9e9b02d7e9680a5b5b75ac7773cec096fbbc64aebd429e77cb5f89a569a79b28e9c76426783f624b6b70327eb37341eb498a2c3918af97c4860db6cdca4732787150841e395a29cfacb959c1fd971c1")] diff --git a/extensions/Worker.Extensions.ServiceBus/src/Proto/settlement.proto b/extensions/Worker.Extensions.ServiceBus/src/Proto/settlement.proto index 969e94357..48c07ed51 100644 --- a/extensions/Worker.Extensions.ServiceBus/src/Proto/settlement.proto +++ b/extensions/Worker.Extensions.ServiceBus/src/Proto/settlement.proto @@ -2,6 +2,7 @@ syntax = "proto3"; import "google/protobuf/empty.proto"; import "google/protobuf/wrappers.proto"; +import "google/protobuf/timestamp.proto"; // this namespace will be shared between isolated worker and WebJobs extension so make it somewhat generic option csharp_namespace = "Microsoft.Azure.ServiceBus.Grpc"; @@ -19,6 +20,21 @@ service Settlement { // Defers a message rpc Defer (DeferRequest) returns (google.protobuf.Empty) {} + + // Renew message lock + rpc RenewMessageLock (RenewMessageLockRequest) returns (google.protobuf.Empty) {} + + // Get session state + rpc GetSessionState (GetSessionStateRequest) returns (GetSessionStateResponse) {} + + // Set session state + rpc SetSessionState (SetSessionStateRequest) returns (google.protobuf.Empty) {} + + // Release session + rpc ReleaseSession (ReleaseSessionRequest) returns (google.protobuf.Empty) {} + + // Renew session lock + rpc RenewSessionLock (RenewSessionLockRequest) returns (RenewSessionLockResponse) {} } // The complete message request containing the locktoken. @@ -44,4 +60,40 @@ message DeadletterRequest { message DeferRequest { string locktoken = 1; bytes propertiesToModify = 2; -} \ No newline at end of file +} + +// The renew message lock request containing the locktoken. +message RenewMessageLockRequest { + string locktoken = 1; +} + +// The get message request. +message GetSessionStateRequest { + string sessionId = 1; +} + +// The set message request. +message SetSessionStateRequest { + string sessionId = 1; + bytes sessionState = 2; +} + +// Get response containing the session state. +message GetSessionStateResponse { + bytes sessionState = 1; +} + +// Release session. +message ReleaseSessionRequest { + string sessionId = 1; +} + +// Renew session lock. +message RenewSessionLockRequest { + string sessionId = 1; +} + +// Renew session lock. +message RenewSessionLockResponse { + google.protobuf.Timestamp lockedUntil = 1; +} diff --git a/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActions.cs b/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActions.cs index a9daff0a5..ad8bd4e47 100644 --- a/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActions.cs +++ b/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActions.cs @@ -1,3 +1,6 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + using System; using System.Collections; using System.Collections.Generic; @@ -148,6 +151,24 @@ public virtual async Task DeferMessageAsync( await _settlement.DeferAsync(request, cancellationToken: cancellationToken); } + /// + public virtual async Task RenewMessageLockAsync( + ServiceBusReceivedMessage message, + CancellationToken cancellationToken = default) + { + if (message == null) + { + throw new ArgumentNullException(nameof(message)); + } + + var request = new RenewMessageLockRequest() + { + Locktoken = message.LockToken, + }; + + await _settlement.RenewMessageLockAsync(request, cancellationToken: cancellationToken); + } + internal static ByteString ConvertToByteString(IDictionary propertiesToModify) { var map = new AmqpMap(); diff --git a/extensions/Worker.Extensions.ServiceBus/src/ServiceBusSessionMessageActions.cs b/extensions/Worker.Extensions.ServiceBus/src/ServiceBusSessionMessageActions.cs new file mode 100644 index 000000000..6f435a536 --- /dev/null +++ b/extensions/Worker.Extensions.ServiceBus/src/ServiceBusSessionMessageActions.cs @@ -0,0 +1,98 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Threading; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus; +using Google.Protobuf; +using Microsoft.Azure.Functions.Worker.Converters; +using Microsoft.Azure.ServiceBus.Grpc; + +namespace Microsoft.Azure.Functions.Worker +{ + /// + /// Converter to bind to type parameters. + /// + [InputConverter(typeof(ServiceBusSessionMessageActionsConverter))] + public class ServiceBusSessionMessageActions + { + private readonly Settlement.SettlementClient _settlement; + private readonly string _sessionId; + + internal ServiceBusSessionMessageActions(Settlement.SettlementClient settlement, string sessionId, DateTimeOffset sessionLockedUntil) + { + _settlement = settlement ?? throw new ArgumentNullException(nameof(settlement)); + _sessionId = sessionId ?? throw new ArgumentNullException(nameof(sessionId)); + SessionLockedUntil = sessionLockedUntil; + } + + /// + /// Initializes a new instance of the class for mocking use in testing. + /// + /// + /// This constructor exists only to support mocking. When used, class state is not fully initialized, and + /// will not function correctly; virtual members are meant to be mocked. + /// + protected ServiceBusSessionMessageActions() + { + _settlement = null!; // not expected to be used during mocking. + _sessionId = null!; // not expected to be used during mocking. + } + + public virtual DateTimeOffset SessionLockedUntil { get; protected set; } + + /// + public virtual async Task GetSessionStateAsync( + CancellationToken cancellationToken = default) + { + var request = new GetSessionStateRequest() + { + SessionId = _sessionId, + }; + + GetSessionStateResponse result = await _settlement.GetSessionStateAsync(request, cancellationToken: cancellationToken); + BinaryData binaryData = new BinaryData(result.SessionState.Memory); + return binaryData; + } + + /// + public virtual async Task SetSessionStateAsync( + BinaryData sessionState, + CancellationToken cancellationToken = default) + { + var request = new SetSessionStateRequest() + { + SessionId = _sessionId, + SessionState = ByteString.CopyFrom(sessionState.ToMemory().Span), + }; + + await _settlement.SetSessionStateAsync(request, cancellationToken: cancellationToken); + } + + /// + public virtual async Task ReleaseSession( + CancellationToken cancellationToken = default) + { + var request = new ReleaseSessionRequest() + { + SessionId = _sessionId, + }; + + await _settlement.ReleaseSessionAsync(request, cancellationToken: cancellationToken); + } + + /// + public virtual async Task RenewSessionLockAsync( + CancellationToken cancellationToken = default) + { + var request = new RenewSessionLockRequest() + { + SessionId = _sessionId, + }; + + var result = await _settlement.RenewSessionLockAsync(request, cancellationToken: cancellationToken); + SessionLockedUntil = result.LockedUntil.ToDateTimeOffset(); + } + } +} diff --git a/extensions/Worker.Extensions.ServiceBus/src/ServiceBusSessionMessageActionsConverter.cs b/extensions/Worker.Extensions.ServiceBus/src/ServiceBusSessionMessageActionsConverter.cs new file mode 100644 index 000000000..1a37c3c47 --- /dev/null +++ b/extensions/Worker.Extensions.ServiceBus/src/ServiceBusSessionMessageActionsConverter.cs @@ -0,0 +1,62 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Threading.Tasks; +using Microsoft.Azure.Functions.Worker.Converters; +using Microsoft.Azure.Functions.Worker.Extensions.Abstractions; +using Microsoft.Azure.ServiceBus.Grpc; +using System.Text.Json; + +namespace Microsoft.Azure.Functions.Worker +{ + /// + /// Converter to bind to or type parameters. + /// + [SupportsDeferredBinding] + [SupportedTargetType(typeof(ServiceBusSessionMessageActions))] + [SupportedTargetType(typeof(ServiceBusSessionMessageActions[]))] + internal class ServiceBusSessionMessageActionsConverter : IInputConverter + { + private readonly Settlement.SettlementClient _settlement; + + public ServiceBusSessionMessageActionsConverter(Settlement.SettlementClient settlement) + { + _settlement = settlement; + } + + public ValueTask ConvertAsync(ConverterContext context) + { + try + { + var foundSessionId = context.FunctionContext.BindingContext.BindingData.TryGetValue("SessionId", out object? sessionId); + if (!foundSessionId) + { + throw new InvalidOperationException($"Expecting SessionId within binding data and value was not present. Sessions must be enabled when binding to {nameof(ServiceBusSessionMessageActions)}."); + } + + // Get the sessionLockedUntil property from the SessionActions binding data + var foundSessionActions = context.FunctionContext.BindingContext.BindingData.TryGetValue("SessionActions", out object? sessionActions); + if (!foundSessionActions) + { + throw new InvalidOperationException("Expecting SessionActions within binding data and value was not present."); + } + + JsonDocument jsonDocument = JsonDocument.Parse(sessionActions!.ToString()); + var foundSessionLockedUntil = jsonDocument.RootElement.TryGetProperty("SessionLockedUntil", out JsonElement sessionLockedUntil); + if (!foundSessionLockedUntil) + { + throw new InvalidOperationException("Expecting SessionLockedUntil within binding data of session actions and value was not present."); + } + + var sessionActionResult = new ServiceBusSessionMessageActions(_settlement, sessionId!.ToString(), sessionLockedUntil.GetDateTimeOffset()); + var result = ConversionResult.Success(sessionActionResult); + return new ValueTask(result); + } + catch (Exception exception) + { + return new ValueTask(ConversionResult.Failed(exception)); + } + } + } +} diff --git a/test/FunctionMetadataGeneratorTests/FunctionMetadataGeneratorTests.cs b/test/FunctionMetadataGeneratorTests/FunctionMetadataGeneratorTests.cs index 5bc7fcefc..86c6d19fb 100644 --- a/test/FunctionMetadataGeneratorTests/FunctionMetadataGeneratorTests.cs +++ b/test/FunctionMetadataGeneratorTests/FunctionMetadataGeneratorTests.cs @@ -1156,7 +1156,7 @@ public void ServiceBus_SDKTypeBindings() AssertDictionary(extensions, new Dictionary { - { "Microsoft.Azure.WebJobs.Extensions.ServiceBus", "5.15.1" }, + { "Microsoft.Azure.WebJobs.Extensions.ServiceBus", "5.16.0" }, }); var serviceBusTriggerFunction = functions.Single(p => p.Name == nameof(SDKTypeBindings_ServiceBus.ServiceBusTriggerFunction)); diff --git a/test/Worker.Extensions.Tests/ServiceBus/ServiceBusMessageActionsTests.cs b/test/Worker.Extensions.Tests/ServiceBus/ServiceBusMessageActionsTests.cs index 04096f40a..0edf5612e 100644 --- a/test/Worker.Extensions.Tests/ServiceBus/ServiceBusMessageActionsTests.cs +++ b/test/Worker.Extensions.Tests/ServiceBus/ServiceBusMessageActionsTests.cs @@ -75,6 +75,23 @@ public async Task CanDeferMessage() await messageActions.DeferMessageAsync(message, properties); } + [Fact] + public async Task CanRenewMessageLock() + { + var message = ServiceBusModelFactory.ServiceBusReceivedMessage(lockTokenGuid: Guid.NewGuid()); + var properties = new Dictionary() + { + { "int", 1 }, + { "string", "foo"}, + { "timespan", TimeSpan.FromSeconds(1) }, + { "datetime", DateTime.UtcNow }, + { "datetimeoffset", DateTimeOffset.UtcNow }, + { "guid", Guid.NewGuid() } + }; + var messageActions = new ServiceBusMessageActions(new MockSettlementClient(message.LockToken, properties)); + await messageActions.RenewMessageLockAsync(message); + } + [Fact] public async Task PassingNullMessageThrows() { @@ -83,6 +100,7 @@ public async Task PassingNullMessageThrows() await Assert.ThrowsAsync(async () => await messageActions.AbandonMessageAsync(null)); await Assert.ThrowsAsync(async () => await messageActions.DeadLetterMessageAsync(null)); await Assert.ThrowsAsync(async () => await messageActions.DeferMessageAsync(null)); + await Assert.ThrowsAsync(async () => await messageActions.RenewMessageLockAsync(null)); } private class MockSettlementClient : Settlement.SettlementClient @@ -128,6 +146,12 @@ public override AsyncUnaryCall DeferAsync(DeferRequest request, Metadata Assert.Equal(_propertiesToModify, request.PropertiesToModify); return new AsyncUnaryCall(Task.FromResult(new Empty()), Task.FromResult(new Metadata()), () => Status.DefaultSuccess, () => new Metadata(), () => { }); } + + public override AsyncUnaryCall RenewMessageLockAsync(RenewMessageLockRequest request, CallOptions options) + { + Assert.Equal(_lockToken, request.Locktoken); + return new AsyncUnaryCall(Task.FromResult(new Empty()), Task.FromResult(new Metadata()), () => Status.DefaultSuccess, () => new Metadata(), () => { }); + } } } -} \ No newline at end of file +} diff --git a/test/Worker.Extensions.Tests/ServiceBus/ServiceBusSessionMessageActions.cs b/test/Worker.Extensions.Tests/ServiceBus/ServiceBusSessionMessageActions.cs new file mode 100644 index 000000000..115f11ed0 --- /dev/null +++ b/test/Worker.Extensions.Tests/ServiceBus/ServiceBusSessionMessageActions.cs @@ -0,0 +1,90 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus; +using Google.Protobuf; +using Google.Protobuf.WellKnownTypes; +using Grpc.Core; +using Microsoft.Azure.ServiceBus.Grpc; +using Xunit; + +namespace Microsoft.Azure.Functions.Worker.Extensions.Tests +{ + public class ServiceBusSessionMessageActionsTests + { + [Fact] + public async Task CanGetSessionState() + { + var message = ServiceBusModelFactory.ServiceBusReceivedMessage(lockTokenGuid: Guid.NewGuid(), sessionId: "test"); + var messageActions = new ServiceBusSessionMessageActions(new MockSettlementClient(message.SessionId), message.SessionId, message.LockedUntil); + await messageActions.GetSessionStateAsync(); + } + + [Fact] + public async Task CanSetSessionState() + { + byte[] predefinedData = { 0x48, 0x65 }; + var message = ServiceBusModelFactory.ServiceBusReceivedMessage(lockTokenGuid: Guid.NewGuid(), sessionId: "test"); + var messageActions = new ServiceBusSessionMessageActions(new MockSettlementClient(message.SessionId, ByteString.CopyFrom(predefinedData)), message.SessionId, message.LockedUntil); + await messageActions.SetSessionStateAsync(BinaryData.FromBytes(predefinedData)); + } + + [Fact] + public async Task CanReleaseSession() + { + var message = ServiceBusModelFactory.ServiceBusReceivedMessage(lockTokenGuid: Guid.NewGuid(), sessionId: "test"); + var messageActions = new ServiceBusSessionMessageActions(new MockSettlementClient(message.SessionId), message.SessionId, message.LockedUntil); + await messageActions.ReleaseSession(); + } + + [Fact] + public async Task CanRenewSessionLock() + { + var message = ServiceBusModelFactory.ServiceBusReceivedMessage(lockTokenGuid: Guid.NewGuid(), sessionId: "test"); + var messageActions = new ServiceBusSessionMessageActions(new MockSettlementClient(message.SessionId), message.SessionId, message.LockedUntil); + await messageActions.RenewSessionLockAsync(); + } + + private class MockSettlementClient : Settlement.SettlementClient + { + private readonly string _sessionId; + private readonly ByteString _sessionState; + public MockSettlementClient(string sessionId, ByteString? sessionState = null) : base() + { + _sessionId = sessionId; + _sessionState = sessionState; + } + + public override AsyncUnaryCall GetSessionStateAsync(GetSessionStateRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default) + { + Assert.Equal(_sessionId, request.SessionId); + return new AsyncUnaryCall(Task.FromResult(new GetSessionStateResponse()), Task.FromResult(new Metadata()), () => Status.DefaultSuccess, () => new Metadata(), () => { }); + } + + public override AsyncUnaryCall SetSessionStateAsync(SetSessionStateRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default) + { + Assert.Equal(_sessionId, request.SessionId); + Assert.Equal(_sessionState, request.SessionState); + return new AsyncUnaryCall(Task.FromResult(new Empty()), Task.FromResult(new Metadata()), () => Status.DefaultSuccess, () => new Metadata(), () => { }); + } + + public override AsyncUnaryCall ReleaseSessionAsync(ReleaseSessionRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default) + { + Assert.Equal(_sessionId, request.SessionId); + return new AsyncUnaryCall(Task.FromResult(new Empty()), Task.FromResult(new Metadata()), () => Status.DefaultSuccess, () => new Metadata(), () => { }); + } + + public override AsyncUnaryCall RenewSessionLockAsync(RenewSessionLockRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default) + { + Assert.Equal(_sessionId, request.SessionId); + var response = new RenewSessionLockResponse(); + response.LockedUntil = Timestamp.FromDateTime(DateTime.UtcNow.AddSeconds(30)); + return new AsyncUnaryCall(Task.FromResult(response), Task.FromResult(new Metadata()), () => Status.DefaultSuccess, () => new Metadata(), () => { }); + } + } + } +}