In this post, let's have a look at a nice feature that comes with Azure
Service Bus Topics/Subscriptions: Topic Filters.
Usually, when we publish a message to a Topic, every subscription on that Topic
receives a copy of the message. But sometimes we want to route certain messages only to a specific
subscription, based on the message's properties. For example, let's say we
have a Topic with a single Subscription, and that Subscription is session-enabled.
Enabling sessions on a queue/subscription can decrease
throughput, and for some messages we don't really need Sessions
because we don't care about the order of processing. In that case, it makes
sense to add a second subscription without sessions and route the
messages that don't require sessions there. Topic Filters are quite handy here.
Now let's see the things in action.
First, let's go ahead and create a Topic.
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;

var connectionString = "<ServiceBus_ConnectionString>";
var TopicName = "sbt-test-topic";
var SimpleSubscriptionName = "sbs-simple-subscription";
var SessionEnabledSubscriptionName = "sbs-session-enabled-subscription";

var serviceBusClient = new ServiceBusClient(connectionString);
var serviceBusAdministrationClient = new ServiceBusAdministrationClient(connectionString);

if (!await serviceBusAdministrationClient.TopicExistsAsync(TopicName))
{
    await serviceBusAdministrationClient.CreateTopicAsync(TopicName);
}
And now I am going to create two subscriptions, one with sessions enabled and
one without.
// Subscription without sessions: receives messages where RequiresSession is unset or false.
if (!await serviceBusAdministrationClient.SubscriptionExistsAsync(TopicName, SimpleSubscriptionName))
{
    await serviceBusAdministrationClient.CreateSubscriptionAsync(
        new CreateSubscriptionOptions(TopicName, SimpleSubscriptionName),
        new CreateRuleOptions
        {
            Name = "RequiresSessionRule",
            Filter = new SqlRuleFilter("RequiresSession IS NULL OR RequiresSession = false")
        });
}

// Session-enabled subscription: receives only messages where RequiresSession is true.
if (!await serviceBusAdministrationClient.SubscriptionExistsAsync(TopicName, SessionEnabledSubscriptionName))
{
    await serviceBusAdministrationClient.CreateSubscriptionAsync(
        new CreateSubscriptionOptions(TopicName, SessionEnabledSubscriptionName)
        {
            RequiresSession = true
        },
        new CreateRuleOptions
        {
            Name = "RequiresSessionRule",
            Filter = new SqlRuleFilter("RequiresSession = true")
        });
}
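As a quick sanity check, the rules attached to each subscription can be read back with the administration client. This is a minimal sketch and not part of the original sample: it assumes the same connection string placeholder and the same topic and subscription names as above, and it only prints SQL filters. Since a rule was passed when each subscription was created, I would expect RequiresSessionRule to be the only rule listed for each.

```csharp
using System;
using Azure.Messaging.ServiceBus.Administration;

// Assumption: same placeholder and entity names as in the snippets above.
var connectionString = "<ServiceBus_ConnectionString>";
var topicName = "sbt-test-topic";
var subscriptionNames = new[]
{
    "sbs-simple-subscription",
    "sbs-session-enabled-subscription"
};

var adminClient = new ServiceBusAdministrationClient(connectionString);

foreach (string subscriptionName in subscriptionNames)
{
    // GetRulesAsync returns an async pageable sequence of RuleProperties.
    await foreach (RuleProperties rule in adminClient.GetRulesAsync(topicName, subscriptionName))
    {
        // Only SQL filters expose a SqlExpression; other filter types are printed by type name.
        string description = rule.Filter is SqlRuleFilter sqlFilter
            ? sqlFilter.SqlExpression
            : rule.Filter.GetType().Name;

        Console.WriteLine($"{subscriptionName} -> {rule.Name}: {description}");
    }
}
```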
The important part is the CreateRuleOptions
parameter. Here I have added a filter to each subscription,
and both are SQL filters (SqlRuleFilter). There are two other types of filters,
Boolean filters and Correlation filters (we aren't going to use them in this post). Now you
might wonder where this RequiresSession property comes from. One of
the most important things to note with Topic Filters is "Filters can evaluate only message properties. Filters can't evaluate the
message body". So we need to add the RequiresSession property to the ServiceBusMessage that we are publishing, as an application property (not to the body).
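To make the intent of the two SQL expressions concrete, here is a small local approximation of them. This is only a sketch: FilterSketch and its methods are hypothetical names, not part of the Azure SDK, and Service Bus evaluates the real filters on the broker side. The sketch just mirrors the two rules for a property bag shaped like ApplicationProperties, treating a missing property the same as NULL.

```csharp
using System.Collections.Generic;

// Hypothetical helper for illustration only; not how Service Bus evaluates filters.
public static class FilterSketch
{
    public const string SimpleSubscription = "sbs-simple-subscription";
    public const string SessionSubscription = "sbs-session-enabled-subscription";

    // Approximates "RequiresSession IS NULL OR RequiresSession = false".
    public static bool MatchesSimpleRule(IDictionary<string, object> properties) =>
        !properties.TryGetValue("RequiresSession", out var value)
        || value is null
        || value is false;

    // Approximates "RequiresSession = true".
    public static bool MatchesSessionRule(IDictionary<string, object> properties) =>
        properties.TryGetValue("RequiresSession", out var value) && value is true;

    // Returns the subscriptions whose rule would accept a message with these properties.
    public static List<string> Route(IDictionary<string, object> properties)
    {
        var targets = new List<string>();
        if (MatchesSimpleRule(properties)) targets.Add(SimpleSubscription);
        if (MatchesSessionRule(properties)) targets.Add(SessionSubscription);
        return targets;
    }
}
```

With a boolean RequiresSession value (or none at all), the two rules are mutually exclusive, so each message ends up in exactly one of the two subscriptions.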
List<ServiceBusMessage> CreateMessages()
{
    return new()
    {
        // RequiresSession explicitly false
        CreateMessage(new("Message 1", false)),
        CreateMessage(new("Message 2", false)),
        // RequiresSession not set at all
        CreateMessage(new("Message 3")),
        CreateMessage(new("Message 4")),
        // RequiresSession true, both in the same session
        CreateMessage(new("Session Required Message 1-1", true, "Session1")),
        CreateMessage(new("Session Required Message 1-2", true, "Session1")),
    };
}

ServiceBusMessage CreateMessage(MyMessage message)
{
    ServiceBusMessage serviceBusMessage = new()
    {
        Subject = message.ToString(),
        SessionId = message.SessionId
    };

    serviceBusMessage.ApplicationProperties.Add("Subject", message.Subject);

    // Filters only see message properties, so the flag goes into ApplicationProperties.
    if (message.RequiresSession.HasValue)
    {
        serviceBusMessage.ApplicationProperties.Add("RequiresSession", message.RequiresSession);
    }

    return serviceBusMessage;
}

record MyMessage(string Subject, bool? RequiresSession = null, string SessionId = "");
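So here, I have created some test messages, and note that I have
added the RequiresSession property to each ServiceBusMessage's
ApplicationProperties. Message 3 and Message 4 deliberately leave it unset,
which is the case the IS NULL part of the first filter covers.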
And now, let's publish these messages out.
List<ServiceBusMessage> messages = CreateMessages();

ServiceBusSender serviceBusSender = serviceBusClient.CreateSender(TopicName);
await serviceBusSender.SendMessagesAsync(messages);
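And I can see my messages got routed to the correct subscription: Message 1 to Message 4 are in sbs-simple-subscription, and the two session messages are in sbs-session-enabled-subscription.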
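If you would rather verify the routing from code, something along these lines should work. It is a rough sketch under a few assumptions: the connection string placeholder and entity names are the ones used earlier in this post, the 10-message limit and 5-second wait are arbitrary values I picked, and the messages are completed (removed) once printed. Note that AcceptNextSessionAsync throws if no session becomes available before the service times out.

```csharp
using System;
using System.Collections.Generic;
using Azure.Messaging.ServiceBus;

// Assumption: same placeholder and entity names as earlier in this post.
var connectionString = "<ServiceBus_ConnectionString>";
var topicName = "sbt-test-topic";
var simpleSubscriptionName = "sbs-simple-subscription";
var sessionEnabledSubscriptionName = "sbs-session-enabled-subscription";

await using var client = new ServiceBusClient(connectionString);

// Subscription without sessions: a regular receiver is enough.
await using ServiceBusReceiver receiver = client.CreateReceiver(topicName, simpleSubscriptionName);
IReadOnlyList<ServiceBusReceivedMessage> simpleMessages =
    await receiver.ReceiveMessagesAsync(maxMessages: 10, maxWaitTime: TimeSpan.FromSeconds(5));

foreach (ServiceBusReceivedMessage message in simpleMessages)
{
    Console.WriteLine($"[simple] {message.ApplicationProperties["Subject"]}");
    await receiver.CompleteMessageAsync(message);
}

// Session-enabled subscription: a session has to be accepted before receiving.
await using ServiceBusSessionReceiver sessionReceiver =
    await client.AcceptNextSessionAsync(topicName, sessionEnabledSubscriptionName);
IReadOnlyList<ServiceBusReceivedMessage> sessionMessages =
    await sessionReceiver.ReceiveMessagesAsync(maxMessages: 10, maxWaitTime: TimeSpan.FromSeconds(5));

foreach (ServiceBusReceivedMessage message in sessionMessages)
{
    Console.WriteLine($"[session {sessionReceiver.SessionId}] {message.ApplicationProperties["Subject"]}");
    await sessionReceiver.CompleteMessageAsync(message);
}
```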
And if you are using a tool like
Service Bus Explorer, you can even query the messages using a Filter.
Hope this helps.
Happy Coding.
Regards,
Jaliya