Thursday, April 29, 2021

Azure Service Bus Client Library for .NET: Receivers vs. Processors

Azure Service Bus Client Library for .NET mainly provides two approaches for consuming messages from a Queue/Subscription: Receivers (ServiceBusReceiver) and Processors (ServiceBusProcessor). When consuming messages from a Session-enabled entity, the equivalents are ServiceBusSessionReceiver and ServiceBusSessionProcessor respectively.

In this post, let's see what these are and when to use Receivers or Processors. Here I am going to be using ServiceBusSessionProcessor, as I am reading from a Session-enabled Queue.

First, let's have a look at how Receivers are used. In one of my previous posts, I wrote about Azure Service Bus: Handling FIFO using Sessions (please take a read, you might find it interesting) where I was using the Receivers approach. For the sake of simplicity, I will just share the same code.
ServiceBusSessionReceiver
You can see there are two do-while loops and a lot of ceremony going on. We can use ServiceBusSessionProcessor to simplify this code, and it offers additional features such as automatic completion of processed messages, automatic message lock renewal, and concurrent execution.

Now let's have a look at how we can use ServiceBusSessionProcessor.
private static async Task RunWithSessionProcessor(ServiceBusClient client)
{
    var options = new ServiceBusSessionProcessorOptions()
    {
        // interesting options here
    };

    ServiceBusSessionProcessor processor = client.CreateSessionProcessor(Shared.Configuration.QUEUE_NAME, options);

    processor.SessionInitializingAsync += async args =>
    {
        Console.WriteLine($"Initializing Session: '{args.SessionId}' at '{DateTimeOffset.UtcNow}', SessionLockedUntil: '{args.SessionLockedUntil}'");
    };

    processor.SessionClosingAsync += async args =>
    {
        Console.WriteLine($"Closing Session: '{args.SessionId}' at '{DateTimeOffset.UtcNow}'\n");
    };

    processor.ProcessMessageAsync += async args =>
    {
        Console.WriteLine($"Received for Session: '{args.SessionId}', Message: '{args.Message.Body}', Ack: Complete");
    };

    processor.ProcessErrorAsync += async args =>
    {
        Console.WriteLine($"Exception for Session: '{args.Exception.Message}'");
    };

    Console.WriteLine("Starting...Press any character to gracefully exit.");

    await processor.StartProcessingAsync();

    Console.ReadKey();

    Console.WriteLine("Stopping...");

    await processor.StopProcessingAsync();

    Console.WriteLine("Stopped!");
}
First, we are creating a ServiceBusSessionProcessor for the targeted queue (there is an overload if you are using Subscriptions). You can see it's mostly event-based, with different events for initializing a session, receiving a message, etc. We register the event handlers and then we just have to start the processing. You must have noticed that I have created ServiceBusSessionProcessorOptions to be passed into the CreateSessionProcessor method. We can use these options to control a variety of things, most importantly the following:
  • AutoCompleteMessages
    • When this is true, we don't need to explicitly acknowledge the broker once the message is processed. Note: if there is an error in processing the message, the message won't be acknowledged as completed even though this is set to true.
    • Default is true.
  • MaxAutoLockRenewalDuration
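    • Maximum duration within which the session lock will be renewed automatically. Set it comfortably above the longest time you expect to spend processing a session, so the lock isn't lost in the middle of processing.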
    • Default is 5 minutes.
  • MaxConcurrentCallsPerSession
    • Gets or sets the maximum number of concurrent calls to the message handler the processor should initiate per session. 
    • Default is 1.
  • MaxConcurrentSessions
    • Specifies how many sessions can be processed concurrently by the processor. 
    • Default is 8.
  • PrefetchCount
    • This specifies how many messages can be eagerly requested from the entity rather than doing a service request to get messages one by one. Note: this must be used with care, because the moment the broker releases a message, that message's Message Lock Duration starts.
    • Default is 0.
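To make the options above concrete, here is a minimal sketch that sets each of them explicitly. The values are illustrative assumptions, not recommendations (the 10 minute renewal window in particular is just something I picked), and the SessionProcessorOptionsSketch class name is hypothetical. It assumes the Azure.Messaging.ServiceBus package is referenced.

```csharp
using System;
using Azure.Messaging.ServiceBus;

public static class SessionProcessorOptionsSketch
{
    // Builds processor options with every setting discussed above spelled out.
    public static ServiceBusSessionProcessorOptions Build()
    {
        return new ServiceBusSessionProcessorOptions
        {
            // Complete messages automatically when the handler returns without throwing (default: true).
            AutoCompleteMessages = true,

            // Keep renewing the session lock for up to 10 minutes (assumed value; default: 5 minutes).
            MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(10),

            // One handler call at a time per session, which keeps FIFO inside a session (default: 1).
            MaxConcurrentCallsPerSession = 1,

            // Number of sessions processed in parallel (default: 8).
            MaxConcurrentSessions = 8,

            // No eager fetching, so message locks don't start ticking early (default: 0).
            PrefetchCount = 0
        };
    }
}
```

The result of Build() can be passed to client.CreateSessionProcessor in place of the options object created in the method above.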
The general recommendation is Processors should be the go-to tool for writing applications that receive messages from Service Bus entities. The Receivers are recommended for more complex scenarios in which the processor is not able to provide the fine-grained control that one can expect when using the Receiver directly. Azure Functions use Processors, so obviously, that's my go-to option.

You can find the sample code through the below link and play around.

Hope this helps.

Happy Coding.

Regards,
Jaliya

Friday, April 23, 2021

Azure Service Bus: Handling FIFO using Sessions

In this post, let's see what Sessions are in Azure Service Bus. The primary use of Sessions (Groups) is to maintain FIFO when there are multiple receivers listening to the same Queue or Subscription. For the simplicity of the post, I will be focusing on Queues rather than Topics/Subscriptions, but the concept is the same.

We can create a session aware Queue in Azure by enabling this option when creating the queue. 

ASB Queue: Enable Sessions
Note: Sessions are available only in the Standard and Premium tiers of Service Bus. And once sessions are enabled for an entity (Queue/Subscription), that specific entity can only receive messages that have the SessionId set to a valid value.

Let's go by an example. I have created a session enabled Service Bus Queue. And I am using the newer Azure Service Bus .NET Client library Azure.Messaging.ServiceBus (which replaces Microsoft.Azure.ServiceBus).

I have the following Sender.
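using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Newtonsoft.Json;

public record ApplicationMessage(string Message);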

class Program
{
    static async Task Main(string[] args)
    {
        await using var client = new ServiceBusClient(Shared.Configuration.CONNECTION_STRING);

        ServiceBusSender sender = client.CreateSender(Shared.Configuration.QUEUE_NAME);

        List<string> sessionIds = new() { "S1", "S2" };

        for (var i = 1; i <= 5; i++)
        {
            foreach (var sessionId in sessionIds)
            {
                var applicationMessage = new ApplicationMessage($"M{i}");
                await SendMessageAsync(sessionId, applicationMessage, sender);
            }
        }
    }

    static async Task SendMessageAsync(string sessionId, ApplicationMessage applicationMessage, ServiceBusSender serviceBusSender)
    {
        var message = new ServiceBusMessage(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(applicationMessage)))
        {
            // Note: Since I am sending to a Session enabled Queue, SessionId should not be empty
            SessionId = sessionId,
            ContentType = "application/json"
        };

        await serviceBusSender.SendMessageAsync(message);

        Console.WriteLine($"Message sent: Session '{message.SessionId}', Message = '{applicationMessage.Message}'");
    }
}
Basically, it adds messages to the target queue in the following order: M1 for S1, M1 for S2, M2 for S1, M2 for S2, and so on up to M5. Note: I am setting the SessionId on each message.
Queue
Now if we have just one instance of a receiver/consumer, we should be OK. But what if we have multiple instances of receivers processing these messages? There is no way to guarantee FIFO here. That's where sessions come into the picture.

When Sessions are enabled for a queue, behind the scenes it will create virtual queues. The receivers can either choose a particular session they are interested in and Peek Lock that entire Session, or pick whatever the next session is that isn't already locked by another receiver. That receiver will then only process messages that belong to the session it has locked, and no other receiver will see the messages of that locked session.
Virtual Queues
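To picture the virtual queues, here is a small in-memory sketch. It is only an illustration of the idea, not how Service Bus implements sessions internally, and the SessionGroupingSketch class and GroupBySession method are hypothetical names of mine. It takes messages in the interleaved order the Sender above produces and splits them into one ordered list per SessionId.

```csharp
using System;
using System.Collections.Generic;

public static class SessionGroupingSketch
{
    // Splits an interleaved stream of (SessionId, Body) pairs into one list per session,
    // keeping the arrival order inside each session.
    public static Dictionary<string, List<string>> GroupBySession(
        IEnumerable<(string SessionId, string Body)> messages)
    {
        var virtualQueues = new Dictionary<string, List<string>>();

        foreach (var (sessionId, body) in messages)
        {
            if (!virtualQueues.TryGetValue(sessionId, out var queue))
            {
                queue = new List<string>();
                virtualQueues[sessionId] = queue;
            }

            queue.Add(body);
        }

        return virtualQueues;
    }

    public static void Main()
    {
        // Same send order as the Sender: M1 for S1, M1 for S2, M2 for S1, ...
        var sent = new List<(string SessionId, string Body)>();
        for (var i = 1; i <= 5; i++)
        {
            foreach (var sessionId in new[] { "S1", "S2" })
            {
                sent.Add((sessionId, $"M{i}"));
            }
        }

        foreach (var pair in GroupBySession(sent))
        {
            Console.WriteLine($"{pair.Key}: {string.Join(", ", pair.Value)}");
        }
    }
}
```

A receiver that locks S1 only ever sees the S1 list, in order, which is why FIFO holds per session even with many receivers.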
This is how we can implement such a receiver through the code.
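using System;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

class Program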
{
    static async Task Main(string[] args)
    {
        await using var client = new ServiceBusClient(Shared.Configuration.CONNECTION_STRING);

        var cts = new CancellationTokenSource();
        // CancelKeyPress handlers take two parameters: the sender and the event args
        Console.CancelKeyPress += (a, o) => { Console.WriteLine("---I am Dead!---"); cts.Cancel(); };

        do
        {
            // Here we are accepting the next Session which isn't locked by any other receiver
            ServiceBusSessionReceiver receiver = await client.AcceptNextSessionAsync(Shared.Configuration.QUEUE_NAME);

            Console.WriteLine($"Receiver started for SessionId: '{receiver.SessionId}'.");

            ServiceBusReceivedMessage message = null;
            do
            {
                message = await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(1), cancellationToken: cts.Token);
                if (message != null)
                {
                    try
                    {
                        Console.WriteLine($"Received: '{message.Body}', Ack: Complete");
                        await receiver.CompleteMessageAsync(message, cts.Token);
                    }
                    catch
                    {
                        Console.WriteLine($"Received: '{message.Body}', Ack: Abandon");
                        await receiver.AbandonMessageAsync(message, cancellationToken: cts.Token);
                    }
                }
            }
            while (message != null && !cts.IsCancellationRequested);
            await receiver.CloseAsync();
        }
        while (!cts.IsCancellationRequested);
    }
}
Hint: We can use Processors (ServiceBusProcessor/ServiceBusSessionProcessor) and get rid of these do-while loops. And Processors are what's being used in Azure Functions.

And if we want to accept only a particular session, we can get rid of the outer do-while loop and do this.
ServiceBusSessionReceiver receiver = await client.AcceptSessionAsync(Shared.Configuration.QUEUE_NAME, "S1");
So when I run the sender and the receiver, I can see messages getting processed grouped by session, in a FIFO manner. Of course, I have just a single instance of the receiver running; if you have multiple instances, you should see different receivers picking up messages from different sessions.
Sender
Receiver

Some Important Q & A

  • In a session, what if the receiver fails to process one of the messages? How will FIFO be maintained in that kind of situation?
    • If one of the messages fails in the middle, the receiver can notify the broker (Service Bus) by abandoning the message. This releases the lock held by the receiver, so the message becomes available again for immediate processing and gets redelivered. No other messages behind the failed message will get processed until the failed message is settled. It will eventually either get completed or be moved to the dead-letter queue (DLQ) once the entity's maximum delivery count is exceeded.
    • Say in the above example, Message: M3 is failing to process for some reason. The output will be something like this:
Message Process Failure in the Middle
  • What if a receiver dies while in the middle of processing messages?
    • In this case, another receiver instance looking for available sessions will step in, take control of the session, and continue from the last processed message.
    • Here in the below image, a worker dies in the middle, another worker steps in and continues.
One of the Receivers Dies and Another Receiver Takes Control
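Going back to the first question above, the retry behaviour inside a single session can be sketched with a small in-memory simulation. This is only a model under my own assumptions, not the broker's actual implementation: SessionRetrySketch, Drain and the tryProcess delegate are hypothetical names, and maxDeliveryCount stands in for the entity's maximum delivery count setting. The point it shows is that the message at the head of the session is retried until it is either completed or dead-lettered, and only then does the next message get its turn.

```csharp
using System;
using System.Collections.Generic;

public static class SessionRetrySketch
{
    // Processes one session's messages strictly in order. A message that fails is "abandoned"
    // and redelivered until it succeeds or runs out of delivery attempts.
    public static (List<string> Completed, List<string> DeadLettered) Drain(
        IEnumerable<string> sessionMessages,
        Func<string, bool> tryProcess,
        int maxDeliveryCount)
    {
        var completed = new List<string>();
        var deadLettered = new List<string>();

        foreach (var message in sessionMessages)
        {
            var settled = false;

            // Nothing behind this message is touched until this loop finishes.
            for (var delivery = 1; delivery <= maxDeliveryCount && !settled; delivery++)
            {
                settled = tryProcess(message); // false models an abandon followed by redelivery
            }

            if (settled)
            {
                completed.Add(message);
            }
            else
            {
                deadLettered.Add(message);
            }
        }

        return (completed, deadLettered);
    }

    public static void Main()
    {
        var attempts = new Dictionary<string, int>();

        // M3 fails on its first two deliveries and succeeds on the third.
        bool TryProcess(string message)
        {
            attempts.TryGetValue(message, out var count);
            attempts[message] = count + 1;
            return message != "M3" || attempts[message] > 2;
        }

        var (completed, deadLettered) = Drain(
            new[] { "M1", "M2", "M3", "M4", "M5" }, TryProcess, maxDeliveryCount: 10);

        Console.WriteLine($"Completed: {string.Join(", ", completed)}");
        Console.WriteLine($"Dead-lettered: {string.Join(", ", deadLettered)}");
    }
}
```

In this model M4 and M5 are only handled after M3 has finally been settled, which matches the ordering described in the answer above.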
You can find the sample code through the below link and play around.

Hope this helps.

Happy Coding.

Regards,
Jaliya

Friday, April 16, 2021

Visual Studio 16.10.0 Preview 2.0: New Git Features

I always use the latest Visual Studio preview, and this morning I saw an update notification for Visual Studio 16.10.0 Preview 2.0. I installed it and relaunched, and instantly noticed a change in the rightmost button in Visual Studio.
Enhanced rightmost button in Visual Studio
Now when you click on it, you get something similar to the Git Changes window. And when I opened the Git Changes window, I saw this.
Git Changes: Connect to Azure DevOps
And connecting this will allow you to access Work Items and Builds for the project. We can even create a PR (of course, that will open up the browser). As far as I can remember, this was available for Git for some time, but not for Azure DevOps.
Git Changes: Create PR
And then I looked for more information and saw these 2 blog posts, you should definitely check these out. There are more features to explore.


Hope this helps.

Happy Coding.

Regards,
Jaliya

Sunday, April 11, 2021

Early Support for .NET Hot Reload is Now Available with .NET 6 Preview 3

.NET 6 Preview 3 was released just a couple of days back with some nice features. One of the nicest features that got released with this preview is the early support for .NET Hot Reload. Currently, it's only available for ASP.NET Core and Blazor projects. The command is dotnet watch.

So what's this? Let's have a look using a demo.

First things first: we need to install .NET 6 Preview 3. I have already installed it, and this is my dotnet version.
PS C:\Users\Jaliya\Desktop> dotnet --version
6.0.100-preview.3.21202.5
Now I am creating a new ASP.NET Core WebApi Project by running the following command.
dotnet new webapi -n webapi
Once the project is created, I am opening it up in VS Code, and let's go through some of the files. First, the csproj file.
.csproj file
As expected, it's targeting .NET 6. Now let's have a look at the launchSettings.json file under the Properties folder. It looks similar to what we have already seen and know, but you should notice something new.
launchSettings.json
There is a new property named hotReloadProfile and its value is set to aspnetcore. So this is what's going to enable hot reloading. For Blazor WebAssembly projects, the profile value is blazorwasm.

Now I have got rid of the default WeatherForecastController and added a new TestController.
using Microsoft.AspNetCore.Mvc;

namespace webapi.Controllers
{
    [ApiController]
    [Route("[controller]")]
    public class TestController : ControllerBase
    {
        [HttpGet]
        public IActionResult Get()
        {
            return Ok("Hello World");
        }
    }
}
Nothing fancy here, it's just a single GET action returning some string. Now what I am going to do is run dotnet watch. And since it's kind of hard to show hot reloading in writing, let me share a screencast.
Hot Reload
In the image above, I have snapped VS Code to the left and a browser to the right. I am doing code changes in VS Code and reloading the page on the right. Here I run the project with dotnet watch, and while it's running, I am changing the value of the string our endpoint is returning and saving. When I send another request, the endpoint returns the new value.

So is it restarting the application behind the scene? That's an interesting question. Let's look at the below sample.

Here I have this interface and its implementation.
public interface ISingletonService
{
    int GetCurrentNumber();
}

public class SingletonService : ISingletonService
{
    private int _someNumber;

    public int GetCurrentNumber()
    {
        return _someNumber++;
    }
}
It is registered as a Singleton, meaning one instance for the whole lifetime of this application.
services.AddSingleton<ISingletonService, SingletonService>();
Now I am changing my test endpoint to something like below.
[ApiController]
[Route("[controller]")]
public class TestController : ControllerBase
{
    private readonly ISingletonService _singletonService;

    public TestController(ISingletonService singletonService)
    {
        _singletonService = singletonService;
    }

    [HttpGet]
    public IActionResult Get()
    {
        return Ok($"Number: {_singletonService.GetCurrentNumber()}");
    }
}
So whenever I call this endpoint, it will return the current number calculated inside GetCurrentNumber method. Initially, it will start from 0, then per each call, it will increment by 1 and return the new value. Whenever I restart the application, it will again start from 0.
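A small aside on the counter itself: a singleton is shared by all requests, and _someNumber++ is not atomic, so concurrent calls could in principle return the same number twice. That doesn't matter for this demo, but here is a sketch of a thread-safe variant, assuming the same interface (repeated here only to keep the snippet self-contained). The ThreadSafeSingletonService name is hypothetical and not part of the sample project.

```csharp
using System.Threading;

public interface ISingletonService
{
    int GetCurrentNumber();
}

public class ThreadSafeSingletonService : ISingletonService
{
    private int _someNumber;

    public int GetCurrentNumber()
    {
        // Interlocked.Increment atomically adds 1 and returns the new value;
        // subtracting 1 mirrors the post-increment behaviour of _someNumber++.
        return Interlocked.Increment(ref _someNumber) - 1;
    }
}
```

Like the original, the first call returns 0 and each later call returns the next number.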
Hot Reload Preserving App State
Here again, I am doing a dotnet watch, and while it's running, I am making a change. You can see the output is changing, but most importantly, the number keeps getting incremented from the last number. Basically, while the runtime is doing the hot reload, the app state is being preserved. It's not just a simple restart behind the scenes.

Note: As of the day I am writing this post, this feature is not available through Visual Studio.

Isn't it nice or what!

Do try it out! I am pretty sure this is going to be loved by all .NET devs around the world.

Happy Coding.

Regards,
Jaliya

Saturday, April 10, 2021

How to Delete an Azure Hybrid Connection through Azure Portal

This is a quick post on how we can delete an Azure Hybrid Connection through Azure Portal. Thought of writing a post since it took some time for me to figure out.

Once an Azure Hybrid Connection is added to Azure App Service/Function, unfortunately, there is no delete option on the page where we added it or on the list of available Hybrid Connections.
Available Hybrid Connections
Once a Hybrid Connection is added there is a Relay that gets created (this is essentially created with the same name as the Service Bus namespace you have given when you are creating a new Hybrid Connection). You need to click on the Relay and go inside there.
Relay
There on the left side menu, you will see a menu item called Hybrid Connections, and once you click on it, you will see your list of Hybrid Connections. Now click on the Hybrid Connection you want to delete, and you will see the Delete option.
Delete Hybrid Connection
Hope this helps!

Happy Coding.

Regards,
Jaliya