Thursday, April 29, 2021

Azure Service Bus Client Library for .NET: Receivers vs. Processors

Azure Service Bus Client Library for .NET mainly provides two approaches for consumers to consume messages from a Queue/Subscription: Receivers (ServiceBusReceiver) and Processors (ServiceBusProcessor). When consuming messages from a session-enabled entity, the counterparts are ServiceBusSessionReceiver and ServiceBusSessionProcessor respectively.

In this post, let's see what these are and when to use Receivers versus Processors. Here I am going to be using ServiceBusSessionProcessor as I am reading from a session-enabled queue.

First, let's have a look at how Receivers are used. In one of my previous posts, I wrote about Azure Service Bus: Handling FIFO using Sessions (please take a read, you might find it interesting) where I was using the Receivers approach. For the sake of simplicity, I will just share the same code.
ServiceBusSessionReceiver
You can see there are two do-while loops and a lot of ceremony going on. We can use ServiceBusSessionProcessor to simplify this code, and it offers additional features like automatic completion of processed messages, automatic message lock renewal, and concurrent execution.

Now let's have a look at how we can use ServiceBusSessionProcessor.
private static async Task RunWithSessionProcessor(ServiceBusClient client)
{
    var options = new ServiceBusSessionProcessorOptions()
    {
        // interesting options here
    };

    ServiceBusSessionProcessor processor = client.CreateSessionProcessor(Shared.Configuration.QUEUE_NAME, options);

    processor.SessionInitializingAsync += async args =>
    {
        Console.WriteLine($"Initializing Session: '{args.SessionId}' at '{DateTimeOffset.UtcNow}', SessionLockedUntil: '{args.SessionLockedUntil}'");
    };

    processor.SessionClosingAsync += async args =>
    {
        Console.WriteLine($"Closing Session: '{args.SessionId}' at '{DateTimeOffset.UtcNow}'\n");
    };

    processor.ProcessMessageAsync += async args =>
    {
        Console.WriteLine($"Received for Session: '{args.SessionId}', Message: '{args.Message.Body}', Ack: Complete");
    };

    processor.ProcessErrorAsync += async args =>
    {
        Console.WriteLine($"Exception for Session: '{args.Exception.Message}'");
    };

    Console.WriteLine("Starting... Press any key to gracefully exit.");

    await processor.StartProcessingAsync();

    Console.ReadKey();

    Console.WriteLine("Stopping...");

    await processor.StopProcessingAsync();

    Console.WriteLine("Stopped!");
}
First, we are creating a ServiceBusSessionProcessor for the targeted queue (there is an overload if you are using Subscriptions). You can see it's mostly event-based: we have different events for initializing a session, receiving a message, etc. We register the event handlers and then just have to start the processing. You may have noticed that I created a ServiceBusSessionProcessorOptions instance to pass into the CreateSessionProcessor method. We can use these options to control a variety of things, most importantly the following,
  • AutoCompleteMessages
    • When this is true, we don't need to explicitly acknowledge the broker once the message is processed. Note: if there is an error in processing the message, the message won't be acknowledged as completed even though this is set to true. 
    • Default is true.
  • MaxAutoLockRenewalDuration
    • The maximum duration for which the session lock will be renewed automatically. A longer duration is safer for long-running message processing. 
    • Default is 5 minutes.
  • MaxConcurrentCallsPerSession
    • Gets or sets the maximum number of concurrent calls to the message handler the processor should initiate per session. 
    • Default is 1.
  • MaxConcurrentSessions
    • Specifies how many sessions can be processed concurrently by the processor. 
    • Default is 8.
  • PrefetchCount
    • This specifies how many messages can be eagerly requested from the entity, rather than doing a service request to get messages one by one. Note: this must be used with care, because the moment the broker releases a message, that message's lock duration starts.
    • Default is 0.
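To make these options concrete, here is a sketch of setting them explicitly (the values below are purely illustrative, not recommendations):

```csharp
using System;
using Azure.Messaging.ServiceBus;

var options = new ServiceBusSessionProcessorOptions
{
    // Acknowledge messages automatically when the handler completes without throwing
    AutoCompleteMessages = true,

    // Keep renewing the session lock for up to 10 minutes of processing
    MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(10),

    // Process one message at a time per session to preserve ordering
    MaxConcurrentCallsPerSession = 1,

    // Process up to 4 sessions in parallel
    MaxConcurrentSessions = 4,

    // Don't prefetch; request messages from the service one by one
    PrefetchCount = 0
};
```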
The general recommendation is that Processors should be the go-to tool for writing applications that receive messages from Service Bus entities. Receivers are recommended for more complex scenarios in which the Processor is not able to provide the fine-grained control one gets when using a Receiver directly. Azure Functions uses Processors, so obviously, that's my go-to option.

You can find the sample code through the below link and play around.

Hope this helps.

Happy Coding.

Regards,
Jaliya

Friday, April 23, 2021

Azure Service Bus: Handling FIFO using Sessions

In this post, let's see what Sessions are in Azure Service Bus. The primary use of Sessions (message groups) is to maintain FIFO when there are multiple receivers listening to the same Queue or Subscription. For simplicity, I will be focusing on Queues rather than Topics/Subscriptions, but the concept is the same.

We can create a session-aware Queue in Azure by enabling this option when creating the queue.

ASB Queue: Enable Sessions
Note: Sessions are available only in the Standard and Premium tiers of Service Bus. And once sessions are enabled for an entity (Queue/Subscription), that specific entity can only receive messages that have the SessionId set to a valid value.

Let's go through an example. I have created a session-enabled Service Bus Queue. And I am using the newer Azure Service Bus .NET client library Azure.Messaging.ServiceBus (which replaces Microsoft.Azure.ServiceBus).

I have the following Sender.
public record ApplicationMessage(string Message);

class Program
{
    static async Task Main(string[] args)
    {
        await using var client = new ServiceBusClient(Shared.Configuration.CONNECTION_STRING);

        ServiceBusSender sender = client.CreateSender(Shared.Configuration.QUEUE_NAME);

        List<string> sessionIds = new() { "S1", "S2" };

        for (var i = 1; i <= 5; i++)
        {
            foreach (var sessionId in sessionIds)
            {
                var applicationMessage = new ApplicationMessage($"M{i}");
                await SendMessageAsync(sessionId, applicationMessage, sender);
            }
        }
    }

    static async Task SendMessageAsync(string sessionId, ApplicationMessage applicationMessage, ServiceBusSender serviceBusSender)
    {
        var message = new ServiceBusMessage(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(applicationMessage)))
        {
            // Note: Since I am sending to a Session enabled Queue, SessionId should not be empty
            SessionId = sessionId,
            ContentType = "application/json"
        };

        await serviceBusSender.SendMessageAsync(message);

        Console.WriteLine($"Message sent: Session '{message.SessionId}', Message = '{applicationMessage.Message}'");
    }
}
So basically what it does is add messages to the target queue in the following order. Note: I am setting the SessionId on each message.
Queue
Now if we have just one instance of a receiver/consumer, we should be ok. But what if we have multiple instances of receivers processing these messages? There is no way to guarantee FIFO here. That's where sessions come into the picture.

When Sessions are enabled for a queue, behind the scenes it will create virtual queues. Receivers can either choose a particular session they are interested in and Peek-Lock that entire Session, or pick the next session that isn't already locked by another receiver. That particular receiver will then only process messages that belong to the session it has locked, and no other receivers will see those locked sessions' messages.
Virtual Queues
This is how we can implement such a receiver through the code.
class Program
{
    static async Task Main(string[] args)
    {
        await using var client = new ServiceBusClient(Shared.Configuration.CONNECTION_STRING);

        var cts = new CancellationTokenSource();
        Console.CancelKeyPress += (o, a) => { Console.WriteLine("---I am Dead!---"); cts.Cancel(); };

        do
        {
            // Here we are accepting the next Session which isn't locked by any other receiver
            ServiceBusSessionReceiver receiver = await client.AcceptNextSessionAsync(Shared.Configuration.QUEUE_NAME);

            Console.WriteLine($"Receiver started for SessionId: '{receiver.SessionId}'.");

            ServiceBusReceivedMessage message = null;
            do
            {
                message = await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(1), cancellationToken: cts.Token);
                if (message != null)
                {
                    try
                    {
                        Console.WriteLine($"Received: '{message.Body}', Ack: Complete");
                        await receiver.CompleteMessageAsync(message, cts.Token);
                    }
                    catch
                    {
                        Console.WriteLine($"Received: '{message.Body}', Ack: Abandon");
                        await receiver.AbandonMessageAsync(message, cancellationToken: cts.Token);
                    }
                }
            }
            while (message != null && !cts.IsCancellationRequested);
            await receiver.CloseAsync();
        }
        while (!cts.IsCancellationRequested);
    }
}
Hint: We can use Processors (ServiceBusProcessor/ServiceBusSessionProcessor) and get rid of these do-while loops. And Processors are what's being used in Azure Functions.

And if we want to accept only a particular session, we can get rid of the outer do-while loop and do this.
ServiceBusSessionReceiver receiver = await client.AcceptSessionAsync(Shared.Configuration.QUEUE_NAME, "S1");
So when I run the sender and the receiver, I can see messages getting processed, grouped by session, in a FIFO manner. Of course, I have just a single instance of the receiver running; if you have multiple instances, you should see different receivers picking up different sessions' messages.
Sender
Receiver

Some Important Q & A

  • In a session, what if one of the messages is failing to process by the receiver? How will the FIFO be maintained in that kind of a situation?
    • If one of the messages fails in the middle, the receiver can acknowledge the broker (Service Bus) by abandoning the message. This makes the message available again for immediate processing, as the lock on the message held by the receiver is released. That particular message will then be retried as per the retry policy. No messages behind the failed message will be processed until the failed message is processed; it will eventually either get completed or be moved to the dead-letter queue (DLQ) if not completed within the retry time frame.
    • Say in the above example, Message M3 fails to process for some reason. The output will be something like this,
Message Process Failure in the Middle
  • What if a receiver dies while in the middle of processing messages?
    • In this case, another receiver instance looking for available sessions will step in, take control of the session, and continue from the last processed message.
    • In the below image, one worker dies in the middle, and another worker steps in and continues.
One of the Receivers Dies and Another Receiver Takes Control
References and more reading/watching,
You can find the sample code through the below link and play around.

Hope this helps.

Happy Coding.

Regards,
Jaliya

Friday, April 16, 2021

Visual Studio 16.10.0 Preview 2.0: New Git Features

I always use the latest Visual Studio preview, and this morning I saw an update notification for Visual Studio 16.10.0 Preview 2.0. I installed and relaunched it, and instantly noticed a change in the rightmost button in Visual Studio.
Enhanced rightmost button in Visual Studio
Now when you click on it, you get something similar to the Git Changes window. And when I opened the Git Changes window, I saw this.
Git Changes: Connect to Azure DevOps
And connecting this will allow you to access Work Items and Builds for the project. And we can even create a PR (of course, that will open up the browser). As far as I can remember, this was available for Git for some time, but not for Azure DevOps.
Git Changes: Create PR
And then I looked for more information and saw these 2 blog posts; you should definitely check them out. There are more features to explore.


Hope this helps.

Happy Coding.

Regards,
Jaliya

Sunday, April 11, 2021

Early Support for .NET Hot Reload is Now Available with .NET 6 Preview 3

.NET 6 Preview 3 was released just a couple of days back with some nice features. One of the nicest features in this preview is the early support for .NET Hot Reload. Currently, it's only available for ASP.NET Core and Blazor projects. The command is dotnet watch.

So what's this? Let's have a look using a demo.

First things first, we need to install .NET 6 Preview 3. I have already installed it, and this is my dotnet version.
PS C:\Users\Jaliya\Desktop> dotnet --version
6.0.100-preview.3.21202.5
Now I am creating a new ASP.NET Core WebApi Project by running the following command.
dotnet new webapi -n webapi
Once the project is created, I am opening it up on VS Code, and let's go through some of the files. First the csproj file.
.csproj file
As expected, it's targeting .NET 6. Now let's have a look at the launchSettings.json file under the Properties folder. It looks similar to what we have already seen and know, but you should notice something new.
launchSettings.json
There is a new property named hotReloadProfile and its value is set to aspnetcore. So this is what's going to enable hot reloading. For Blazor WebAssembly projects, the profile value is blazorwasm.
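For reference, the relevant part of launchSettings.json looks roughly like this (the profile name and URLs here are from the default template and may differ in your project; hotReloadProfile is the new property):

```json
{
  "profiles": {
    "webapi": {
      "commandName": "Project",
      "launchBrowser": true,
      "applicationUrl": "https://localhost:5001;http://localhost:5000",
      "hotReloadProfile": "aspnetcore",
      "environmentVariables": {
        "ASPNETCORE_ENVIRONMENT": "Development"
      }
    }
  }
}
```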

Now I have got rid of the default WeatherForecastController and added a new TestController.
using Microsoft.AspNetCore.Mvc;

namespace webapi.Controllers
{
    [ApiController]
    [Route("[controller]")]
    public class TestController : ControllerBase
    {
        [HttpGet]
        public IActionResult Get()
        {
            return Ok("Hello World");
        }
    }
}
Nothing fancy here, it's just a single GET action returning some string. Now what I am going to do is run dotnet watch. And since it's kind of hard to show hot reloading in writing, let me share a screencast.
Hot Reload
In the image above, I have snapped VS Code to the left and a browser to the right. I am making code changes in VS Code and reloading the page on the right. Here I run the project with dotnet watch, and while it's running, I change the value of the string our endpoint is returning and save. On sending another request, the endpoint now returns the new value.

So is it restarting the application behind the scene? That's an interesting question. Let's look at the below sample.

Here I have this interface and its implementation.
public interface ISingletonService
{
    int GetCurrentNumber();
}

public class SingletonService : ISingletonService
{
    private int _someNumber;

    public int GetCurrentNumber()
    {
        return _someNumber++;
    }
}
It is registered as a Singleton, meaning one instance for the whole lifetime of the application.
services.AddSingleton<ISingletonService, SingletonService>();
Now I am changing my test endpoint to something like below.
[ApiController]
[Route("[controller]")]
public class TestController : ControllerBase
{
    private readonly ISingletonService _singletonService;

    public TestController(ISingletonService singletonService)
    {
        _singletonService = singletonService;
    }

    [HttpGet]
    public IActionResult Get()
    {
        return Ok($"Number: {_singletonService.GetCurrentNumber()}");
    }
}
So whenever I call this endpoint, it will return the current number calculated inside the GetCurrentNumber method. Initially, it will start from 0; then with each call, it will increment by 1 and return the new value. Whenever I restart the application, it will again start from 0.
Hot Reload Preserving App State
Here again, I am doing a dotnet watch, and while it's running, I make a change. You can see the output is getting changed, but most importantly, the number keeps getting incremented from the last number. Basically, while the runtime is doing the hot reload, the app state is being preserved. It's not just a simple restart behind the scenes.

Isn't it nice or what!

Do try it out! I am pretty sure this is going to be loved by all .NET devs around the world.

Happy Coding.

Regards,
Jaliya

Saturday, April 10, 2021

How to Delete an Azure Hybrid Connection through Azure Portal

This is a quick post on how we can delete an Azure Hybrid Connection through the Azure Portal. I thought of writing a post since it took some time for me to figure out.

Once an Azure Hybrid Connection is added to an Azure App Service/Function, unfortunately, there is no delete option on the page where we added it or on the list of available Hybrid Connections.
Available Hybrid Connections
Once a Hybrid Connection is added, there is a Relay that gets created (it's essentially created with the same name as the Service Bus namespace you gave when creating the Hybrid Connection). You need to click on the Relay and go inside it.
Relay
There, on the left side menu, you will see a menu item called Hybrid Connections, and once you click on it, you will see your list of Hybrid Connections. Now click on the Hybrid Connection you want to delete; then you will see the Delete option.
Delete Hybrid Connection
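As a side note, if you are comfortable with the command line, I believe the Azure CLI's az relay command group can manage Hybrid Connections as well; something like the following should work (the resource names below are placeholders, and you should verify the commands with az relay hyco --help):

```shell
# List the Hybrid Connections under the Relay namespace
az relay hyco list --resource-group MyResourceGroup --namespace-name my-relay-namespace --output table

# Delete the unwanted Hybrid Connection by name
az relay hyco delete --resource-group MyResourceGroup --namespace-name my-relay-namespace --name my-hybrid-connection
```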
Hope this helps!

Happy Coding.

Regards,
Jaliya

Tuesday, March 23, 2021

ASP.NET Core and Swagger: Add Operations Programmatically

In this post, let's see how we can programmatically add Operations to Swagger document in an ASP.NET Core Application.

I had a requirement where an Angular application uses the DevExpress Report Viewer and the reports are being rendered through an ASP.NET Core API. The front end communicates with the back end through an Azure API Gateway. So basically, all the back-end services' endpoints need to be exposed via the Azure API Gateway, and it gets updated by discovering the endpoints in the back-end services' Swagger documents. But unfortunately, Swagger was failing to discover the DevExpress default reporting endpoints.

So I have overridden the default endpoints and applied the [ApiExplorerSettings(IgnoreApi = true)] attribute. Now those endpoints will be excluded from the generated Swagger document, but I still needed to add them programmatically.

It's actually quite simple. I just needed to create a new IDocumentFilter. IDocumentFilters are useful when we need to modify Swagger documents after they're initially generated.

using Microsoft.OpenApi.Models;
using Swashbuckle.AspNetCore.SwaggerGen;
using System.Collections.Generic;

public class AddReportViewerOperationsFilter : IDocumentFilter
{
    public void Apply(OpenApiDocument swaggerDoc, DocumentFilterContext context)
    {
        // Tags are for grouping the operations
        var openApiTags = new List<OpenApiTag> { new OpenApiTag { Name = "ReportViewer" } };

        // whatever the path
        swaggerDoc.Paths.Add($"/api/{swaggerDoc.Info.Version}/Reports/Viewer", new OpenApiPathItem
        {
            Operations = new Dictionary<OperationType, OpenApiOperation>
            {
                // GET: /api/v1/Reports/Viewer
                {
                    OperationType.Get,
                    new OpenApiOperation
                    {
                        Tags = openApiTags,
                        Responses = new OpenApiResponses
                        {
                            { "200", new OpenApiResponse { Description = "Success" } }
                        }
                    }
                },
                // POST: /api/v1/Reports/Viewer
                {
                    OperationType.Post,
                    new OpenApiOperation
                    {
                        Tags = openApiTags,
                        Responses = new OpenApiResponses
                        {
                            { "200", new OpenApiResponse { Description = "Success" } }
                        }
                    }
                }
            }
        });
    }
}

So here I have just added 2 Operations, but you get the idea. You can define Parameters for the Operations if you want to; in my case, I didn't need to.

Finally, we need to register the AddReportViewerOperationsFilter when setting up Swagger. 

public void ConfigureServices(IServiceCollection services)
{
    // some code

    services.AddSwaggerGen(config =>
    {
        // some code

        config.DocumentFilter<AddReportViewerOperationsFilter>();
    });

    // some code
}

And now, when we spin up the service, we can see the new operations that got added in the Swagger document.

Swagger: Programmatically Added Operations

Hope this helps.

Happy Coding.

Regards,
Jaliya

Monday, March 15, 2021

WPF: External Login with Identity Server using Microsoft Edge WebView2

In this post, let's see how we can use Microsoft Edge WebView2 in a WPF Application to Sign In with an IdentityServer. This is inspired by the sample project available at IdentityModel/IdentityModel.OidcClient.Samples/WpfWebView where it's using WebBrowser. I wasn't happy with the appearance at all.

Not Responsive
Not Responsive
I thought of trying WebView2 instead of WebBrowser. WebView2 uses Microsoft Edge (Chromium) as the rendering engine, whereas WebBrowser uses Internet Explorer.

So I have created a WPF application targeting .NET 5 (please check WebView2's supported frameworks/platforms here) and tried the code in the IdentityModel/IdentityModel.OidcClient.Samples/WpfWebView project. But since I am going to be using WebView2, I have installed the Microsoft.Web.WebView2 NuGet package and changed the WpfEmbeddedBrowser class as follows.

WpfEmbeddedBrowser.cs
public class WpfEmbeddedBrowser : IBrowser
{
    private BrowserOptions _options = null;

    public async Task<BrowserResult> InvokeAsync(BrowserOptions options, CancellationToken cancellationToken = default)
    {
        _options = options;

        var semaphoreSlim = new SemaphoreSlim(0, 1);
        var browserResult = new BrowserResult()
        {
            ResultType = BrowserResultType.UserCancel
        };

        var signinWindow = new Window()
        {
            Width = 800,
            Height = 600,
            Title = "Sign In",
            WindowStartupLocation = WindowStartupLocation.CenterScreen
        };
        signinWindow.Closing += (s, e) =>
        {
            semaphoreSlim.Release();
        };

        var webView = new WebView2();
        webView.NavigationStarting += (s, e) =>
        {
            // Check whether we are navigating back to the RedirectUri specified in OidcClientOptions;
            // that means the authentication process is completed
            if (IsBrowserNavigatingToRedirectUri(new Uri(e.Uri)))
            {
                e.Cancel = true;

                browserResult = new BrowserResult()
                {
                    ResultType = BrowserResultType.Success,
                    Response = new Uri(e.Uri).AbsoluteUri
                };

                semaphoreSlim.Release();
                signinWindow.Close();
            }
        };

        signinWindow.Content = webView;
        signinWindow.Show();

        // Explicit initialization
        await webView.EnsureCoreWebView2Async(null);

        // Delete existing cookies so previous logins won't be remembered
        webView.CoreWebView2.CookieManager.DeleteAllCookies();

        // Navigate
        webView.CoreWebView2.Navigate(_options.StartUrl);

        await semaphoreSlim.WaitAsync();

        return browserResult;
    }

    private bool IsBrowserNavigatingToRedirectUri(Uri uri)
    {
        return uri.AbsoluteUri.StartsWith(_options.EndUrl);
    }
}
Note: we need to have the WebView2 Runtime installed on client machines for this to work.
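If you want to fail gracefully when the runtime is missing, you can probe for it before showing the sign-in window. This is just a sketch; the helper class is my own, but CoreWebView2Environment.GetAvailableBrowserVersionString is the WebView2 SDK's way of detecting an installed runtime:

```csharp
using Microsoft.Web.WebView2.Core;

public static class WebView2RuntimeCheck
{
    // Returns true when a WebView2 Runtime (or a suitable Edge installation) is available
    public static bool IsRuntimeInstalled()
    {
        try
        {
            // Throws WebView2RuntimeNotFoundException when no runtime is found
            string version = CoreWebView2Environment.GetAvailableBrowserVersionString();
            return !string.IsNullOrEmpty(version);
        }
        catch (WebView2RuntimeNotFoundException)
        {
            return false;
        }
    }
}
```

You could call this at startup and direct the user to the WebView2 Runtime download page when it returns false.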

And now when I ran the application, it was just beautiful. And it works great with IdentityModel.OidcClient.
Responsive UI
Responsive UI
Seems Microsoft Edge WebView2 is the way to go if we are embedding web technologies (HTML, CSS, and JavaScript) in native apps.

Hope this helps.

Happy Coding.

Regards,
Jaliya

Tuesday, February 23, 2021

EF Core: Using HiLo Algorithm to Generate IDs

In this post, let's see how we can use the Hi/Lo algorithm as the key generation strategy in EF Core. This can be achieved using the UseHiLo method.

As always, let's go through an example.

Consider the below code.
public class Category
{
    public int Id { get; set; }

    public string Name { get; set; }

    public ICollection<Product> Products { get; set; }
}

public class Product
{
    public int Id { get; set; }

    public string Name { get; set; }

    public int CategoryId { get; set; }
}
And MyDbContext is set up like this.
public class MyDbContext : DbContext
{
    public DbSet<Category> Categories { get; set; }

    public DbSet<Product> Products { get; set; }

    protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
    {
        optionsBuilder
            .UseSqlServer("ConnectionString")
            .LogTo(Console.WriteLine, LogLevel.Information);
    }

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        modelBuilder.Entity<Category>(builder =>
        {
            builder
                .HasMany(x => x.Products)
                .WithOne()
                .HasForeignKey(x => x.CategoryId);
        });
    }
}
And now I am adding a Category and a Product.
using var context = new MyDbContext();
await context.Database.EnsureDeletedAsync();
await context.Database.EnsureCreatedAsync();

var category = new Category() { Name = "Some Category" };
context.Categories.Add(category);

var product = new Product { Name = "Some Product", CategoryId = category.Id };
context.Products.Add(product);

await context.SaveChangesAsync();
SaveChanges will fail here with the following error: "The INSERT statement conflicted with the FOREIGN KEY constraint "FK_Products_Categories_CategoryId"". The reason is that we are setting the CategoryId of the Product, but it's still 0 because the Category record hasn't been saved to the database yet.

In this kind of scenario, using the Hi/Lo algorithm to determine the Id before the record is actually saved to the database is quite handy.

Enabling HiLo is pretty easy. You can either enable it for all the entities or for a specific entity.
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
    // HiLo Id generation For all the entities
    //modelBuilder.UseHiLo();

    modelBuilder.Entity<Category>(builder =>
    {
        builder
            .HasMany(x => x.Products)
            .WithOne()
            .HasForeignKey(x => x.CategoryId);

        // HiLo Id generation only for Category
        builder
            .Property(x => x.Id)
            .UseHiLo("categoryseq");
    });
}
After this change, if you run the example, you are not going to see the previous error; instead, you should see that the Category now has a valid Id.
Valid CategoryId
If you have a second category added, it will have an Id of value 2.

Something important to note here is that if you do UseHiLo() for all the entities, the Ids are going to be kind of messed up; taking the above example, the Id of the Product would be 2.

Now let's try 2 simultaneous adds. While control is waiting on SaveChanges, I am starting another instance of the application.
Another Instance
Here the new category has the Id of value 11.

Now how is this working?

If you add a database migration, you will see EF creating a Sequence, something like below.
migrationBuilder.CreateSequence(
    name: "categoryseq",
    incrementBy: 10);
And once the database is updated, you can see the created sequence here.
Sequences
categoryseq
Now let's imagine that so far we have only created the database and no data has been inserted.

Now when we create a new database context and add a category to the context (context.Categories.Add(category)), EF will run a query like this.
SELECT NEXT VALUE FOR [categoryseq]
This will return 1; the category will be assigned the value 1, and the context has now reserved Ids 1 to 10. If we add another category in the same database context, EF won't run the above query again; it will just increment the Id by 1. Once we have added 10 categories, EF will run the query again to get the next value, which will be 11. But imagine that while we are adding these 10 categories, another user creates a new database context and starts adding categories. That process will also run the above query, and this time it will get 11 (because so far we have only reserved 1 to 10). And when our context next asks for a value, it will get 21, because the other process has now reserved 11 to 20.
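The block-reservation behavior described above can be sketched in a few lines of C#. This is just a toy illustration of the Hi/Lo idea, not EF Core's actual implementation; HiLoGenerator and the delegate standing in for the SQL sequence are made up for the example:

```csharp
using System;

// Toy illustration of Hi/Lo: each "context" reserves a block of Ids
// by taking a single value from a shared sequence.
public class HiLoGenerator
{
    private readonly Func<int> _nextSequenceValue; // stands in for SELECT NEXT VALUE FOR [categoryseq]
    private readonly int _blockSize;
    private int _nextId = 1;
    private int _blockEnd = 0;

    public HiLoGenerator(Func<int> nextSequenceValue, int blockSize = 10)
    {
        _nextSequenceValue = nextSequenceValue;
        _blockSize = blockSize;
    }

    public int NextId()
    {
        if (_nextId > _blockEnd) // current block exhausted, reserve a new one
        {
            _nextId = _nextSequenceValue();
            _blockEnd = _nextId + _blockSize - 1;
        }
        return _nextId++;
    }
}

public static class Program
{
    public static void Main()
    {
        // Mimics a SQL sequence with INCREMENT BY 10: returns 1, 11, 21, ...
        int current = 1;
        Func<int> sequence = () => { int value = current; current += 10; return value; };

        var contextA = new HiLoGenerator(sequence); // reserves 1-10 on first use
        var contextB = new HiLoGenerator(sequence); // reserves 11-20 on first use

        Console.WriteLine(contextA.NextId()); // 1
        Console.WriteLine(contextA.NextId()); // 2
        Console.WriteLine(contextB.NextId()); // 11
        Console.WriteLine(contextA.NextId()); // 3
    }
}
```

Note how contextB jumps straight to 11, exactly as in the two-instance experiment above; the gaps appear when a context is disposed before using its whole block.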

Isn't it nice? But there is a kind of downside here: this can cause gaps in the Ids (an issue only if you are worried about Ids being contiguous), but generally, it shouldn't be a problem.

Hope this helps!

Happy Coding.

Regards,
Jaliya