Azure Service Bus Client Library for .NET mainly provides two approaches for consumers to consume messages from a Queue/Subscription, those are Receivers: ServiceBusReceiver and Processors: ServiceBusProcessor (when consuming messages from Sessions enabled entity ServiceBusSessionReceiver and ServiceBusSessionProcessor respectively).
In this post, let's see what these are and when to use Receivers or Processors. Here I am going to be using ServiceBusSessionProcessor as I am reading from Session enabled Queue.
First, let's have a look at how Receivers are used. In one of my previous posts,
I wrote about
Azure Service Bus: Handling FIFO using Sessions (please take a read, you might find it interesting) where I was using the Receivers approach. For the sake of simplicity, I will just share the same code.
ServiceBusSessionReceiver |
Now let's have a look at how we can use ServiceBusSessionProcessor.
private static async Task RunWithSessionProcessor(ServiceBusClient client)
{
var options = new ServiceBusSessionProcessorOptions()
{
// interesting options here
};
ServiceBusSessionProcessor processor = client.CreateSessionProcessor(Shared.Configuration.QUEUE_NAME, options);
processor.SessionInitializingAsync += async args =>
{
Console.WriteLine($"Initializing Session: '{args.SessionId}' at '{DateTimeOffset.UtcNow}', SessionLockedUntil: '{args.SessionLockedUntil}'");
};
processor.SessionClosingAsync += async args =>
{
Console.WriteLine($"Closing Session: '{args.SessionId}' at '{DateTimeOffset.UtcNow}'\n");
};
processor.ProcessMessageAsync += async args =>
{
Console.WriteLine($"Received for Session: '{args.SessionId}', Message: '{args.Message.Body}', Ack: Complete");
};
processor.ProcessErrorAsync += async args =>
{
Console.WriteLine($"Exception for Session: '{args.Exception.Message}'");
};
Console.WriteLine("Starting...Press any character to gracefully exit.");
await processor.StartProcessingAsync();
Console.ReadKey();
Console.WriteLine("Stopping...");
await processor.StopProcessingAsync();
Console.WriteLine("Stopped!");
}
First, we are creating a ServiceBusSessionProcessor for the targetted queue (it has an overload if you are using
Subscriptions). You can see it's mostly event-based and we have different events such as when initializing a session, receiving a message, etc. We register the events and then we just have to start the processing. You must have noted that I have created ServiceBusSessionProcessorOptions to be passed into the CreateSessionProcessor method. We can use these options to control a variety of things, most importantly the following,
- AutoCompleteMessages
- When this is true, we don't explicitly need to Acknowledge the broker once the message is completed. Note: if there is an error in processing the message, the message won't be acknowledged as completed even though this is set to true.
- Default is true.
- MaxAutoLockRenewalDuration
- Maximum duration within which the session lock will be renewed automatically, the more is better.
- Default is 5 minutes.
- MaxConcurrentCallsPerSession
- Gets or sets the maximum number of concurrent calls to the message handler the processor should initiate per session.
- Default is 1.
- MaxConcurrentSessions
- Specifies how many sessions can be processed concurrently by the processor.
- Default is 8.
- PrefetchCount
- This specifies how many messages can be eagerly requested from the entity rather than doing a service request to get messages one by one. Note: this must be used with care, because the moment the broker released the message, that messages' Message Lock Duration starts.
- Default is 0.
The general recommendation is Processors should be the go-to
tool for writing applications that receive messages from Service Bus entities. The
Receivers are recommended for more complex scenarios in which the processor is not able to provide the fine-grained control that one can expect when using the Receiver directly. Azure Functions use Processors, so obviously, that's my go-to option.
You can find the sample code through the below link and play around.
Hope this helps.
Happy Coding.
Regards,
Jaliya