Wednesday, September 13, 2023

.NET In-Process Azure Durable Functions: Preserve Stack Order When Passing Between Orchestrators, Activities, etc.

In this post, let's see how we can preserve the order of a Stack<T> when it's passed between Orchestrators/Activities in a .NET In-Process Azure Durable Functions app.

As you already know, when using Orchestrators/Activities in an Azure Durable Function, different types of data are serialized and persisted. Durable Functions for .NET In-Process internally uses Json.NET to serialize orchestration and entity data to JSON.

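And when you pass a Stack<T> to an Orchestrator or an Activity, for example, the order of items is not preserved across serialization. It's actually an issue with Json.NET: a Stack<T> is written top item first, and on deserialization the items are pushed back in that same order, so the stack comes out reversed.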
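
The reversal can also be seen without Durable Functions at all. The following is a minimal sketch using Json.NET with its default settings; the class and method names (StackRoundTripDemo, RoundTrip, Run) are made up for illustration and are not part of the sample above.

```csharp
using System;
using System.Collections.Generic;
using Newtonsoft.Json;

// Hypothetical helper class, used only to illustrate the round trip.
public static class StackRoundTripDemo
{
    // Serializes a stack with default Json.NET settings and reads it back.
    public static Stack<string> RoundTrip(Stack<string> stack)
    {
        string json = JsonConvert.SerializeObject(stack);
        return JsonConvert.DeserializeObject<Stack<string>>(json);
    }

    public static void Run()
    {
        var stack = new Stack<string>();
        stack.Push("T0");
        stack.Push("T1");
        stack.Push("T2");

        // A stack enumerates from the top, so the JSON array is top-first.
        Console.WriteLine(JsonConvert.SerializeObject(stack)); // ["T2","T1","T0"]

        // Reading pushes the items in array order, so the old bottom ends up on top.
        Console.WriteLine(stack.Peek());            // T2
        Console.WriteLine(RoundTrip(stack).Peek()); // T0
    }
}
```

Calling StackRoundTripDemo.Run() from a console app should show that a single serialize/deserialize pass is enough to flip the stack.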

We can reproduce the issue by creating a simple .NET In-Process Azure Durable Function, something like below.

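using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Threading.Tasks;

public static class Function1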
{
    [FunctionName("HttpStart")]
    public static async Task<HttpResponseMessage> HttpStart(
        [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestMessage req,
        [DurableClient] IDurableOrchestrationClient starter,
        ILogger log)
    {
        string instanceId = await starter.StartNewAsync("Function1");

        log.LogInformation("Started orchestration with ID = '{instanceId}'.", instanceId);

        return await starter.WaitForCompletionOrCreateCheckStatusResponseAsync(req, instanceId, TimeSpan.FromSeconds(30));
    }

    [FunctionName("Function1")]
    public static async Task<Stack<string>> RunOrchestrator1([OrchestrationTrigger] IDurableOrchestrationContext context)
    {
        Stack<string> transitions = new();

        transitions.Push("T0");
        transitions.Push("T1");
        transitions.Push("T2");

        return await context.CallSubOrchestratorAsync<Stack<string>>("Function2", transitions);
    }

    [FunctionName("Function2")]
    public static async Task<Stack<string>> RunOrchestrator2([OrchestrationTrigger] IDurableOrchestrationContext context)
    {
        Stack<string> transitions = context.GetInput<Stack<string>>();

        transitions.Push("T3");
        transitions.Push("T4");
        transitions.Push("T5");

        return await Task.FromResult(transitions);
    }
}

The expected output should be as follows.

[
    "T5",
    "T4",
    "T3",
    "T2",
    "T1",
    "T0"
]

But the actual output comes back in a different order.

[Image: Incorrect Result]

The order gets scrambled even further when the logic is more complex, for example with recursive function calls.
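
To see why the result is scrambled rather than simply reversed, the hand-offs can be simulated with plain Json.NET. This is only a sketch: it assumes each hand-off (input to the sub-orchestrator, result back to the parent) is exactly one serialize/deserialize round trip with default settings, and the names HopSimulation, Hop and Simulate are hypothetical. The real runtime may serialize a value a different number of times, so the exact order it produces can differ from this simulation.

```csharp
using System;
using System.Collections.Generic;
using Newtonsoft.Json;

// Hypothetical simulation of the two orchestrators, without the Durable Functions runtime.
public static class HopSimulation
{
    // One hand-off: serialize and deserialize with default Json.NET settings.
    private static Stack<string> Hop(Stack<string> stack) =>
        JsonConvert.DeserializeObject<Stack<string>>(JsonConvert.SerializeObject(stack));

    public static string Simulate()
    {
        var transitions = new Stack<string>();
        transitions.Push("T0");
        transitions.Push("T1");
        transitions.Push("T2");

        // Hop 1: parent orchestrator -> sub-orchestrator input (stack is reversed).
        Stack<string> inSub = Hop(transitions);
        inSub.Push("T3");
        inSub.Push("T4");
        inSub.Push("T5");

        // Hop 2: sub-orchestrator result -> parent orchestrator (reversed again).
        Stack<string> backInParent = Hop(inSub);

        // Final serialization of the parent's return value.
        return JsonConvert.SerializeObject(backInParent);
    }
}
```

Under these assumptions, HopSimulation.Simulate() returns ["T2","T1","T0","T3","T4","T5"]: every hop flips whatever is on the stack at that moment, and items pushed between hops get flipped a different number of times than the ones pushed earlier.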

This can be addressed by applying custom serialization to the Durable Function.

First, we need to create a custom JsonConverter to serialize/deserialize Stack<T> correctly preserving the order.

/// <summary>
/// Converter for any Stack<T> that prevents Json.NET from reversing its order when deserializing.
/// https://github.com/JamesNK/Newtonsoft.Json/issues/971
/// https://stackoverflow.com/a/39481981/4865541
/// </summary>
public class StackConverter : JsonConverter
{
    public override bool CanConvert(Type objectType)
    {
        return StackParameterType(objectType) != null;
    }

    public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
    {
        if (reader.TokenType == JsonToken.Null)
        {
            return null;
        }

        try
        {
            Type parameterType = StackParameterType(objectType);
            MethodInfo method = GetType().GetMethod(nameof(ReadJsonGeneric), BindingFlags.NonPublic | BindingFlags.Static);

            MethodInfo genericMethod = method.MakeGenericMethod(new[] { parameterType });
            return genericMethod.Invoke(this, new object[] { reader, objectType, existingValue, serializer });
        }
        catch (TargetInvocationException ex)
        {
            // Wrap the TargetInvocationException in a JsonSerializationException
            throw new JsonSerializationException("Failed to deserialize " + objectType, ex);
        }
    }

    public override bool CanWrite { get { return false; } }

    public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
    {
        throw new NotImplementedException();
    }

    private static Type StackParameterType(Type objectType)
    {
        while (objectType != null)
        {
            if (objectType.IsGenericType)
            {
                Type genericType = objectType.GetGenericTypeDefinition();
                if (genericType == typeof(Stack<>))
                {
                    return objectType.GetGenericArguments()[0];
                }
            }
            objectType = objectType.BaseType;
        }

        return null;
    }

    private static object ReadJsonGeneric<T>(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
    {
        if (reader.TokenType == JsonToken.Null)
        {
            return null;
        }

        List<T> list = serializer.Deserialize<List<T>>(reader);
        Stack<T> stack = existingValue as Stack<T> ?? (Stack<T>)serializer.ContractResolver.ResolveContract(objectType).DefaultCreator();
        for (int i = list.Count - 1; i >= 0; i--)
        {
            stack.Push(list[i]);
        }

        return stack;
    }
}
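
The core idea of the converter can be checked in isolation: leave writing to Json.NET, and on reading push the array items from last to first. The sketch below uses a cut-down, hypothetical StringStackConverter that only handles Stack<string> and is built on the generic JsonConverter<T> base class (available in Json.NET 11 and later). It is a stand-in for illustration, not a replacement for the StackConverter above, and ConverterRoundTripDemo is likewise a made-up name.

```csharp
using System;
using System.Collections.Generic;
using Newtonsoft.Json;

// Hypothetical, cut-down stand-in for StackConverter: handles Stack<string> only.
public sealed class StringStackConverter : JsonConverter<Stack<string>>
{
    // Writing is left to Json.NET's default logic (top item first).
    public override bool CanWrite => false;

    public override void WriteJson(JsonWriter writer, Stack<string> value, JsonSerializer serializer)
        => throw new NotSupportedException();

    public override Stack<string> ReadJson(JsonReader reader, Type objectType,
        Stack<string> existingValue, bool hasExistingValue, JsonSerializer serializer)
    {
        if (reader.TokenType == JsonToken.Null)
        {
            return null;
        }

        // The JSON array is top-first, so push from the last element to the first.
        List<string> items = serializer.Deserialize<List<string>>(reader);
        var stack = new Stack<string>();
        for (int i = items.Count - 1; i >= 0; i--)
        {
            stack.Push(items[i]);
        }

        return stack;
    }
}

public static class ConverterRoundTripDemo
{
    // Serializes and deserializes a stack with the converter registered.
    public static Stack<string> RoundTrip(Stack<string> stack)
    {
        var settings = new JsonSerializerSettings();
        settings.Converters.Add(new StringStackConverter());

        string json = JsonConvert.SerializeObject(stack, settings);
        return JsonConvert.DeserializeObject<Stack<string>>(json, settings);
    }
}
```

With the converter registered, a stack built by pushing T0, T1, T2 should come back from RoundTrip with T2 still on top.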

Then we can override the default Json.NET serialization settings of the Durable Function with a custom implementation of IMessageSerializerSettingsFactory, as follows.

using Microsoft.Azure.Functions.Extensions.DependencyInjection;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.DependencyInjection;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Reflection;

[assembly: FunctionsStartup(typeof(DurableFunctions.StackSerialization.Startup))]
namespace DurableFunctions.StackSerialization;

public class Startup : FunctionsStartup
{
    public override void Configure(IFunctionsHostBuilder builder)
    {
        builder.Services.AddSingleton<IMessageSerializerSettingsFactory, CustomMessageSerializerSettingsFactory>(); // ...
    }

    /// <summary>
    /// A factory that provides the serialization for all inputs and outputs for activities and
    /// orchestrations, as well as entity state.
    /// </summary>
    internal class CustomMessageSerializerSettingsFactory : IMessageSerializerSettingsFactory
    {
        public JsonSerializerSettings CreateJsonSerializerSettings()
        {
            return new JsonSerializerSettings
            {
                TypeNameHandling = TypeNameHandling.None,
                DateParseHandling = DateParseHandling.None,
                Converters = new List<JsonConverter>
                {
                    new StackConverter()
                }
            };
        }
    }
}

And now if we invoke the function, we can see the correct output.

[Image: Correct Result]

Hope this helps.

Happy Coding.

Regards,
Jaliya
