Monday, June 21, 2021

EF Core and MS SQL Server: Adding Migrations for a Table that has System-Versioning Enabled

Note: this post is based on EF Core 5.x.

In this post, let's see how we can add an EF database migration for a table that has System-Versioning enabled. (If you don't know what System-Versioning enabled tables are in Microsoft SQL Server, you can read Temporal tables for more information, but if you landed on this post, it's very likely you already know what that is).

In my case, I wanted to add a computed column to a table that has System-Versioning enabled. If I just add the column to the target table, EF will throw an error when applying the migration, something like "System-versioned table schema modification failed because adding computed column while system-versioning is ON is not supported". So what we need to do is first turn off system-versioning, make the necessary schema changes, and finally turn system-versioning back on.
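
At the SQL level, the pattern is roughly the following. This is just a sketch using the table names from the example below; options like DATA_CONSISTENCY_CHECK are omitted.

-- Turn system-versioning off before changing the schema
ALTER TABLE pt.PropertyAccount SET (SYSTEM_VERSIONING = OFF);

-- ...make the schema changes on both the main table and the history table...

-- Turn system-versioning back on, pointing at the same history table
ALTER TABLE pt.PropertyAccount SET
(
    SYSTEM_VERSIONING = ON (HISTORY_TABLE = pt.PropertyAccountHistory)
);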

Let's go through an example. I am adding a new computed column to my target table PropertyAccount, which is already System-Versioned.
public class PropertyAccountConfiguration : IEntityTypeConfiguration<PropertyAccount>
{
    public void Configure(EntityTypeBuilder<PropertyAccount> builder)
    {
        // some other configuration

        builder
            .Property(x => x.AccountNumber)
            .HasComputedColumnSql("RIGHT('000000'+CAST(Id AS VARCHAR(6)),6)")
            .HasMaxLength(6);
    }
}
So now if I add a migration, EF will generate something like this.
public partial class PT_AddAccountNumberToPropertyAccount : Migration
{
    protected override void Up(MigrationBuilder migrationBuilder)
    {
        migrationBuilder.AddColumn<string>(
            name: "AccountNumber",
            schema: "pt",
            table: "PropertyAccount",
            type: "nvarchar(6)",
            maxLength: 6,
            nullable: true,
            computedColumnSql: "RIGHT('000000'+CAST(Id AS VARCHAR(6)),6)");
    }

    protected override void Down(MigrationBuilder migrationBuilder)
    {
        migrationBuilder.DropColumn(
            name: "AccountNumber",
            schema: "pt",
            table: "PropertyAccount");
    }
}
But if we try to apply the migration, it's going to throw the above error.

So now we need to turn off the system-versioning, add the column to the target table and its history table, and then turn on the system-versioning again, something like below.
public partial class PT_AddAccountNumberToPropertyAccount : Migration
{
    private const string PropertyAccountTableName = "pt.PropertyAccount";

    protected override void Up(MigrationBuilder migrationBuilder)
    {
        migrationBuilder.DisableSystemVersioning(PropertyAccountTableName);

        migrationBuilder.AddColumn<string>(
            name: "AccountNumber",
            schema: "pt",
            table: "PropertyAccount",
            type: "nvarchar(6)",
            maxLength: 6,
            nullable: true,
            computedColumnSql: "RIGHT('000000'+CAST(Id AS VARCHAR(6)),6)");

        migrationBuilder.AddColumn<string>(
            name: "AccountNumber",
            schema: "pt",
            table: "PropertyAccountHistory",
            type: "varchar(6)",
            maxLength: 6,
            nullable: true);

        migrationBuilder.EnableSystemVersioning(PropertyAccountTableName);
    }

    protected override void Down(MigrationBuilder migrationBuilder)
    {
        migrationBuilder.DisableSystemVersioning(PropertyAccountTableName);

        migrationBuilder.DropColumn(
            name: "AccountNumber",
            schema: "pt",
            table: "PropertyAccount");

        migrationBuilder.DropColumn(
            name: "AccountNumber",
            schema: "pt",
            table: "PropertyAccountHistory");

        migrationBuilder.EnableSystemVersioning(PropertyAccountTableName);
    }
}
The two methods DisableSystemVersioning and EnableSystemVersioning are extension methods I have created on MigrationBuilder.
public static class MigrationBuilderExtensions
{
    public static void DisableSystemVersioning(this MigrationBuilder migrationBuilder, string tableName)
    {
        if (string.IsNullOrEmpty(tableName))
        {
            throw new ArgumentNullException(nameof(tableName));
        }

        migrationBuilder.Sql($"ALTER TABLE {tableName} SET (SYSTEM_VERSIONING = OFF);");
    }

    public static void EnableSystemVersioning(this MigrationBuilder migrationBuilder, string tableName, string historyTableName = null)
    {
        if (string.IsNullOrEmpty(tableName))
        {
            throw new ArgumentNullException(nameof(tableName));
        }

        if (string.IsNullOrEmpty(historyTableName))
        {
            historyTableName = $"{tableName}History";
        }

        migrationBuilder.Sql(@$"ALTER TABLE {tableName} SET
            (
                SYSTEM_VERSIONING = ON
                ( HISTORY_TABLE = 
{historyTableName})
            );
        "
);
    }
}
And this works like a charm. 

Hope this helps.

Happy Coding.

Regards,
Jaliya

Monday, June 14, 2021

.NET 6 Preview 4: Async Streaming in ASP.NET Core

In this post, let's go through another feature that got introduced to ASP.NET Core in .NET 6 Preview 4. That is Async Streaming.

Before looking at how Async Streaming works in .NET 6, let's first have a look at how it works in the current version: .NET 5. I have created a simple API targeting .NET 5 that returns an IAsyncEnumerable<T>.
[ApiController]
[Route("[controller]")]
public class ValuesController : ControllerBase
{
    [HttpGet]
    public IAsyncEnumerable<int> Get()
    {
        IAsyncEnumerable<int> value = GetData();
        return value;
    }

    private static async IAsyncEnumerable<int> GetData()
    {
        for (var i = 1; i <= 10; i++)
        {
            await Task.Delay(1000);
            yield return i;
        }
    }
}
Here if you run this, the results are first buffered into memory before being written to the response. So you will get the full response after ~10 seconds. Something like this.
IAsyncEnumerable<T> streaming .NET 5
Now let's change the target framework to .NET 6 and run the same code. 
IAsyncEnumerable<T> streaming .NET 6
Isn't it nice? What's happening here is, IAsyncEnumerable<T> instances are no longer buffered into memory before being written out to the response. But there is something very important to note here: this functionality only works if you are using System.Text.Json as the serializer, because it is actually made possible by System.Text.Json, which now has support for streaming IAsyncEnumerable<T> types. If you are using Newtonsoft.Json as your serializer, things will not work as shown above.
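
As a side note, System.Text.Json in .NET 6 can also consume such a response as a stream on the client side via JsonSerializer.DeserializeAsyncEnumerable. Here is a minimal sketch I put together; the URL is just a placeholder for wherever the above API is hosted.

using System;
using System.IO;
using System.Net.Http;
using System.Text.Json;

using var httpClient = new HttpClient();

// GetStreamAsync returns once the response headers are read,
// so we can start consuming items while the server is still writing them
using Stream stream = await httpClient.GetStreamAsync("https://localhost:5001/values");

await foreach (int item in JsonSerializer.DeserializeAsyncEnumerable<int>(stream))
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss} - {item}");
}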


Happy Coding.

Regards,
Jaliya

Monday, June 7, 2021

.NET 6 Preview 4: Introducing Minimal APIs in ASP.NET Core

In this post, let's see one of the nicest features coming in with ASP.NET Core in .NET 6. This feature is called Minimal APIs, the basic idea is being able to create a REST API with minimal code. 

Right now, if you create an ASP.NET Core Web API project using Visual Studio or the dotnet CLI, you will see something like this, a project with multiple files.
Not Minimal API
Basically, you have a Program.cs, a Startup.cs and a Controller class. The ASP.NET team's motivation for introducing Minimal APIs is that we shouldn't have to go through all that ceremony to write a smaller API or a small microservice.

If you have installed .NET 6 Preview 4 and do dotnet new web, you should see something new.

dotnet new web -n MinimalApi
Minimal API
There is going to be just one .cs file, which is Program.cs.

Program.cs

using System;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.Hosting;

var builder = WebApplication.CreateBuilder(args);
await using var app = builder.Build();

if (app.Environment.IsDevelopment())
{
    app.UseDeveloperExceptionPage();
}

app.MapGet("/", (Func<string>)(() => "Hello World!"));

await app.RunAsync();

This is just a simple API, which returns just "Hello World!". There is no Main method; it's using Top-level statements (a C# 9.0 feature). 

Here you can do basically everything you used to do, like setting up dependencies and configuring the HTTP request pipeline (what we usually did in the ConfigureServices and Configure methods in Startup.cs respectively).

To give an example, something like this.

WebApplicationBuilder builder = WebApplication.CreateBuilder(args);

// ConfigureServices
builder.Services.AddDbContext<EmployeeContext>(options =>
    options.UseSqlServer(builder.Configuration.GetConnectionString("DefaultConnection")));

await using WebApplication app = builder.Build();

// Configure
if (app.Environment.IsDevelopment())
{
    app.UseDeveloperExceptionPage();
} // Setup routes and run the app

Here, I have set up an EF DbContext within the container, and in the HTTP request pipeline I have set up a DeveloperExceptionPage if the environment is Development (again, what we used to do in the ConfigureServices and Configure methods in Startup.cs respectively).

Consider the below EmployeeContext.

namespace MinimalApi
{
    public class EmployeeContext : DbContext
    {
        public EmployeeContext(DbContextOptions options) : base(options)
        {
        }

        public DbSet<Employee> Employees { get; set; }
    }

    public class Employee
    {
        public int Id { get; set; }

        public string Name { get; set; }
    }
}

I can basically create a CRUD API for Employees here easily, something like below.

WebApplicationBuilder builder = WebApplication.CreateBuilder(args);

// ConfigureServices
builder.Services.AddDbContext<EmployeeContext>(options =>
    options.UseSqlServer(builder.Configuration.GetConnectionString("DefaultConnection")));

await using WebApplication app = builder.Build();

// Configure
if (app.Environment.IsDevelopment())
{
    app.UseDeveloperExceptionPage();
}

app.MapGet("/employees"async ([FromServices] EmployeeContext dbContext) =>
{
    return await dbContext.Employees.ToListAsync();
});

app.MapGet("/employees/{id}"async ([FromServices] EmployeeContext dbContextint id) =>
{
    Employee employee = await dbContext.Employees.FindAsync(id);
    if (employee is null)
    {
        return NotFound();
    }

    return Ok(employee);
});

app.MapPost("/employees"async ([FromServices] EmployeeContext dbContext, Employee employee) =>
{
    await dbContext.Employees.AddAsync(employee);
    await dbContext.SaveChangesAsync();

    return Ok(employee);
});

app.MapPut("/employees/{id}"async ([FromServices] EmployeeContext dbContextint id, Employee employee) =>
{
    if (id != employee.Id)
    {
        return BadRequest();
    }

    if (!await dbContext.Employees.AnyAsync(x => x.Id == id))
    {
        return NotFound();
    }

    dbContext.Entry(employee).State = EntityState.Modified;
    await dbContext.SaveChangesAsync();

    return NoContent();
});

app.MapDelete("/employees/{id}"async ([FromServices] EmployeeContext dbContextint id) =>
{
    Employee employee = await dbContext.Employees.FindAsync(id);
    if (employee is null)
    {
        return NotFound();
    }

    dbContext.Employees.Remove(employee);
    await dbContext.SaveChangesAsync();

    return NoContent();
});

await app.RunAsync();

There is an interesting thing here: from the endpoints, all my return types are custom types that implement IResult, a new type coming with .NET 6.


My return types are mapped manually here, through this ResultMapper class.

public static class ResultMapper
{
    public static IResult BadRequest() => new StatusCodeResult(StatusCodes.Status400BadRequest);

    public static IResult NotFound() => new StatusCodeResult(StatusCodes.Status404NotFound);

    public static IResult NoContent() => new StatusCodeResult(StatusCodes.Status204NoContent);

    public static OkResult<T> Ok<T>(T value) => new(value);

    public class OkResult<T> : IResult
    {
        private readonly T _value;

        public OkResult(T value)
        {
            _value = value;
        }

        public Task ExecuteAsync(HttpContext httpContext)
        {
            return httpContext.Response.WriteAsJsonAsync(_value);
        }
    }
}

Hopefully, we will not have to do this in the next releases.

So as developers or newbies, we can start with the Minimal APIs and grow as we go, rather than having a somewhat complex structure from day one. Microsoft is going to provide tooling (I believe through Visual Studio and VS Code) so we can refactor the code into the current structure (separate Controllers etc.) as the code grows. An important thing is, this is not going to replace the existing code structure; it's just that we can start with as little code as possible and then grow as we go!

You can find the sample code here,
   https://github.com/jaliyaudagedara/minimal-api

In this sample code, you will see I am using global usings (a C# 10 feature), you can read more about that here: C# 10.0: Introducing Global Usings

Hope this helps.

Happy Coding.

Regards,
Jaliya

Tuesday, June 1, 2021

Azure Durable Functions: Change the Storage Provider from Azure Storage to Microsoft SQL Server

During Microsoft Build 2021 last week, there was an important announcement related to Azure Durable Functions. For all this time, Azure Durable Functions maintained its state in Azure Storage, and we didn't have any control over that. The state was mainly blobs, queues and tables.

Now we have two other storage options. Note: as of the date I am writing this post, these are available as Preview functionality.
  1. Netherite
  2. Microsoft SQL Server
    • This can be both on-premises and cloud-hosted deployments of SQL Server, including Azure SQL Database.
In this post, let's see how we can use Microsoft SQL Server as the state storage provider for an Azure Durable Function App. I am going to use a .NET Function App written in C#.

I have created an Azure Function App using Visual Studio, choosing Durable Functions Orchestrator as the template (Note: Durable Functions are not yet supported with .NET 5; we still need to target .NET Core 3 (LTS), which is .NET Core 3.1).
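
For reference, the template scaffolds roughly the following (trimmed here for brevity; Function1 etc. are just the template's default names).

using System.Collections.Generic;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Extensions.Http;

public static class Function1
{
    [FunctionName("Function1")]
    public static async Task<List<string>> RunOrchestrator(
        [OrchestrationTrigger] IDurableOrchestrationContext context)
    {
        // Calls a simple activity a couple of times and collects the outputs
        var outputs = new List<string>
        {
            await context.CallActivityAsync<string>("Function1_Hello", "Tokyo"),
            await context.CallActivityAsync<string>("Function1_Hello", "Seattle")
        };

        return outputs;
    }

    [FunctionName("Function1_Hello")]
    public static string SayHello([ActivityTrigger] string name) => $"Hello {name}!";

    [FunctionName("Function1_HttpStart")]
    public static async Task<HttpResponseMessage> HttpStart(
        [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestMessage req,
        [DurableClient] IDurableOrchestrationClient starter)
    {
        string instanceId = await starter.StartNewAsync("Function1", null);
        return starter.CreateCheckStatusResponse(req, instanceId);
    }
}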

Then we need to install the Microsoft.DurableTask.SqlServer.AzureFunctions NuGet package (Note: as of the date I am writing this post, it's still a pre-release version). Once that's installed, I am updating the host.json as follows.

host.json
{
  "version": "2.0",
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    },
    "logLevel": {
      "DurableTask.SqlServer": "Information",
      "DurableTask.Core": "Warning"
    }
  },
  "extensions": {
    "durableTask": {
      "storageProvider": {
        "type": "mssql",
        "connectionStringName": "SQLDB_Connection"
      }
    }
  }
}
Here the most important change is in the extensions node. There we have customized the storageProvider to be mssql and configured a connectionStringName. There is also a slight modification to the logging configuration to reduce unnecessary noise, so we can see what's happening on the SQL side of things.

Now we need to specify the SQLDB_Connection key and its value. Since I am going to run this function locally, I am adding that key into local.settings.json. When deployed to Azure, you can maintain this setting in Application Settings or wherever you maintain your AppSettings.

Before that, let's create an empty database in our Microsoft SQL Server. I am just using my local MSSQL Server.
Empty Database
It's just an empty database; I expanded Tables and Stored Procedures just to be sure. Now I am copying the connection string for this database and updating local.settings.json as follows.

local.settings.json
{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet",
    "SQLDB_Connection": "Data Source=RAVANA-TPP15\\MSSQLSERVER2019;Initial Catalog=AzDurableFuncApp;Integrated Security=True;"
  }
}
Here we are still going to need the AzureWebJobsStorage key and value, because there are other things (secrets etc.) that need to be stored.

Now let's run the function app.
First run on the function app
You will see that some SQL scripts are getting executed. Now let's refresh the database.
Updated Database
There are a couple of tables and a bunch of Stored Procedures (amongst other things) that got added in order to manage the state. If you want to have a look at the SQL scripts that are getting executed, you can find them here: https://github.com/microsoft/durabletask-mssql/tree/main/src/DurableTask.SqlServer/Scripts.

So why do we need to consider changing the Storage Provider for Azure Durable Functions in the first place, and which one should we use? You can find answers to those questions and more information by reading through these.

Hope this helps.

Happy Coding.

Regards,
Jaliya

Thursday, May 27, 2021

C# 10.0: Introducing Global Usings

C# 10.0 is expected to ship with .NET 6 in November 2021 during .NET Conf 2021 (November 9-11), but some of the features are already available with the C# preview LangVersion. One such feature is Global Usings. In this post, let's have a look at what that is.

I have created a Console Application and have set the LangVersion to preview.

<Project Sdk="Microsoft.NET.Sdk">

  <PropertyGroup>
    <OutputType>Exe</OutputType>
    <TargetFramework>net6.0</TargetFramework>
    <LangVersion>preview</LangVersion>
  </PropertyGroup>

</Project>

Now I have the following code in the Program.cs file.

Course course = await GetSomeCourse();

foreach (Student student in course.Students)
{
    Console.WriteLine(student.Name);
}

static async Task<Course> GetSomeCourse()
{
    Course course = new("C# 10", new List<Student> { new("John"), new("Jane") });

    return await Task.FromResult(course);
}

public record Student(string Name);

public record Course(string Name, List<Student> Students);

So here you can see I have used some C# 9 features (Top-level statements, Records, Target-typed new expressions etc.). But I don't have any usings. Of course, I need using System, System.Collections.Generic and System.Threading.Tasks here, but where are those?

In my project, I have added another file called Usings.cs (you can name it anything you want).
Usings.cs
And there in the Usings.cs, I have the following.

global using System;
global using System.Collections.Generic;
global using System.Threading.Tasks;

Note the global keyword. This means these usings are available throughout my project. 99% of the time, there will be some usings which need to be in every .cs file you have, so you can basically move all of those into one file and mark them with the global keyword.

Now let's say, I am adding a new class to the project.
Using appeared previously in this namespace
The compiler immediately identifies that we have global usings, and we don't have to repeat them here.
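
For example, a hypothetical new class like the one below compiles without a single using directive at the top of the file, because List<T> and Task come from the global usings in Usings.cs.

// No usings needed here; they come from Usings.cs
public class CourseRepository
{
    private readonly List<Course> _courses = new();

    public Task AddAsync(Course course)
    {
        _courses.Add(course);
        return Task.CompletedTask;
    }
}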

Isn't it nice?

Happy Coding.

Regards,
Jaliya

Wednesday, May 26, 2021

Introducing .NET Hot Reload in Visual Studio 2019 Version 16.11.0 Preview 1.0

A couple of weeks back I wrote a post, Early Support for .NET Hot Reload is Now Available with .NET 6 Preview 3, where I went through one of the greatest features expected to reach RTM with .NET 6: .NET Hot Reload. At the time of writing that post, this feature was only available through the dotnet command, and not within Visual Studio.

If you have installed the latest Preview of Visual Studio that was announced earlier today during the Microsoft Build 2021, which is Visual Studio 2019 Version 16.11.0 Preview 1.0, you might have noticed something new when you are on a debugging session in Visual Studio.
Apply Code Changes
So with Visual Studio 2019 Version 16.11.0 Preview 1.0, we now have .NET Hot Reload experience through Visual Studio (actually this was initially available with Visual Studio 2019 Version 16.10.0 Preview 2.0).

Here is how it works. I am trying the same project I used in my previous post, this time debugging through Visual Studio.
.NET Hot Reload in Visual Studio 2019 Version 16.11.0 Preview 1.0
We just need to make the code change while debugging and then hit Apply Code Changes, and that's it.

This is still in its early stages; it's going to improve and support more project types with upcoming releases. Currently a debugging session is required for this to work, but with Visual Studio 2022, we should be able to use .NET Hot Reload without needing the debugger. This means when we run with CTRL+F5 and make code changes, the .NET Hot Reload feature should patch the running application.

For more details on this feature, please go through this post:

Great things to look forward to!

Happy Coding.

Regards,
Jaliya

Monday, May 24, 2021

Azure DevOps: Granting Permission to a Pipeline to Consume an Azure Artifacts Project Scoped Feed in a Different Project

I have recently faced this npm ERR! 404 Not Found when trying to do an npm install. In this scenario, I was trying to access an Azure Artifacts Project Scoped Feed from a Pipeline that is running in a Different Project.

After struggling for some time, I figured out it was a permission issue. We need to do the following two things.

  1. First, in the Project where the Project Scoped Feed lives, we need to go to Project Settings -> Permissions, and then add the Pipelines Build Service to the Contributors group. The Pipelines Build Service is something like "{ProjectName} Build Service ({OrganizationName})"
    Project Settings -> Permissions -> Contributors
  2. Then in the Project Scoped Feed Settings, under Permissions, we need to grant the Pipelines Build Service at least Collaborator access, so packages can be ingested from whatever upstream sources the feed is set up with. If you only give read permissions, packages cannot be ingested from upstream sources.
    Feed Permission

And this should be it.

Hope that helps.

Happy Coding.

Regards,
Jaliya

Sunday, May 16, 2021

Polly: Executing an Action with Retries

In this post, let's have a look at how we can execute an Action with retries using Polly. It's actually quite easy.

Rather than explaining in words, a code sample will explain it better.

Say I have this custom exception, and whenever I receive this exception, I want to add some retry logic.

public class MyCustomException : Exception
{
    public MyCustomException(string message) : base(message)
    {
    }
}

So I can create a Retry Policy and just execute our action within the policy (here I am using the async counterpart, since that's what we use almost all the time). 

using Microsoft.Extensions.Logging;
using Polly;
using Polly.Retry;
using System;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static async Task Main(string[] args)
        {
            using ILoggerFactory loggerFactory = LoggerFactory.Create(builder =>
            {
                builder.AddConsole();
            });

            ILogger<Program> logger = loggerFactory.CreateLogger<Program>();

            AsyncRetryPolicy policy = CreatePolicyFor<MyCustomException>(logger);

            await policy.ExecuteAsync(async () =>
            {
                await DoSomething();
            });
        }

        private static async Task DoSomething()
        {
            throw new MyCustomException("Some Exception");
        }

        private static AsyncRetryPolicy CreatePolicyFor<TException>(ILogger logger, int retries = 3, int delayInSeconds = 5) where TException : Exception
        {
            return Policy
                .Handle<TException>()
                .WaitAndRetryAsync(
                    retryCount: retries,
                    sleepDurationProvider: retry => TimeSpan.FromSeconds(delayInSeconds),
                    onRetry: (exception, timeSpan, retry, ctx) =>
                    {
                        logger.LogWarning(exception,
                            "Exception {ExceptionType} with message {Message} detected on attempt {retry} of {retries}",
                            exception.GetType().Name,
                            exception.Message,
                            retry,
                            retries);
                    }
                );
        }
    }
}

We can specify how many retries we want, the sleep duration between retries, and nicely, we can even specify an Action to be executed when retrying (like I am logging the attempt info here).
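
And if you want the delay to grow between attempts instead of staying fixed, the sleepDurationProvider makes that a one-line change. Here is a small sketch; the exponential backoff is my variation, not part of the code above.

// Waits 2, 4 and 8 seconds on attempts 1, 2 and 3 respectively
return Policy
    .Handle<TException>()
    .WaitAndRetryAsync(
        retryCount: retries,
        sleepDurationProvider: retry => TimeSpan.FromSeconds(Math.Pow(2, retry)),
        onRetry: (exception, timeSpan, retry, ctx) => { /* log as above */ });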

Hope this helps!

Happy Coding.

Regards,
Jaliya

Tuesday, May 11, 2021

EF Core 5.0: How to use SavePoints

In this post, let's have a look at Savepoints, which were introduced with EF Core 5.0.

Savepoints are basically points within a database transaction that may later be rolled back to if an error occurs or for any other reason. Let's go through an example.

Consider the below simple MyDbContext.
public class MyDbContext : DbContext
{
    public DbSet<Category> Categories { get; set; }

    public DbSet<Product> Products { get; set; }

    protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
    {
        optionsBuilder
            .UseSqlServer(@"Data Source=.;Initial Catalog=EfCore5;Integrated Security=True")
            .EnableSensitiveDataLogging()
            .LogTo(Console.WriteLine, LogLevel.Information);
    }

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        modelBuilder.Entity<Category>(builder =>
        {
            builder
                .HasMany(x => x.Products)
                .WithOne()
                .HasForeignKey(x => x.CategoryId);
        });
    }
}

public class Category
{
    public int Id { get; set; }

    public string Name { get; set; }

    public ICollection<Product> Products { get; set; }
}

public class Product
{
    public int Id { get; set; }

    public string Name { get; set; }

    public int CategoryId { get; set; }
}
And I am inserting some data as below.
using var context = new MyDbContext();
await context.Database.EnsureDeletedAsync();
await context.Database.EnsureCreatedAsync();

Category category = null;
Product product = null;

using IDbContextTransaction transaction = context.Database.BeginTransaction();
try
{
    category = new Category() { Name = "Some Category" };
    context.Categories.Add(category);
    await context.SaveChangesAsync();

    await transaction.CreateSavepointAsync("CategoryCreated");

    // Setting incorrect CategoryId FK, this will throw FK constraint exception
    product = new Product { Name = "Some Product", CategoryId = 999 };
    context.Products.Add(product);
    await context.SaveChangesAsync();

    await transaction.CommitAsync();

}
catch (Exception)
{
    await transaction.RollbackToSavepointAsync("CategoryCreated");

    // Remove the invalid existing product
    context.Products.Local.Remove(product);
    product = new Product { Name = "Some Product", CategoryId = category.Id };
    context.Products.Add(product);

    //// Update/fix the invalid existing product
    //context.Products.Local.First(x => x.Name == product.Name).CategoryId = category.Id;

    await context.SaveChangesAsync();

    await transaction.CommitAsync();
}
First I am creating a Category and then I am inserting a Product. The Product save will throw an exception because I am setting an invalid CategoryId. Then inside the catch block, I am rolling back the transaction to the savepoint I created when the Category was created ("CategoryCreated") and retrying the Product insert. An important thing to note here is that only the transaction gets rolled back; whatever I have added to the DbContext stays as it is. So we MUST either remove the invalid existing product and re-add it, or update/fix the invalid existing product.
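
For the curious, on SQL Server these savepoint calls map to plain T-SQL. Roughly something like this; a sketch only, the actual SQL EF Core generates will differ in details.

BEGIN TRANSACTION;

INSERT INTO Categories (Name) VALUES ('Some Category');

SAVE TRANSACTION CategoryCreated;       -- CreateSavepointAsync("CategoryCreated")

-- the failing Product INSERT happens here, then:
ROLLBACK TRANSACTION CategoryCreated;   -- the Category INSERT above survives

-- retry the Product INSERT with a valid CategoryId, then:
COMMIT TRANSACTION;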

Special thanks to Dr. Holger Schwichtenberg for helping me understand how this works when I raised an issue: https://github.com/dotnet/efcore/issues/24862.
Hope this helps.

Happy Coding.

Regards,
Jaliya

Thursday, April 29, 2021

Azure Service Bus Client Library for .NET: Receivers vs. Processors

Azure Service Bus Client Library for .NET mainly provides two approaches for consumers to consume messages from a Queue/Subscription: Receivers (ServiceBusReceiver) and Processors (ServiceBusProcessor) (when consuming messages from a Sessions-enabled entity, ServiceBusSessionReceiver and ServiceBusSessionProcessor respectively).

In this post, let's see what these are and when to use Receivers vs. Processors. Here I am going to use ServiceBusSessionProcessor, as I am reading from a Session-enabled Queue.

First, let's have a look at how Receivers are used. In one of my previous posts, I wrote about Azure Service Bus: Handling FIFO using Sessions (please take a read, you might find it interesting) where I was using the Receivers approach. For the sake of simplicity, I will just share the same code.
ServiceBusSessionReceiver
You can see there are two do-while loops and a lot of ceremony going on. We can use ServiceBusSessionProcessor to simplify this code, and it offers additional features like automatic completion of processed messages, automatic message lock renewal, concurrent execution etc.
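
Since the receiver code above is an image, here is a rough sketch of what the receiver approach looks like; the connection string, queue name and exit condition are placeholders, not the exact code from that post.

await using var client = new ServiceBusClient("<connection-string>");

do
{
    // Accept the next available session on the queue
    ServiceBusSessionReceiver receiver = await client.AcceptNextSessionAsync("<queue-name>");
    Console.WriteLine($"Accepted Session: '{receiver.SessionId}'");

    ServiceBusReceivedMessage message;
    do
    {
        message = await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(5));
        if (message is not null)
        {
            Console.WriteLine($"Received: '{message.Body}'");
            await receiver.CompleteMessageAsync(message); // manual acknowledgement
        }
    } while (message is not null);

    await receiver.CloseAsync();
} while (true); // real code needs a stop condition here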

Now let's have a look at how we can use ServiceBusSessionProcessor.
private static async Task RunWithSessionProcessor(ServiceBusClient client)
{
    var options = new ServiceBusSessionProcessorOptions()
    {
        // interesting options here
    };

    ServiceBusSessionProcessor processor = client.CreateSessionProcessor(Shared.Configuration.QUEUE_NAME, options);

    processor.SessionInitializingAsync += async args =>
    {
        Console.WriteLine($"Initializing Session: '{args.SessionId}' at '{DateTimeOffset.UtcNow}', SessionLockedUntil: '{args.SessionLockedUntil}'");
    };

    processor.SessionClosingAsync += async args =>
    {
        Console.WriteLine($"Closing Session: '{args.SessionId}' at '{DateTimeOffset.UtcNow}'\n");
    };

    processor.ProcessMessageAsync += async args =>
    {
        Console.WriteLine($"Received for Session: '{args.SessionId}', Message: '{args.Message.Body}', Ack: Complete");
    };

    processor.ProcessErrorAsync += async args =>
    {
        Console.WriteLine($"Exception for Session: '{args.Exception.Message}'");
    };

    Console.WriteLine("Starting...Press any character to gracefully exit.");

    await processor.StartProcessingAsync();

    Console.ReadKey();

    Console.WriteLine("Stopping...");

    await processor.StopProcessingAsync();

    Console.WriteLine("Stopped!");
}
First, we are creating a ServiceBusSessionProcessor for the targeted queue (it has an overload if you are using Subscriptions). You can see it's mostly event-based and we have different events such as when initializing a session, receiving a message, etc. We register the events and then we just have to start the processing. You must have noticed that I have created ServiceBusSessionProcessorOptions to be passed into the CreateSessionProcessor method. We can use these options to control a variety of things, most importantly the following (a small example follows the list).
  • AutoCompleteMessages
    • When this is true, we don't need to explicitly acknowledge to the broker once the message is completed. Note: if there is an error in processing the message, the message won't be acknowledged as completed even though this is set to true. 
    • Default is true.
  • MaxAutoLockRenewalDuration
    • The maximum duration for which the session lock will be renewed automatically; the longer, the better. 
    • Default is 5 minutes.
  • MaxConcurrentCallsPerSession
    • Gets or sets the maximum number of concurrent calls to the message handler the processor should initiate per session. 
    • Default is 1.
  • MaxConcurrentSessions
    • Specifies how many sessions can be processed concurrently by the processor. 
    • Default is 8.
  • PrefetchCount
    • This specifies how many messages can be eagerly requested from the entity rather than doing a service request to get messages one by one. Note: this must be used with care, because the moment the broker releases a message, that message's Message Lock Duration starts.
    • Default is 0.
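
To make the "// interesting options here" placeholder in the code above concrete, here is a sketch of setting these options; the values are purely illustrative, not recommendations.

var options = new ServiceBusSessionProcessorOptions()
{
    AutoCompleteMessages = true,                           // ack automatically unless processing throws
    MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(10), // keep renewing the session lock
    MaxConcurrentCallsPerSession = 1,                      // preserves per-session ordering
    MaxConcurrentSessions = 8,                             // sessions processed in parallel
    PrefetchCount = 10                                     // eager fetch; use with care
};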
The general recommendation is that Processors should be the go-to tool for writing applications that receive messages from Service Bus entities. Receivers are recommended for more complex scenarios in which the processor cannot provide the fine-grained control one gets when using the Receiver directly. Azure Functions use Processors, so obviously, that's my go-to option.

You can find the sample code through the below link and play around.

Hope this helps.

Happy Coding.

Regards,
Jaliya