Tuesday, November 2, 2021

Azure Service Bus Topic Filters

In this post, let's have a look at a nice feature that comes with Azure Service Bus Topics/Subscriptions: Topic Filters.

Usually, when we publish a message to a Topic, every subscription receives a copy of it. But sometimes we want to route specific messages only to a specific subscription, based on the message. For example, let's say we have a Topic with a session-enabled Subscription. Enabling sessions on a queue/subscription can reduce throughput, and for some messages we don't really need Sessions because we don't care about the order of processing. In that case, it makes sense to add a second subscription without sessions and route the messages that don't require sessions there. Topic Filters are quite handy for this.

Now let's see this in action.

First, let's go ahead and create a Topic.
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;
 
var connectionString = "<ServiceBus_ConnectionString>";
 
var TopicName = "sbt-test-topic";
var SimpleSubscriptionName = "sbs-simple-subscription";
var SessionEnabledSubscriptionName = "sbs-session-enabled-subscription";
 
var serviceBusClient = new ServiceBusClient(connectionString);
var serviceBusAdministrationClient = new ServiceBusAdministrationClient(connectionString);
 
if (!await serviceBusAdministrationClient.TopicExistsAsync(TopicName))
{
    await serviceBusAdministrationClient.CreateTopicAsync(TopicName);
}
And now I am going to create two subscriptions, one with Session enabled and one without.
if (!await serviceBusAdministrationClient.SubscriptionExistsAsync(TopicName, SimpleSubscriptionName))
{
    await serviceBusAdministrationClient.CreateSubscriptionAsync(
        new CreateSubscriptionOptions(TopicName, SimpleSubscriptionName),
        new CreateRuleOptions
        {
            Name = "RequiresSessionRule",
            Filter = new SqlRuleFilter("RequiresSession IS NULL OR RequiresSession = false")
        });
}
 
if (!await serviceBusAdministrationClient.SubscriptionExistsAsync(TopicName, SessionEnabledSubscriptionName))
{
    await serviceBusAdministrationClient.CreateSubscriptionAsync(
        new CreateSubscriptionOptions(TopicName, SessionEnabledSubscriptionName)
        {
            RequiresSession = true
        },
        new CreateRuleOptions
        {
            Name = "RequiresSessionRule",
            Filter = new SqlRuleFilter("RequiresSession = true")
        });
}
The important part is the CreateRuleOptions parameter. Here I have added a Filter to each subscription, of type SQL Filter. There are two other types of filters, Boolean Filters and Correlation Filters (we aren't going to use them in this post). Now you might wonder where this RequiresSession property comes from. One of the most important things to note about Topic Filters is: "Filters can evaluate only message properties. Filters can't evaluate the message body". So we need to add the RequiresSession property to the ServiceBusMessage that we are publishing (not to its body).
List<ServiceBusMessage> CreateMessages()
{
    return new()
    {
        CreateMessage(new("Message 1", false)),
        CreateMessage(new("Message 2", false)),
        CreateMessage(new("Message 3")),
        CreateMessage(new("Message 4")),
        CreateMessage(new("Session Required Message 1-1", true, "Session1")),
        CreateMessage(new("Session Required Message 1-2", true, "Session1")),
    };
}
 
ServiceBusMessage CreateMessage(MyMessage message)
{
    ServiceBusMessage serviceBusMessage = new()
    {
        Subject = message.ToString(),
        SessionId = message.SessionId
    };
    serviceBusMessage.ApplicationProperties.Add("Subject", message.Subject);
 
    if (message.RequiresSession.HasValue)
    {
        serviceBusMessage.ApplicationProperties.Add("RequiresSession", message.RequiresSession);
    }
 
    return serviceBusMessage;
}
 
record MyMessage(string Subject, bool? RequiresSession = null, string SessionId = "");
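So here, I have created some test messages. Note that I have added the RequiresSession property to each ServiceBusMessage's ApplicationProperties, and only when it has a value, so that messages without it match the IS NULL branch of the filter.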
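As an aside, because RequiresSession lives in ApplicationProperties, the rule on the session-enabled subscription could probably also be written as a Correlation Filter, which matches on equality only. The snippet below is a minimal sketch of that idea and not what this post uses; the variable names correlationFilter and sessionRule are made up for illustration. A Correlation Filter cannot express "property is missing", so the simple subscription would still need the SQL Filter with IS NULL.

```csharp
using Azure.Messaging.ServiceBus.Administration;

// Sketch only: a correlation filter that matches messages whose
// "RequiresSession" application property equals true.
var correlationFilter = new CorrelationRuleFilter();
correlationFilter.ApplicationProperties["RequiresSession"] = true;

// Hypothetical alternative to the SqlRuleFilter("RequiresSession = true") rule.
var sessionRule = new CreateRuleOptions
{
    Name = "RequiresSessionRule",
    Filter = correlationFilter
};
```

The resulting sessionRule would be passed as the second argument to CreateSubscriptionAsync, in the same position as the SQL-based rule.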
 
And now, let's publish these messages out.
List<ServiceBusMessage> messages = CreateMessages();
ServiceBusSender serviceBusSender = serviceBusClient.CreateSender(TopicName);
await serviceBusSender.SendMessagesAsync(messages);
And I can see my messages got routed to the correct subscription.
Subscriptions
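If you would rather check the routing from code than from the portal, one option is to read the runtime properties of each subscription. This is a rough sketch: the helper name GetActiveMessageCountAsync is my own, and it assumes the admin client and the topic/subscription names used earlier in this post.

```csharp
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus.Administration;

// Hypothetical helper: returns the number of active (not yet received)
// messages currently sitting in a subscription.
static async Task<long> GetActiveMessageCountAsync(
    ServiceBusAdministrationClient adminClient,
    string topicName,
    string subscriptionName)
{
    SubscriptionRuntimeProperties runtime =
        await adminClient.GetSubscriptionRuntimePropertiesAsync(topicName, subscriptionName);

    return runtime.ActiveMessageCount;
}

// Usage, assuming the variables defined earlier in the post:
// long simple = await GetActiveMessageCountAsync(
//     serviceBusAdministrationClient, TopicName, SimpleSubscriptionName);
// long sessions = await GetActiveMessageCountAsync(
//     serviceBusAdministrationClient, TopicName, SessionEnabledSubscriptionName);
```

With the six test messages above, and assuming both subscriptions were empty beforehand and nothing has consumed the messages yet, the simple subscription should report 4 and the session-enabled one 2.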
We can create/edit filters for subscriptions using Azure Portal as well.
Add Filter using Azure Portal
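The same kind of edit can be done from code with ServiceBusAdministrationClient. The following is a minimal sketch, assuming the rule already exists on the subscription; the helper name ReplaceFilterAsync and its parameters are made up for illustration.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus.Administration;

// Hypothetical helper: lists the rules on a subscription, then replaces
// the filter of one existing rule with a new SQL expression.
static async Task ReplaceFilterAsync(
    ServiceBusAdministrationClient adminClient,
    string topicName,
    string subscriptionName,
    string ruleName,
    string sqlExpression)
{
    // Print the rules currently attached to the subscription.
    await foreach (RuleProperties rule in adminClient.GetRulesAsync(topicName, subscriptionName))
    {
        string description = rule.Filter is SqlRuleFilter sql
            ? sql.SqlExpression
            : rule.Filter.GetType().Name;
        Console.WriteLine($"{rule.Name}: {description}");
    }

    // Fetch the rule, swap its filter, and push the change back.
    RuleProperties existing = await adminClient.GetRuleAsync(topicName, subscriptionName, ruleName);
    existing.Filter = new SqlRuleFilter(sqlExpression);
    await adminClient.UpdateRuleAsync(topicName, subscriptionName, existing);
}

// Usage, assuming the variables defined earlier in the post:
// await ReplaceFilterAsync(serviceBusAdministrationClient, TopicName,
//     SimpleSubscriptionName, "RequiresSessionRule", "RequiresSession IS NULL");
```

One thing to keep in mind: a subscription created without any rule options gets a default rule named $Default that matches everything. Adding your own rule with CreateRuleAsync does not remove it, so you would need to delete $Default (DeleteRuleAsync) before your filter has any effect.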
And if you are using a tool like Service Bus Explorer, you can even query the messages using a Filter.
Filter
Filtered Message

Hope this helps.

Happy Coding.

Regards,
Jaliya
