Tuesday, April 16, 2024

.NET Isolated Azure Durable Functions: Wait for Any Event and Wait for All the Events

In this post, let's see how an orchestration can wait for any one of several external events, or for all of them, in .NET Isolated Azure Durable Functions.

Let's consider the following code.

using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.DurableTask;
using Microsoft.DurableTask.Client;
using Microsoft.Extensions.Logging;
using System.Text.Json.Nodes;

namespace FunctionApp1;

public static class Function1
{
    [Function(nameof(HttpStart))]
    public static async Task<HttpResponseData> HttpStart(
        [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestData req,
        [DurableClient] DurableTaskClient client,
        FunctionContext executionContext)
    {
        ILogger logger = executionContext.GetLogger(nameof(HttpStart));

        string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(nameof(Orchestrator));

        return client.CreateCheckStatusResponse(req, instanceId);
    }

    [Function(nameof(Orchestrator))]
    public static async Task<Dictionary<string, JsonObject>> Orchestrator(
        [OrchestrationTrigger] TaskOrchestrationContext context)
    {
        ILogger logger = context.CreateReplaySafeLogger(nameof(Orchestrator));

        List<string> events = [
            "Event1",
            "Event2",
            "Event3"
        ];

        Dictionary<string, Task<JsonObject>> tasks = [];

        foreach (string eventName in events)
        {
            logger.LogInformation($"http://localhost:7137/runtime/webhooks/durabletask/instances/{context.InstanceId}/raiseEvent/{eventName}");

            Task<JsonObject> task = context.WaitForExternalEvent<JsonObject>(eventName);
            tasks.Add(eventName, task);
        }

        // TODO: Handle WhenAny

        // TODO: Handle WhenAll
    }
}

Here I have a set of WaitForExternalEvent tasks, one for each event that we are going to listen for. In the first scenario, we resume execution when any one of the events occurs. In the second scenario, we resume execution only after all of the events have occurred.
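The orchestrator above logs a raiseEvent URL for each event, and sending a POST with a JSON body to that URL is what raises the event. Below is a minimal sketch of building such a request. The names RaiseEventRequest and Build are hypothetical helpers for illustration, and the base address is the local one from the log line above (a deployed Function App would normally also need an authorization key on this URL).

```csharp
using System;
using System.Net.Http;
using System.Text;
using System.Text.Json.Nodes;

public static class RaiseEventRequest
{
    // Hypothetical helper: builds the POST request for the raiseEvent URL
    // that the orchestrator logs for each event.
    public static HttpRequestMessage Build(
        Uri baseAddress, string instanceId, string eventName, JsonObject payload)
    {
        // Same path shape as the logged URL; the segments are escaped in case
        // an instance ID or event name contains reserved characters.
        string path = "runtime/webhooks/durabletask/instances/"
            + Uri.EscapeDataString(instanceId)
            + "/raiseEvent/"
            + Uri.EscapeDataString(eventName);

        return new HttpRequestMessage(HttpMethod.Post, new Uri(baseAddress, path))
        {
            // The event payload travels as the JSON request body.
            Content = new StringContent(payload.ToJsonString(), Encoding.UTF8, "application/json")
        };
    }
}
```

The resulting request can be sent with HttpClient.SendAsync, passing new Uri("http://localhost:7137/") as the base address and the instance ID returned by HttpStart.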

WhenAny

We can use the Task Parallel Library's (TPL) Task.WhenAny for this scenario.

// Handling WhenAny
Task<JsonObject> result = await Task.WhenAny(tasks.Values);

KeyValuePair<string, Task<JsonObject>> winner = tasks.Single(x => x.Value == result);
return new Dictionary<string, JsonObject>
{
    { winner.Key, await result }
};

For example, if we raise "Event2", the orchestration resumes at that point and returns the payload of "Event2".
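To see this behaviour without running a Functions host, here is a small sketch in plain TPL. It assumes TaskCompletionSource as a stand-in for WaitForExternalEvent; WhenAnySketch, FirstEventAsync and DemoAsync are hypothetical names used only for this illustration.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Text.Json.Nodes;
using System.Threading.Tasks;

public static class WhenAnySketch
{
    // Assumption: each TaskCompletionSource stands in for one
    // WaitForExternalEvent task from the orchestrator.
    public static async Task<KeyValuePair<string, JsonObject>> FirstEventAsync(
        IReadOnlyDictionary<string, TaskCompletionSource<JsonObject>> sources)
    {
        Dictionary<string, Task<JsonObject>> tasks =
            sources.ToDictionary(x => x.Key, x => x.Value.Task);

        // Completes as soon as any one of the tasks completes.
        Task<JsonObject> first = await Task.WhenAny(tasks.Values);

        // Look up which event name the completed task belongs to.
        string name = tasks.Single(x => x.Value == first).Key;
        return new KeyValuePair<string, JsonObject>(name, await first);
    }

    // Usage: raise only "Event2" and return the name of the winning event.
    public static async Task<string> DemoAsync()
    {
        var sources = new Dictionary<string, TaskCompletionSource<JsonObject>>
        {
            ["Event1"] = new(),
            ["Event2"] = new(),
            ["Event3"] = new()
        };

        Task<KeyValuePair<string, JsonObject>> pending = FirstEventAsync(sources);
        sources["Event2"].SetResult(new JsonObject { ["message"] = "hello" });

        return (await pending).Key;
    }
}
```

Only "Event2" is ever completed in DemoAsync, so it is the task that Task.WhenAny hands back, and the other two stay pending.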

WhenAll

We can use the Task Parallel Library's (TPL) Task.WhenAll for this scenario.

// Handling WhenAll
JsonObject[] results = await Task.WhenAll(tasks.Values);

Dictionary<string, JsonObject> resultsMap = tasks
    .Zip(results, (x, y) => new { x.Key, y })
    .ToDictionary(x => x.Key, x => x.y);

return resultsMap;

In this case, the execution won't resume until all the events are triggered.
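The same kind of plain TPL sketch can illustrate the WhenAll behaviour. Again, TaskCompletionSource is an assumed stand-in for WaitForExternalEvent, and WhenAllSketch, AllEventsAsync and DemoAsync are hypothetical names. Note that this sketch reads each result from its own task after Task.WhenAll completes, as an alternative to zipping the result array with the dictionary.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Text.Json.Nodes;
using System.Threading.Tasks;

public static class WhenAllSketch
{
    // Assumption: each TaskCompletionSource stands in for one
    // WaitForExternalEvent task from the orchestrator.
    public static async Task<Dictionary<string, JsonObject>> AllEventsAsync(
        IReadOnlyDictionary<string, TaskCompletionSource<JsonObject>> sources)
    {
        Dictionary<string, Task<JsonObject>> tasks =
            sources.ToDictionary(x => x.Key, x => x.Value.Task);

        // Completes only after every task has completed.
        await Task.WhenAll(tasks.Values);

        // Every task is already complete here, so these awaits return immediately.
        var results = new Dictionary<string, JsonObject>();
        foreach (var (name, task) in tasks)
        {
            results[name] = await task;
        }

        return results;
    }

    // Usage: raise two events, check that we are still waiting, then raise the third.
    public static async Task<(bool CompletedEarly, int Count)> DemoAsync()
    {
        var sources = new Dictionary<string, TaskCompletionSource<JsonObject>>
        {
            ["Event1"] = new(),
            ["Event2"] = new(),
            ["Event3"] = new()
        };

        Task<Dictionary<string, JsonObject>> pending = AllEventsAsync(sources);
        sources["Event1"].SetResult(new JsonObject());
        sources["Event2"].SetResult(new JsonObject());
        bool completedEarly = pending.IsCompleted; // "Event3" is still outstanding

        sources["Event3"].SetResult(new JsonObject());
        Dictionary<string, JsonObject> results = await pending;

        return (completedEarly, results.Count);
    }
}
```

With two of the three events raised, the combined task is still pending, and it only completes once "Event3" arrives.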
Hope this helps.

Happy Coding.

Regards,
Jaliya
