Skip to content

Commit

Permalink
Implement CreateHttpManagementPayload API in Durable Worker Extension (
Browse files Browse the repository at this point in the history
…#2929)

* initial commit

* add comment

* add test

* update by comment

* add httpmanagementpayload class

* re-arrange if section to make code more readable

* remove unnecessary exception catch

* add nullable check at HttpManagementPayload

* add nullable check

* Add comment as suggested

* Update FunctionsDurableTaskClientTests.cs

* update a typo as I found this at my e2e test
  • Loading branch information
nytian authored Oct 23, 2024
1 parent 4cc6ec2 commit a7c6d69
Show file tree
Hide file tree
Showing 7 changed files with 233 additions and 22 deletions.
9 changes: 9 additions & 0 deletions src/WebJobs.Extensions.DurableTask/Bindings/BindingHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public string DurableOrchestrationClientToString(IDurableOrchestrationClient cli
ConnectionName = attr.ConnectionName,
RpcBaseUrl = localRpcAddress,
RequiredQueryStringParameters = this.config.HttpApiHandler.GetUniversalQueryStrings(),
HttpBaseUrl = this.config.HttpApiHandler.GetBaseUrl(),
});
}

Expand Down Expand Up @@ -130,6 +131,14 @@ private class OrchestrationClientInputData
/// </summary>
[JsonProperty("rpcBaseUrl")]
public string? RpcBaseUrl { get; set; }

/// <summary>
/// The base URL of the Azure Functions host, used in the out-of-proc model.
/// This URL is sent by the client binding object to the Durable Worker extension,
/// allowing the extension to know the host's base URL for constructing management URLs.
/// </summary>
[JsonProperty("httpBaseUrl")]
public string? HttpBaseUrl { get; set; }
}
}
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public ValueTask<ConversionResult> ConvertAsync(ConverterContext context)
}

DurableTaskClient client = this.clientProvider.GetClient(endpoint, inputData?.taskHubName, inputData?.connectionName);
client = new FunctionsDurableTaskClient(client, inputData!.requiredQueryStringParameters);
client = new FunctionsDurableTaskClient(client, inputData!.requiredQueryStringParameters, inputData!.httpBaseUrl);
return new ValueTask<ConversionResult>(ConversionResult.Success(client));
}
catch (Exception innerException)
Expand All @@ -62,5 +62,5 @@ public ValueTask<ConversionResult> ConvertAsync(ConverterContext context)
}

// Serializer is case-sensitive and incoming JSON properties are camel-cased.
private record DurableClientInputData(string rpcBaseUrl, string taskHubName, string connectionName, string requiredQueryStringParameters);
private record DurableClientInputData(string rpcBaseUrl, string taskHubName, string connectionName, string requiredQueryStringParameters, string httpBaseUrl);
}
79 changes: 65 additions & 14 deletions src/Worker.Extensions.DurableTask/DurableTaskClientExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,30 @@ public static HttpResponseData CreateCheckStatusResponse(
return response;
}

private static object SetHeadersAndGetPayload(
DurableTaskClient client, HttpRequestData request, HttpResponseData response, string instanceId)
/// <summary>
/// Creates an HTTP management payload for the specified orchestration instance.
/// </summary>
/// <param name="client">The <see cref="DurableTaskClient"/>.</param>
/// <param name="instanceId">The ID of the orchestration instance.</param>
/// <param name="request">Optional HTTP request data to use for creating the base URL.</param>
/// <returns>An object containing instance control URLs.</returns>
/// <exception cref="ArgumentException">Thrown when instanceId is null or empty.</exception>
/// <exception cref="InvalidOperationException">Thrown when a valid base URL cannot be determined.</exception>
public static HttpManagementPayload CreateHttpManagementPayload(
this DurableTaskClient client,
string instanceId,
HttpRequestData? request = null)
{
if (string.IsNullOrEmpty(instanceId))
{
throw new ArgumentException("InstanceId cannot be null or empty.", nameof(instanceId));
}

return SetHeadersAndGetPayload(client, request, null, instanceId);
}

