Thursday, April 29, 2021

Azure Service Bus Client Library for .NET: Receivers vs. Processors

Azure Service Bus Client Library for .NET mainly provides two approaches for consumers to consume messages from a Queue/Subscription, those are ReceiversServiceBusReceiver and ProcessorsServiceBusProcessor (when consuming messages from Sessions enabled entity ServiceBusSessionReceiver and ServiceBusSessionProcessor respectively).

In this post, let's see what these are and when to use Receivers or Processors. Here I am going to be using ServiceBusSessionProcessor as I am reading from Session enabled Queue.

First, let's have a look at how Receivers are used. In one of my previous posts, I wrote about Azure Service Bus: Handling FIFO using Sessions (please take a read, you might find it interesting) where I was using the Receivers approach. For the sake of simplicity, I will just share the same code.
ServiceBusSessionReceiver
You can see there are two do-while loops and a lot of ceremonies going on. We can use ServiceBusSessionProcessor to simplify this code and it offers additional features like automatic completion of processed messages, automatic message lock renewal, and concurrent execution etc.

Now let's have a look at how we can use ServiceBusSessionProcessor.
private static async Task RunWithSessionProcessor(ServiceBusClient client)
{
    var options = new ServiceBusSessionProcessorOptions()
    {
        // interesting options here
    };

    ServiceBusSessionProcessor processor = client.CreateSessionProcessor(Shared.Configuration.QUEUE_NAME, options);

    processor.SessionInitializingAsync += async args =>
    {
        Console.WriteLine($"Initializing Session: '{args.SessionId}' at '{DateTimeOffset.UtcNow}', SessionLockedUntil: '{args.SessionLockedUntil}'");
    };

    processor.SessionClosingAsync += async args =>
    {
        Console.WriteLine($"Closing Session: '{args.SessionId}' at '{DateTimeOffset.UtcNow}'\n");
    };

    processor.ProcessMessageAsync += async args =>
    {
        Console.WriteLine($"Received for Session: '{args.SessionId}', Message: '{args.Message.Body}', Ack: Complete");
    };

    processor.ProcessErrorAsync += async args =>
    {
        Console.WriteLine($"Exception for Session: '{args.Exception.Message}'");
    };

    Console.WriteLine("Starting...Press any character to gracefully exit.");

    await processor.StartProcessingAsync();

    Console.ReadKey();

    Console.WriteLine("Stopping...");

    await processor.StopProcessingAsync();

    Console.WriteLine("Stopped!");
}
First, we are creating a ServiceBusSessionProcessor for the targetted queue (it has an overload if you are using Subscriptions). You can see it's mostly event-based and we have different events such as when initializing a session, receiving a message, etc. We register the events and then we just have to start the processing. You must have noted that I have created ServiceBusSessionProcessorOptions to be passed into the CreateSessionProcessor method. We can use these options to control a variety of things, most importantly the following,
  • AutoCompleteMessages
    • When this is true, we don't explicitly need to Acknowledge the broker once the message is completed. Note: if there is an error in processing the message, the message won't be acknowledged as completed even though this is set to true. 
    • Default is true.
  • MaxAutoLockRenewalDuration
    • Maximum duration within which the session lock will be renewed automatically, the more is better. 
    • Default is 5 minutes.
  • MaxConcurrentCallsPerSession
    • Gets or sets the maximum number of concurrent calls to the message handler the processor should initiate per session. 
    • Default is 1.
  • MaxConcurrentSessions
    • Specifies how many sessions can be processed concurrently by the processor. 
    • Default is 8.
  • PrefetchCount
    • This specifies how many messages can be eagerly requested from the entity rather than doing a service request to get messages one by one. Note: this must be used with care, because the moment the broker released the message, that messages' Message Lock Duration starts.
    • Default is 0.
The general recommendation is Processors should be the go-to tool for writing applications that receive messages from Service Bus entities. The Receivers are recommended for more complex scenarios in which the processor is not able to provide the fine-grained control that one can expect when using the Receiver directly. Azure Functions use Processors, so obviously, that's my go-to option.

You can find the sample code through the below link and play around.

Hope this helps.

Happy Coding.

Regards,
Jaliya

No comments:

Post a Comment