Friday, April 23, 2021

Azure Service Bus: Handling FIFO using Sessions

In this post, let's see what Sessions are in Azure Service Bus. The primary use of Sessions (Groups) is to maintain FIFO when there are multiple receivers listening to the same Queue or Subscription. For the simplicity of the post, I will be focusing on Queues rather than Topics/Subscriptions, but the concept is the same.

We can create a session aware Queue in Azure by enabling this option when creating the queue. 

ASB Queue: Enable Sessions
Note: Sessions are available only in the Standard and Premium tiers of Service Bus. And once sessions are enabled for an entity (Queue/Subscription), that specific entity can only receive messages that have the SessionId set to a valid value.

Let's go by an example. I have created a session enabled Service Bus Queue. And I am using the newer Azure Service Bus .NET Client library Azure.Messaging.ServiceBus (which replaces Microsoft.Azure.ServiceBus).

I have the following Sender.
public record ApplicationMessage(string Message);

class Program
{
    static async Task Main(string[] args)
    {
        await using var client = new ServiceBusClient(Shared.Configuration.CONNECTION_STRING);

        ServiceBusSender sender = client.CreateSender(Shared.Configuration.QUEUE_NAME);

        List<stringsessionIds = new() { "S1""S2" };

        for (var i = 1; i <= 5; i++)
        {
            foreach (var sessionId in sessionIds)
            {
                var applicationMessage = new ApplicationMessage($"M{i}");
                await SendMessageAsync(sessionId, applicationMessage, sender);
            }
        }
    }

    static async Task SendMessageAsync(string sessionId, ApplicationMessage applicationMessage, ServiceBusSender serviceBusSender)
    {
        var message = new ServiceBusMessage(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(applicationMessage)))
        {
            // Note: Since I am sending to a Session enabled Queue, SessionId should not be empty
            SessionId = sessionId,
            ContentType = "application/json"
        };

        await serviceBusSender.SendMessageAsync(message);

        Console.WriteLine($"Message sent: Session '{message.SessionId}', Message = '{applicationMessage.Message}'");
    }
}
So basically what it does is, adding messages to the target queue in the following order. Note: I am setting the SessionId in each message.
Queue
Now if we have just one instance of a receiver/consumer, we should be ok. But what if we have multiple instances of receivers processing these messages. There is no way to guarantee the FIFO here. That's where the use of sessions comes into the picture.

When Sessions are enabled for a queue, behind the scene it will create virtual queues. And the receivers can either choose a particular session they are interested in and Peek Lock that entire Session or pick whatever the next session which isn't already locked by any other receivers. So that particular receiver will only be processing messages that belongs to the session they locked in. And no other receivers would see those locked in session messages.
Virtual Queues
This is how we can implement such a receiver through the code.
class Program
{
    static async Task Main(string[] args)
    {
        await using var client = new ServiceBusClient(Shared.Configuration.CONNECTION_STRING);

        var cts = new CancellationTokenSource();
        Console.CancelKeyPress += (ao) =>  { Console.WriteLine("---I am Dead!---");   cts.Cancel();  };

        do
        {
            // Here we are accepting the next Session which isn't locked by any other receiver
            ServiceBusSessionReceiver receiver = await client.AcceptNextSessionAsync(Shared.Configuration.QUEUE_NAME);

            Console.WriteLine($"Receiver started for SessionId: '{receiver.SessionId}'.");

            ServiceBusReceivedMessage message = null;
            do
            {
                message = await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(1), cancellationToken: cts.Token);
                if (message != null)
                {
                    try
                    {
                        Console.WriteLine($"Received: '{message.Body}', Ack: Complete");
                        await receiver.CompleteMessageAsync(message, cts.Token);
                    }
                    catch
                    {
                        Console.WriteLine($"Received: '{message.Body}', Ack: Abondon");
                        await receiver.AbandonMessageAsync(message, cancellationToken: cts.Token);
                    }
                }
            }
            while (message != null && !cts.IsCancellationRequested);
            await receiver.CloseAsync();
        }
        while (!cts.IsCancellationRequested);
    }
}
Hint: We can use Processors (ServiceBusProcessor/ServiceBusSessionProcessor) and get rid of these do-while loops. And Processors are what's being used in Azure Functions.

And if we want to accept only a particular session, we can get rid of the outer do-while loop and do this.
ServiceBusSessionReceiver receiver = await client.AcceptSessionAsync(Shared.Configuration.QUEUE_NAME, "S1");
So when I run the sender and the receiver, I can see messages are getting processed group by sessions in a FIFO manner. Of cource, I have just a single instance of the receiver running, if you have multiple instances, you should be seeing different receivers picking different sessions messages.
Sender
Receiver

Some Important Q & A

  • In a session, what if one of the messages is failing to process by the receiver? How will the FIFO be maintained in that kind of a situation?
    • If one of the messages got failed in the middle, receiver can acknowledge the broker (Service Bus) by abandoning the message. This will make the message to be available again for immediate processing as the lock on the message held by the receiver will be released. And that particular message will get retried as per the retry policy. No other messages behind this failed message will get processed until the failed message is processed. It can eventually get completed or moved to dead letter queue/DLQ if not completed within the retry time frame.
    • Say in the above example, for some reason, Message: M3 is failing to process for some reason. So the output will be something like this,
Message Process Failure in the Middle
  • What if a receiver dies while in the middle of processing messages?
    • In this case, another receiver instance looking for available sessions will step in and take the control of the session and will continue from the last processed message.
    • Here in the below image, a worker dies in the middle, another worker steps in and continues.
One of the Receivers Dies and Another Receiver Takes Control
References and more reading/watching,
You can find the sample code through the below link and play around.

Hope this helps.

Happy Coding.

Regards,
Jaliya

No comments:

Post a Comment