private static HttpManagementPayload SetHeadersAndGetPayload(
DurableTaskClient client, HttpRequestData? request, HttpResponseData? response, string instanceId)
{
static string BuildUrl(string url, params string?[] queryValues)
{
Expand All @@ -143,22 +165,46 @@ static string BuildUrl(string url, params string?[] queryValues)
// request headers into consideration and generate the base URL accordingly.
// More info: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Forwarded.
// One potential workaround is to set ASPNETCORE_FORWARDEDHEADERS_ENABLED to true.
string baseUrl = request.Url.GetLeftPart(UriPartial.Authority);

// If HttpRequestData is provided, use its URL; otherwise, get the baseUrl from the DurableTaskClient.
// The base URL could be null if:
// 1. The DurableTaskClient isn't a FunctionsDurableTaskClient (which would have the baseUrl from bindings)
// 2. There's no valid HttpRequestData provided
string? baseUrl = ((request != null) ? request.Url.GetLeftPart(UriPartial.Authority) : GetBaseUrl(client));

if (baseUrl == null)
{
throw new InvalidOperationException("Failed to create HTTP management payload as base URL is null. Either use Functions bindings or provide an HTTP request to create the HttpPayload.");
}

bool isFromRequest = request != null;

string formattedInstanceId = Uri.EscapeDataString(instanceId);
string instanceUrl = $"{baseUrl}/runtime/webhooks/durabletask/instances/{formattedInstanceId}";

// The baseUrl differs depending on the source. Eg:
// - From request: http://localhost:7071/
// - From durable client: http://localhost:7071/runtime/webhooks/durabletask
// We adjust the instanceUrl construction accordingly.
string instanceUrl = isFromRequest
? $"{baseUrl}/runtime/webhooks/durabletask/instances/{formattedInstanceId}"
: $"{baseUrl}/instances/{formattedInstanceId}";
string? commonQueryParameters = GetQueryParams(client);
response.Headers.Add("Location", BuildUrl(instanceUrl, commonQueryParameters));
response.Headers.Add("Content-Type", "application/json");

if (response != null)
{
response.Headers.Add("Location", BuildUrl(instanceUrl, commonQueryParameters));
response.Headers.Add("Content-Type", "application/json");
}

return new
return new HttpManagementPayload
{
id = instanceId,
purgeHistoryDeleteUri = BuildUrl(instanceUrl, commonQueryParameters),
sendEventPostUri = BuildUrl($"{instanceUrl}/raiseEvent/{{eventName}}", commonQueryParameters),
statusQueryGetUri = BuildUrl(instanceUrl, commonQueryParameters),
terminatePostUri = BuildUrl($"{instanceUrl}/terminate", "reason={{text}}", commonQueryParameters),
suspendPostUri = BuildUrl($"{instanceUrl}/suspend", "reason={{text}}", commonQueryParameters),
resumePostUri = BuildUrl($"{instanceUrl}/resume", "reason={{text}}", commonQueryParameters)
Id = instanceId,
PurgeHistoryDeleteUri = BuildUrl(instanceUrl, commonQueryParameters),
SendEventPostUri = BuildUrl($"{instanceUrl}/raiseEvent/{{eventName}}", commonQueryParameters),
StatusQueryGetUri = BuildUrl(instanceUrl, commonQueryParameters),
TerminatePostUri = BuildUrl($"{instanceUrl}/terminate", "reason={{text}}", commonQueryParameters),
SuspendPostUri = BuildUrl($"{instanceUrl}/suspend", "reason={{text}}", commonQueryParameters),
ResumePostUri = BuildUrl($"{instanceUrl}/resume", "reason={{text}}", commonQueryParameters)
};
}

Expand All @@ -172,4 +218,9 @@ private static ObjectSerializer GetObjectSerializer(HttpResponseData response)
{
return client is FunctionsDurableTaskClient functions ? functions.QueryString : null;
}

private static string? GetBaseUrl(DurableTaskClient client)
{
return client is FunctionsDurableTaskClient functions ? functions.HttpBaseUrl : null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@ internal sealed class FunctionsDurableTaskClient : DurableTaskClient
{
private readonly DurableTaskClient inner;

public FunctionsDurableTaskClient(DurableTaskClient inner, string? queryString)
public FunctionsDurableTaskClient(DurableTaskClient inner, string? queryString, string? httpBaseUrl)
: base(inner.Name)
{
this.inner = inner;
this.QueryString = queryString;
this.HttpBaseUrl = httpBaseUrl;
}

public string? QueryString { get; }

public string? HttpBaseUrl { get; }
public override DurableEntityClient Entities => this.inner.Entities;

public override ValueTask DisposeAsync()
Expand Down
97 changes: 97 additions & 0 deletions src/Worker.Extensions.DurableTask/HttpManagementPayload.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
// This is a copy of: https://github.com/Azure/azure-functions-durable-extension/blob/dev/src/WebJobs.Extensions.DurableTask/HttpManagementPayload.cs

using System;
using System.Collections.Generic;
using System.Text;
using Newtonsoft.Json;

namespace Microsoft.Azure.Functions.Worker;

/// <summary>
/// Data structure containing status, terminate and send external event HTTP endpoints.
/// </summary>
public class HttpManagementPayload
{
/// <summary>
/// Gets the ID of the orchestration instance.
/// </summary>
/// <value>
/// The ID of the orchestration instance.
/// </value>
[JsonProperty("id")]
public string? Id { get; internal set; }

/// <summary>
/// Gets the HTTP GET status query endpoint URL.
/// </summary>
/// <value>
/// The HTTP URL for fetching the instance status.
/// </value>
[JsonProperty("statusQueryGetUri")]
public string? StatusQueryGetUri { get; internal set; }

/// <summary>
/// Gets the HTTP POST external event sending endpoint URL.
/// </summary>
/// <value>
/// The HTTP URL for posting external event notifications.
/// </value>
[JsonProperty("sendEventPostUri")]
public string? SendEventPostUri { get; internal set; }

/// <summary>
/// Gets the HTTP POST instance termination endpoint.
/// </summary>
/// <value>
/// The HTTP URL for posting instance termination commands.
/// </value>
[JsonProperty("terminatePostUri")]
public string? TerminatePostUri { get; internal set; }

/// <summary>
/// Gets the HTTP POST instance rewind endpoint.
/// </summary>
/// <value>
/// The HTTP URL for rewinding orchestration instances.
/// </value>
[JsonProperty("rewindPostUri")]
public string? RewindPostUri { get; internal set; }

/// <summary>
/// Gets the HTTP DELETE purge instance history by instance ID endpoint.
/// </summary>
/// <value>
/// The HTTP URL for purging instance history by instance ID.
/// </value>
[JsonProperty("purgeHistoryDeleteUri")]
public string? PurgeHistoryDeleteUri { get; internal set; }

/// <summary>
/// Gets the HTTP POST instance restart endpoint.
/// </summary>
/// <value>
/// The HTTP URL for restarting an orchestration instance.
/// </value>
[JsonProperty("restartPostUri")]
public string? RestartPostUri { get; internal set; }

/// <summary>
/// Gets the HTTP POST instance suspend endpoint.
/// </summary>
/// <value>
/// The HTTP URL for suspending an orchestration instance.
/// </value>
[JsonProperty("suspendPostUri")]
public string? SuspendPostUri { get; internal set; }

/// <summary>
/// Gets the HTTP POST instance resume endpoint.
/// </summary>
/// <value>
/// The HTTP URL for resuming an orchestration instance.
/// </value>
[JsonProperty("resumePostUri")]
public string? ResumePostUri { get; internal set; }
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.DurableTask.Client;
using Microsoft.DurableTask.Client.Grpc;
using Moq;

namespace Microsoft.Azure.Functions.Worker.Tests
Expand All @@ -9,7 +9,7 @@ namespace Microsoft.Azure.Functions.Worker.Tests
/// </summary>
public class FunctionsDurableTaskClientTests
{
private FunctionsDurableTaskClient GetTestFunctionsDurableTaskClient()
private FunctionsDurableTaskClient GetTestFunctionsDurableTaskClient(string? baseUrl = null)
{
// construct mock client

Expand All @@ -22,7 +22,7 @@ private FunctionsDurableTaskClient GetTestFunctionsDurableTaskClient()
It.IsAny<string>(), It.IsAny<TerminateInstanceOptions>(), It.IsAny<CancellationToken>())).Returns(completedTask);

DurableTaskClient durableClient = durableClientMock.Object;
FunctionsDurableTaskClient client = new FunctionsDurableTaskClient(durableClient, queryString: null);
FunctionsDurableTaskClient client = new FunctionsDurableTaskClient(durableClient, queryString: null, httpBaseUrl: baseUrl);
return client;
}

Expand Down Expand Up @@ -53,5 +53,51 @@ public async void TerminateDoesNotThrow()
await client.TerminateInstanceAsync(instanceId, options);
await client.TerminateInstanceAsync(instanceId, options, token);
}

/// <summary>
/// Test that the `CreateHttpManagementPayload` method returns the expected payload structure without HttpRequestData.
/// </summary>
[Fact]
public void CreateHttpManagementPayload_WithBaseUrl()
{
const string BaseUrl = "http://localhost:7071/runtime/webhooks/durabletask";
FunctionsDurableTaskClient client = this.GetTestFunctionsDurableTaskClient(BaseUrl);
string instanceId = "testInstanceIdWithHostBaseUrl";

HttpManagementPayload payload = client.CreateHttpManagementPayload(instanceId);

AssertHttpManagementPayload(payload, BaseUrl, instanceId);
}

/// <summary>
/// Test that the `CreateHttpManagementPayload` method returns the expected payload structure with HttpRequestData.
/// </summary>
[Fact]
public void CreateHttpManagementPayload_WithHttpRequestData()
{
const string requestUrl = "http://localhost:7075/orchestrators/E1_HelloSequence";
FunctionsDurableTaskClient client = this.GetTestFunctionsDurableTaskClient();
string instanceId = "testInstanceIdWithRequest";

// Create mock HttpRequestData object.
var mockFunctionContext = new Mock<FunctionContext>();
var mockHttpRequestData = new Mock<HttpRequestData>(mockFunctionContext.Object);
mockHttpRequestData.SetupGet(r => r.Url).Returns(new Uri(requestUrl));

HttpManagementPayload payload = client.CreateHttpManagementPayload(instanceId, mockHttpRequestData.Object);

AssertHttpManagementPayload(payload, "http://localhost:7075/runtime/webhooks/durabletask", instanceId);
}

private static void AssertHttpManagementPayload(HttpManagementPayload payload, string BaseUrl, string instanceId)
{
Assert.Equal(instanceId, payload.Id);
Assert.Equal($"{BaseUrl}/instances/{instanceId}", payload.PurgeHistoryDeleteUri);
Assert.Equal($"{BaseUrl}/instances/{instanceId}/raiseEvent/{{eventName}}", payload.SendEventPostUri);
Assert.Equal($"{BaseUrl}/instances/{instanceId}", payload.StatusQueryGetUri);
Assert.Equal($"{BaseUrl}/instances/{instanceId}/terminate?reason={{{{text}}}}", payload.TerminatePostUri);
Assert.Equal($"{BaseUrl}/instances/{instanceId}/suspend?reason={{{{text}}}}", payload.SuspendPostUri);
Assert.Equal($"{BaseUrl}/instances/{instanceId}/resume?reason={{{{text}}}}", payload.ResumePostUri);
}
}
}
}

0 comments on commit a7c6d69

Please sign in to comment.