In this post, let's have a look at a nice feature that comes with Azure Service Bus Topics/Subscriptions: Topic Filters.
Usually, when we publish a message to a Topic, every subscription receives a copy of the message. But sometimes we want to route specific messages only to a specific subscription, based on the message's properties. For example, let's say we have a Topic and a Subscription, and the Subscription is session-enabled. Enabling sessions on a queue/subscription can decrease throughput, and for some messages we might not really need Sessions, because we don't care about the order of processing. In that case, it makes sense to use a different subscription without sessions and route the messages which don't require sessions there. Topic Filters can be quite handy here.
Now let's see the things in action.
First, let's go ahead and create a Topic.
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;

var connectionString = "<ServiceBus_ConnectionString>";
var TopicName = "sbt-test-topic";
var SimpleSubscriptionName = "sbs-simple-subscription";
var SessionEnabledSubscriptionName = "sbs-session-enabled-subscription";

var serviceBusClient = new ServiceBusClient(connectionString);
var serviceBusAdministrationClient = new ServiceBusAdministrationClient(connectionString);

if (!await serviceBusAdministrationClient.TopicExistsAsync(TopicName))
{
    await serviceBusAdministrationClient.CreateTopicAsync(TopicName);
}
And now I am going to create two subscriptions, one with Sessions enabled and one without.
if (!await serviceBusAdministrationClient.SubscriptionExistsAsync(TopicName, SimpleSubscriptionName))
{
    await serviceBusAdministrationClient.CreateSubscriptionAsync(
        new CreateSubscriptionOptions(TopicName, SimpleSubscriptionName),
        new CreateRuleOptions
        {
            Name = "RequiresSessionRule",
            Filter = new SqlRuleFilter("RequiresSession IS NULL OR RequiresSession = false")
        });
}

if (!await serviceBusAdministrationClient.SubscriptionExistsAsync(TopicName, SessionEnabledSubscriptionName))
{
    await serviceBusAdministrationClient.CreateSubscriptionAsync(
        new CreateSubscriptionOptions(TopicName, SessionEnabledSubscriptionName)
        {
            RequiresSession = true
        },
        new CreateRuleOptions
        {
            Name = "RequiresSessionRule",
            Filter = new SqlRuleFilter("RequiresSession = true")
        });
}
The important part is the CreateRuleOptions parameter. Here, I have added a Filter to each subscription, and both are SQL filters (SqlRuleFilter). There are two other types of filters, Boolean filters and Correlation filters (we aren't going to use them in this post). Now you might wonder where this RequiresSession property comes from. One of the most important things to note with Topic Filters is: "Filters can evaluate only message properties. Filters can't evaluate the message body". So we need to add the RequiresSession property to the ServiceBusMessage that we are publishing (not to the body).
List<ServiceBusMessage> CreateMessages()
{
    return new()
    {
        CreateMessage(new("Message 1", false)),
        CreateMessage(new("Message 2", false)),
        CreateMessage(new("Message 3")),
        CreateMessage(new("Message 4")),
        CreateMessage(new("Session Required Message 1-1", true, "Session1")),
        CreateMessage(new("Session Required Message 1-2", true, "Session1")),
    };
}

ServiceBusMessage CreateMessage(MyMessage message)
{
    ServiceBusMessage serviceBusMessage = new()
    {
        Subject = message.ToString(),
        SessionId = message.SessionId
    };

    serviceBusMessage.ApplicationProperties.Add("Subject", message.Subject);

    if (message.RequiresSession.HasValue)
    {
        serviceBusMessage.ApplicationProperties.Add("RequiresSession", message.RequiresSession);
    }

    return serviceBusMessage;
}

record MyMessage(string Subject, bool? RequiresSession = null, string SessionId = "");
So here, I have created some test messages. Note that I have added the RequiresSession property to each ServiceBusMessage's ApplicationProperties, except for Message 3 and Message 4, which leave it unset.
  And now, let's publish these messages out.
List<ServiceBusMessage> messages = CreateMessages();

ServiceBusSender serviceBusSender = serviceBusClient.CreateSender(TopicName);
await serviceBusSender.SendMessagesAsync(messages);
And I can see my messages got routed to the correct subscription.
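One way to check the routing from code is to peek at each subscription. The following is only a minimal sketch, assuming the same placeholder connection string and entity names used earlier in this post. Peeking is used instead of receiving so that the messages stay in the subscriptions.

```csharp
using System;
using Azure.Messaging.ServiceBus;

// Assumption: same placeholder connection string and entity names as earlier in this post.
var connectionString = "<ServiceBus_ConnectionString>";
var topicName = "sbt-test-topic";

await using var client = new ServiceBusClient(connectionString);

// Subscription without sessions: a plain receiver is enough.
await using ServiceBusReceiver simpleReceiver =
    client.CreateReceiver(topicName, "sbs-simple-subscription");

// Peek does not lock or remove messages, so this is safe to run repeatedly.
foreach (ServiceBusReceivedMessage message in await simpleReceiver.PeekMessagesAsync(maxMessages: 10))
{
    Console.WriteLine($"simple: {message.Subject}");
}

// Session-enabled subscription: a session has to be accepted before reading from it.
await using ServiceBusSessionReceiver sessionReceiver =
    await client.AcceptNextSessionAsync(topicName, "sbs-session-enabled-subscription");

foreach (ServiceBusReceivedMessage message in await sessionReceiver.PeekMessagesAsync(maxMessages: 10))
{
    Console.WriteLine($"session {sessionReceiver.SessionId}: {message.Subject}");
}
```

Keep in mind that AcceptNextSessionAsync throws a ServiceBusException if no session with messages becomes available before the timeout, so it only makes sense to run this after the messages have been published.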
  And if you are using a tool like
  Service Bus Explorer, you can even query the messages using a Filter.
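Without an external tool, the rules attached to each subscription can also be read back from code, which is a quick way to confirm which filter expression a subscription ended up with. Again, this is just a sketch, assuming the same placeholder connection string and entity names as earlier in this post.

```csharp
using System;
using Azure.Messaging.ServiceBus.Administration;

// Assumption: same placeholder connection string and entity names as earlier in this post.
var connectionString = "<ServiceBus_ConnectionString>";
var topicName = "sbt-test-topic";
string[] subscriptionNames = { "sbs-simple-subscription", "sbs-session-enabled-subscription" };

var adminClient = new ServiceBusAdministrationClient(connectionString);

foreach (string subscriptionName in subscriptionNames)
{
    // GetRulesAsync pages through every rule attached to the subscription.
    await foreach (RuleProperties rule in adminClient.GetRulesAsync(topicName, subscriptionName))
    {
        // SQL-based filters expose their expression; anything else is printed by type name.
        string description = rule.Filter is SqlRuleFilter sqlFilter
            ? sqlFilter.SqlExpression
            : rule.Filter.GetType().Name;

        Console.WriteLine($"{subscriptionName} -> {rule.Name}: {description}");
    }
}
```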
Hope this helps.
Happy Coding.
Regards,
Jaliya