In this post, let's look at how to implement the Simple Request-Reply pattern with Azure Service Bus.
First, let's look at what the Simple Request-Reply pattern is. While service buses are used mainly for asynchronous processing, there are scenarios where the message Publisher/Sender needs to wait for a reply from the Consumer/Receiver to the message it sent before proceeding. That's where the Simple Request-Reply pattern comes into the picture.
Let's go by an example.
Say a Sender sends the following message to a particular queue and a Consumer is processing that queue. The Sender populates the Input and expects the Consumer to populate the Output before the Sender can proceed.
public record ApplicationMessage(string Input) { public string? Output { get; set; } }
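To make the payload concrete, here is a minimal sketch of how this record round-trips through System.Text.Json, the serializer used in the rest of this post. It makes no Service Bus calls; the variable names are illustrative and not part of the sample.

```csharp
using System;
using System.Text.Json;

// What the Sender puts on the wire: Input is set, Output is still null.
ApplicationMessage request = new("John");
byte[] requestBody = JsonSerializer.SerializeToUtf8Bytes(request);

// What the Consumer does: deserialize, fill in Output, serialize again.
ApplicationMessage received = JsonSerializer.Deserialize<ApplicationMessage>(requestBody)!;
received.Output = $"Hello {received.Input}!";
string replyBody = JsonSerializer.Serialize(received);

// What the Sender reads back once the reply arrives.
ApplicationMessage reply = JsonSerializer.Deserialize<ApplicationMessage>(replyBody)!;
Console.WriteLine($"{reply.Input} -> {reply.Output}");

public record ApplicationMessage(string Input) { public string? Output { get; set; } }
```

Because Output is a settable property rather than a constructor parameter, the Consumer can fill it in on the same instance it deserialized.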
So how do we achieve this?
ServiceBusMessage has a ReplyTo property that the Sender can use to specify the address of the entity where replies should be sent, something like below.
Sender
ServiceBusAdministrationClient serviceBusAdministrationClient = new(Configuration.CONNECTION_STRING);

// Temporary Queue for Receiver to send their replies into
string replyQueueName = Guid.NewGuid().ToString();
await serviceBusAdministrationClient.CreateQueueAsync(new CreateQueueOptions(replyQueueName)
{
    AutoDeleteOnIdle = TimeSpan.FromSeconds(300)
});

// Sending the message
await using ServiceBusClient serviceBusClient = new(Configuration.CONNECTION_STRING);
ServiceBusSender serviceBusSender = serviceBusClient.CreateSender(Configuration.QUEUE_NAME);

ApplicationMessage applicationMessage = new("John");
ServiceBusMessage serviceBusMessage = new(JsonSerializer.SerializeToUtf8Bytes(applicationMessage))
{
    ContentType = "application/json",
    ReplyTo = replyQueueName,
};
await serviceBusSender.SendMessageAsync(serviceBusMessage);
After sending the message, the Sender needs to look for replies in the queue named replyQueueName.
// Creating a receiver and waiting for the Receiver to reply
ServiceBusReceiver serviceBusReceiver = serviceBusClient.CreateReceiver(replyQueueName);
ServiceBusReceivedMessage serviceBusReceivedMessage = await serviceBusReceiver.ReceiveMessageAsync(TimeSpan.FromSeconds(60));

if (serviceBusReceivedMessage == null)
{
    WriteLine("Error: Didn't receive a response.");
    return;
}

applicationMessage = JsonSerializer.Deserialize<ApplicationMessage>(serviceBusReceivedMessage.Body.ToString());
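Once the reply has been read, the Sender may want to tidy up instead of waiting for AutoDeleteOnIdle to expire. The following is a minimal sketch of that step. ReplyQueueCleanup and CompleteAndDeleteAsync are hypothetical names introduced here for illustration, and the sketch assumes the connection string still carries the manage rights that were used to create the queue.

```csharp
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;

// Hypothetical helper, not part of the original sample.
public static class ReplyQueueCleanup
{
    public static async Task CompleteAndDeleteAsync(
        ServiceBusReceiver replyReceiver,
        ServiceBusReceivedMessage reply,
        ServiceBusAdministrationClient adminClient,
        string replyQueueName)
    {
        // Receivers use PeekLock mode by default, so settle the reply explicitly.
        await replyReceiver.CompleteMessageAsync(reply);
        await replyReceiver.DisposeAsync();

        // Remove the temporary queue right away rather than waiting for it to go idle.
        await adminClient.DeleteQueueAsync(replyQueueName);
    }
}
```

With the variables from the Sender code, it could be called as `await ReplyQueueCleanup.CompleteAndDeleteAsync(serviceBusReceiver, serviceBusReceivedMessage, serviceBusAdministrationClient, replyQueueName);`.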
Consumer
On the Consumer side, it's easy. It just needs to send the reply to the entity specified in the ReplyTo property of the incoming ServiceBusReceivedMessage.
await using ServiceBusClient serviceBusClient = new(Configuration.CONNECTION_STRING);
ServiceBusProcessor serviceBusProcessor = serviceBusClient.CreateProcessor(Configuration.QUEUE_NAME);

serviceBusProcessor.ProcessMessageAsync += async args =>
{
    // Message received
    ApplicationMessage applicationMessage = JsonSerializer.Deserialize<ApplicationMessage>(args.Message.Body.ToString());
    WriteLine($"Message Received: {applicationMessage}.\n");

    // Process the message/Update the Output
    applicationMessage.Output = $"Hello {applicationMessage.Input}!.";

    // Sending the reply
    ServiceBusSender serviceBusSender = serviceBusClient.CreateSender(args.Message.ReplyTo);
    ServiceBusMessage serviceBusMessage = new(JsonSerializer.Serialize(applicationMessage));
    await serviceBusSender.SendMessageAsync(serviceBusMessage);
};
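The handler above only runs once the processor also has an error handler and has been started. Here is a minimal sketch of that wiring. ConsumerHost and RunAsync are hypothetical names introduced for illustration, and the key press to stop is just one assumed way of keeping a console app alive.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

// Hypothetical helper, not part of the original sample.
public static class ConsumerHost
{
    public static async Task RunAsync(ServiceBusProcessor processor)
    {
        // The processor will not start unless an error handler is registered.
        processor.ProcessErrorAsync += args =>
        {
            Console.WriteLine($"Error from {args.ErrorSource}: {args.Exception.Message}");
            return Task.CompletedTask;
        };

        // Begin pumping messages into ProcessMessageAsync.
        await processor.StartProcessingAsync();

        Console.WriteLine("Listening for requests. Press any key to stop.");
        Console.ReadKey();

        // Stop receiving; messages already being handled are allowed to finish.
        await processor.StopProcessingAsync();
    }
}
```

With the Consumer code above, it could be called as `await ConsumerHost.RunAsync(serviceBusProcessor);` after the ProcessMessageAsync handler has been attached.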
Simple Request-Reply Pattern Output
Further reading:
Message Routing and Correlation
Hope this helps.
Happy Coding.