In this post, let's see how we can use Azure Durable Functions with its external event listening feature. This post assumes you have a basic understanding of Durable Functions.
Let's consider the following scenario. Say I want to generate a document in an external system. The process is as follows:
- First, call an external endpoint to create a Folder to generate our document in. We either have to keep polling for the status, or the external system notifies us when it's done, so that we know the path of the created folder.
- Then call an external endpoint to create the Document, passing in the path of the Folder created above, so the document is generated inside it. As in the previous step, we either have to keep polling for the status, or the external system notifies us when it's done, so that we can get a handle to the generated document.
Now let's see how we can implement this using Azure Durable Functions and External Events.
First, I have the StarterFunction below, which is basically the entry point. It's an HttpTrigger function, so I can just do a GET on its endpoint to trigger it. The name of the document I want to create is hard-coded.
public static class StarterFunction
{
    [FunctionName("HttpStart")]
    public static async Task<HttpResponseMessage> Run(
        [HttpTrigger(AuthorizationLevel.Anonymous, "get")] HttpRequestMessage req,
        [DurableClient] IDurableOrchestrationClient starter,
        ILogger log)
    {
        var documentName = "Terms and Conditions";

        // Starting the Orchestrator, documentName is the input
        string instanceId = await starter.StartNewAsync("WorkflowOrchestrator", input: documentName);

        log.LogInformation($"Started orchestration with Instance ID = '{instanceId}'.");

        // Returns a status response, so the client can,
        //   Poll for the status of the instance
        //   Send events to the instance
        //   Terminate instance if not required
        //   Purge history of orchestration instance
        return starter.CreateCheckStatusResponse(req, instanceId);
    }
}
Here I am kicking off the orchestrator function named WorkflowOrchestrator, passing in the name of the document. I then return a status response that contains the URIs the client can use to manage the instance (the options are listed in the comments).
Then I have the WorkflowOrchestrator function.
public static class WorkflowOrchestratorFunction
{
    [FunctionName("WorkflowOrchestrator")]
    public static async Task<DocumentCreateRequest> Run(
        [OrchestrationTrigger] IDurableOrchestrationContext context)
    {
        var request = new DocumentCreateRequest
        {
            InstanceId = context.InstanceId,
            DocumentName = context.GetInput<string>()
        };

        await context.CallActivityAsync("CreateFolder", request);
        request = await context.WaitForExternalEvent<DocumentCreateRequest>("FolderCreated");

        await context.CallActivityAsync("CreateDocument", request);
        request = await context.WaitForExternalEvent<DocumentCreateRequest>("DocumentCreated");

        return request;
    }
}
Here I first get the InstanceId and the passed-in DocumentName. Then I call an activity function named CreateFolder, which calls the external endpoint to create the folder, and wait for the event named FolderCreated to be raised. At this point the orchestrator function goes to sleep until the event arrives.
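The DocumentCreateRequest type is used throughout but not listed in this post. A minimal sketch of what it could look like is below. The property names are the ones the functions in this post read and write; the string types and the plain class with public setters are assumptions made so the object can be serialized as activity input, queue message, and event payload.

```csharp
// Hypothetical shape of the request object passed between the functions.
// Property names come from their usage in this post; the types are assumed.
public class DocumentCreateRequest
{
    // Orchestration instance that should receive the external event
    public string InstanceId { get; set; }

    // Name of the document to create (the orchestrator input)
    public string DocumentName { get; set; }

    // Set once the external system has created the folder
    public string FolderPath { get; set; }

    // Set once the external system has created the document
    public string DocumentPath { get; set; }

    // Name of the event to raise on the orchestration instance
    public string EventName { get; set; }

    // Free-text message, used later in the post to report a timeout
    public string Message { get; set; }
}
```

Carrying InstanceId and EventName inside the message is what lets a single queue-triggered function route every notification back to the right orchestration instance.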
And here is the CreateFolder function.
public static class CreateFolderFunction
{
    [FunctionName("CreateFolder")]
    public static async Task Run(
        [ActivityTrigger] DocumentCreateRequest request,
        [ServiceBus("%ServiceBusQueueName%", Connection = "ServiceBusConnection", EntityType = EntityType.Queue)] IAsyncCollector<DocumentCreateRequest> requestCollector)
    {
        // Call external API to Create Folder
        // And then return immediately

        // Demo External Event
        // Pushes a message into a queue with the created folder path and expected event
        await Task.Delay(2000);

        request.FolderPath = "/SomeFolder";
        request.EventName = "FolderCreated";

        await requestCollector.AddAsync(request);
    }
}
Ideally, we would call the external endpoint here and return. But to demo the external event being raised, I just added a Task.Delay() and then push a message to a queue with the created FolderPath and the name of the event to raise.
And then I have the EventTriggered function below, which gets triggered when a message arrives in the queue. We can think of this message as having been pushed by the external system. Now I have the FolderPath, and I know which event to raise.
public static class EventTriggeredFunction
{
    [FunctionName("EventTriggered")]
    public static async Task Run(
        [ServiceBusTrigger("%ServiceBusQueueName%", Connection = "ServiceBusConnection")] DocumentCreateRequest request,
        [DurableClient] IDurableOrchestrationClient client)
    {
        // Raising the named event in the instance passing the queue message
        await client.RaiseEventAsync(request.InstanceId, request.EventName, request);
    }
}
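The queue-triggered function is one way to deliver the event. If the external system can call back over HTTP instead, the status response returned by the starter also includes a sendEventPostUri with an {eventName} placeholder, and posting a JSON body to it raises the event on the instance. Below is a rough sketch of such a caller. The class and method names are made up for illustration, and it assumes the caller has been handed the sendEventPostUri value as-is.

```csharp
using System;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;

// Hypothetical helper an external caller could use; not part of the original sample.
public static class RaiseEventOverHttp
{
    private static readonly HttpClient Http = new HttpClient();

    // sendEventPostUri: value taken from the starter's status response
    // eventName: e.g. "FolderCreated" or "DocumentCreated"
    // jsonPayload: the event data as JSON, e.g. a serialized DocumentCreateRequest
    public static async Task RaiseAsync(string sendEventPostUri, string eventName, string jsonPayload)
    {
        // Fill in the {eventName} placeholder that the URI template contains
        string uri = sendEventPostUri.Replace("{eventName}", Uri.EscapeDataString(eventName));

        using (var content = new StringContent(jsonPayload, Encoding.UTF8, "application/json"))
        using (HttpResponseMessage response = await Http.PostAsync(uri, content))
        {
            // Throws if the runtime rejected the request (for example, unknown instance)
            response.EnsureSuccessStatusCode();
        }
    }
}
```

Either way, the orchestrator code stays the same: it only waits for an event with a given name and does not care how that event was delivered.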
Once the event is raised, the orchestrator function wakes up, replays up to the point where it stopped, and then resumes from there.
Next, it calls the CreateDocument function and waits for the external event named DocumentCreated to be raised.
await context.CallActivityAsync("CreateDocument", request);
request = await context.WaitForExternalEvent<DocumentCreateRequest>("DocumentCreated");
So this is the CreateDocument function.
public static class CreateDocumentFunction
{
    [FunctionName("CreateDocument")]
    public static async Task Run(
        [ActivityTrigger] DocumentCreateRequest request,
        [ServiceBus("%ServiceBusQueueName%", Connection = "ServiceBusConnection", EntityType = EntityType.Queue)] IAsyncCollector<DocumentCreateRequest> requestCollector)
    {
        // Call external API to Create Document
        // And then return immediately

        // Demo External Event
        // Pushes a message into a queue with the document path and expected event
        await Task.Delay(3000);

        request.DocumentPath = $"{request.FolderPath}/{request.DocumentName}.pdf";
        request.EventName = "DocumentCreated";

        await requestCollector.AddAsync(request);
    }
}
It's the same flow as before. Here too, just for the demo, I have a Task.Delay() and then add a message to our queue. The EventTriggered function from before gets triggered again, raising the DocumentCreated event this time.
So when I run this whole thing, this is what it looks like.
(Image: Demo)
When I call the HttpTrigger, it kicks off the orchestrator and returns a response with URIs for the different management operations.
(Image: Response)
Among them is a status query URI, which we can use to check the progress. At this point, the status is Running.
(Image: Status: Running)
And then the orchestrator function completes.
(Image: Status: Completed)
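Instead of refreshing the status URI by hand, a client can poll it. Here is a minimal sketch, assuming the client has the statusQueryGetUri value from the starter's response and that the JSON returned there has a runtimeStatus field with values such as "Pending", "Running" and "Completed". The class and method names are hypothetical.

```csharp
using System;
using System.Net.Http;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical client-side helper; not part of the original sample.
public static class StatusPoller
{
    private static readonly HttpClient Http = new HttpClient();

    // Polls the status endpoint until the instance is no longer pending or running,
    // then returns the last runtimeStatus value that was read.
    public static async Task<string> WaitForCompletionAsync(string statusQueryGetUri, TimeSpan pollInterval)
    {
        while (true)
        {
            // Throws HttpRequestException on a non-success status code
            string json = await Http.GetStringAsync(statusQueryGetUri);

            using (JsonDocument doc = JsonDocument.Parse(json))
            {
                string status = doc.RootElement.GetProperty("runtimeStatus").GetString();
                if (status != "Pending" && status != "Running")
                {
                    return status;
                }
            }

            await Task.Delay(pollInterval);
        }
    }
}
```

A real client would probably also want an overall deadline or a CancellationToken so the loop cannot run forever.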
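And if you want to handle timeouts, you can create a durable timer with a CancellationTokenSource, wait until either the timer or the workflow task completes, check which one finished first, and run the necessary logic. If the workflow finishes first, cancel the timer, because the orchestration does not complete while a durable timer is still outstanding.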
Func<Task<DocumentCreateRequest>> func = async () =>
{
    await context.CallActivityAsync("CreateFolder", request);
    request = await context.WaitForExternalEvent<DocumentCreateRequest>("FolderCreated");

    await context.CallActivityAsync("CreateDocument", request);
    request = await context.WaitForExternalEvent<DocumentCreateRequest>("DocumentCreated");

    return request;
};

using (var cts = new CancellationTokenSource())
{
    DateTime expiration = context.CurrentUtcDateTime.AddSeconds(2);
    Task timeoutTask = context.CreateTimer(expiration, cts.Token);
    Task<DocumentCreateRequest> createDocumentTask = func();

    Task winner = await Task.WhenAny(timeoutTask, createDocumentTask);
    if (winner == createDocumentTask)
    {
        // Cancel the durable timer so the orchestration does not wait for it to expire
        cts.Cancel();
        request = createDocumentTask.Result;
    }
    else
    {
        request.Message = "Timeout occurred";
    }
}
So here, I just added a timeout of 2 seconds, and since the two activities alone take longer than that, the timer fired first.
(Image: Timeout Occurred)
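If a timeout per event is enough, rather than one timeout for the whole workflow, WaitForExternalEvent also has an overload that takes a TimeSpan and throws a TimeoutException when the event does not arrive in time. Below is a rough sketch of the orchestrator using it. The function name and the five-minute value are arbitrary choices for illustration, and DocumentCreateRequest is the request type used elsewhere in this post.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;

public static class WorkflowOrchestratorWithEventTimeoutFunction
{
    // Hypothetical variant of WorkflowOrchestrator; the name is made up for this sketch.
    [FunctionName("WorkflowOrchestratorWithEventTimeout")]
    public static async Task<DocumentCreateRequest> Run(
        [OrchestrationTrigger] IDurableOrchestrationContext context)
    {
        var request = new DocumentCreateRequest
        {
            InstanceId = context.InstanceId,
            DocumentName = context.GetInput<string>()
        };

        // Arbitrary value for the demo; each event gets its own window
        TimeSpan eventTimeout = TimeSpan.FromMinutes(5);

        try
        {
            await context.CallActivityAsync("CreateFolder", request);
            request = await context.WaitForExternalEvent<DocumentCreateRequest>("FolderCreated", eventTimeout);

            await context.CallActivityAsync("CreateDocument", request);
            request = await context.WaitForExternalEvent<DocumentCreateRequest>("DocumentCreated", eventTimeout);
        }
        catch (TimeoutException)
        {
            // Thrown by WaitForExternalEvent when the event did not arrive in time
            request.Message = "Timeout occurred";
        }

        return request;
    }
}
```

With this overload there is no timer to create or cancel by hand. The trade-off is that the timeout applies to each wait separately rather than to the workflow as a whole.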
Hope you got a good overall understanding of how Azure Durable Functions can wait for external events. Once you have the basics, there is a whole lot you can do.
Happy Coding.
Regards,
Jaliya