In this post, let's see what Sessions are in Azure Service Bus. The primary use of Sessions (Groups) is to maintain FIFO when there are multiple receivers listening to the same Queue or Subscription. For the simplicity of the post, I will be focusing on Queues rather than Topics/Subscriptions, but the concept is the same.
We can create a session aware Queue in Azure by enabling this option when creating the queue.
ASB Queue: Enable Sessions |
Note: Sessions are available only in
the Standard and Premium tiers of Service
Bus. And once sessions are enabled for an entity (Queue/Subscription), that
specific entity can only receive messages that have the SessionId set
to a valid value.
Let's go by an example. I have created a session enabled Service Bus Queue. And I am using the
newer Azure Service Bus .NET Client library Azure.Messaging.ServiceBus (which replaces
Microsoft.Azure.ServiceBus).
I have the following Sender.
public record ApplicationMessage(string Message);
class Program
{
static async Task Main(string[] args)
{
await using var client = new ServiceBusClient(Shared.Configuration.CONNECTION_STRING);
ServiceBusSender sender = client.CreateSender(Shared.Configuration.QUEUE_NAME);
List<string> sessionIds = new() { "S1", "S2" };
for (var i = 1; i <= 5; i++)
{
foreach (var sessionId in sessionIds)
{
var applicationMessage = new ApplicationMessage($"M{i}");
await SendMessageAsync(sessionId, applicationMessage, sender);
}
}
}
static async Task SendMessageAsync(string sessionId, ApplicationMessage applicationMessage, ServiceBusSender serviceBusSender)
{
var message = new ServiceBusMessage(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(applicationMessage)))
{
// Note: Since I am sending to a Session enabled Queue, SessionId should not be empty
SessionId = sessionId,
ContentType = "application/json"
};
await serviceBusSender.SendMessageAsync(message);
Console.WriteLine($"Message sent: Session '{message.SessionId}', Message = '{applicationMessage.Message}'");
}
}
So basically what it does is, adding messages to the target queue in the
following order. Note: I am setting the SessionId in each
message.
Queue |
When Sessions are enabled for a queue, behind the scene it will create
virtual queues. And the receivers can either choose a particular session they
are interested in and Peek Lock that entire Session or pick whatever the next
session which isn't already locked by any other receivers. So that
particular receiver will only be processing messages that belongs to the
session they locked in. And no other receivers would see those locked in
session messages.
class Program
{
static async Task Main(string[] args)
{
await using var client = new ServiceBusClient(Shared.Configuration.CONNECTION_STRING);
var cts = new CancellationTokenSource();
Console.CancelKeyPress += (a, o) => { Console.WriteLine("---I am Dead!---"); cts.Cancel(); };
do
{
// Here we are accepting the next Session which isn't locked by any other receiver
ServiceBusSessionReceiver receiver = await client.AcceptNextSessionAsync(Shared.Configuration.QUEUE_NAME);
Console.WriteLine($"Receiver started for SessionId: '{receiver.SessionId}'.");
ServiceBusReceivedMessage message = null;
do
{
message = await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(1), cancellationToken: cts.Token);
if (message != null)
{
try
{
Console.WriteLine($"Received: '{message.Body}', Ack: Complete");
await receiver.CompleteMessageAsync(message, cts.Token);
}
catch
{
Console.WriteLine($"Received: '{message.Body}', Ack: Abondon");
await receiver.AbandonMessageAsync(message, cancellationToken: cts.Token);
}
}
}
while (message != null && !cts.IsCancellationRequested);
await receiver.CloseAsync();
}
while (!cts.IsCancellationRequested);
}
}
Hint: We can use Processors (ServiceBusProcessor/ServiceBusSessionProcessor) and get rid of these do-while loops. And Processors are what's being used
in Azure Functions.
And if we want to accept only a particular session, we can get rid of the
outer do-while loop and do this.
ServiceBusSessionReceiver receiver = await client.AcceptSessionAsync(Shared.Configuration.QUEUE_NAME, "S1");
So when I run the sender and the receiver, I can see messages are getting
processed group by sessions in a FIFO manner. Of cource, I have just a
single instance of the receiver running, if you have multiple instances, you
should be seeing different receivers picking different sessions
messages.
- In a session, what if one of the messages is failing to process by the receiver? How will the FIFO be maintained in that kind of a situation?
- If one of the messages got failed in the middle, receiver can acknowledge the broker (Service Bus) by abandoning the message. This will make the message to be available again for immediate processing as the lock on the message held by the receiver will be released. And that particular message will get retried as per the retry policy. No other messages behind this failed message will get processed until the failed message is processed. It can eventually get completed or moved to dead letter queue/DLQ if not completed within the retry time frame.
- Say in the above example, for some reason, Message: M3 is failing to process for some reason. So the output will be something like this,
Message Process Failure in the Middle |
- What if a receiver dies while in the middle of processing messages?
- In this case, another receiver instance looking for available sessions will step in and take the control of the session and will continue from the last processed message.
- Here in the below image, a worker dies in the middle, another worker steps in and continues.
References and more reading/watching,
- Azure Service Bus Message Sessions
- On .NET Live - Messaging Patterns for .NET Developers with Clemens Vasters: Principal Architect, Messaging Services and Standards, Microsoft
- Channel 9: Advanced Features with Azure Service Bus with Ashish Chhabria: Senior Product Manager, Microsoft
You can find the sample code through the below link and play around.
Hope this helps.
Happy Coding.
Regards,
Jaliya
No comments:
Post a Comment