End-to-end Order Placed Communication using RabbitMQ

In this post, I will explain how to implement end-to-end Order Placed communication using RabbitMQ. Please read our previous article, which discusses how to integrate RabbitMQ in ASP.NET Core Web API.

Now that we have set up Messaging.Common (our shared RabbitMQ integration library), the next step is to wire it into our E-Commerce microservices (Order, Payment, Product, Notification). Let us proceed step by step. In the Order Placed flow, the services play the following roles:

  • OrderService → Publishes OrderPlacedEvent
  • ProductService → Consumes OrderPlacedEvent and reduces stock
  • NotificationService → Consumes OrderPlacedEvent and sends notification

Add Reference to Messaging.Common Project

In each Microservice Infrastructure layer project (OrderService, PaymentService, ProductService, NotificationService):

  • Right-click Dependencies → Add Reference → Projects → select Messaging.Common.

This lets each microservice reuse Publisher, BaseConsumer, ConnectionManager, and EventBase.

Create Service.Contracts Projects (Per Microservice)

For each service that needs RabbitMQ (Order, Payment, Product, Notification), add a new Class Library project named:

  • OrderService.Contracts (already exists)
  • PaymentService.Contracts
  • ProductService.Contracts
  • NotificationService.Contracts

OrderService (Publisher)

The Order Service publishes the event when the order status becomes “Confirmed”.

Order Microservice Contract Layer: Publisher Interface

First, create a folder named Messaging at the root level of OrderService.Contracts project. Then, create a class file named IOrderEventPublisher.cs within the Messaging folder, and copy-paste the following code.

using Messaging.Common.Events;
namespace OrderService.Contracts.Messaging
{
    public interface IOrderEventPublisher
    {
        Task PublishOrderPlacedAsync(OrderPlacedEvent evt, string? correlationId = null);
    }
}

Order Microservice Infrastructure Layer: Publisher Implementation

First, create a folder named Messaging at the root level of OrderService.Infrastructure project. Then, create a class file named RabbitMqOrderEventPublisher.cs within the Messaging folder, and copy-paste the following code. This class implements the IOrderEventPublisher contract. It is responsible for publishing “OrderPlacedEvent” messages to RabbitMQ.

using System.Text.Json;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using Messaging.Common.Events;
using Messaging.Common.Options;
using Messaging.Common.Topology;
using OrderService.Contracts.Messaging;

namespace OrderService.Infrastructure.Messaging
{
    public sealed class RabbitMqOrderEventPublisher : IOrderEventPublisher
    {
        private readonly IModel _channel;        // RabbitMQ channel object, used to publish messages.
        private readonly RabbitMqOptions _opt;   // Holds RabbitMQ configuration settings.

        // Constructor: dependencies (channel + options) are injected via DI.
        public RabbitMqOrderEventPublisher(IModel channel, IOptions<RabbitMqOptions> opt)
        {
            _channel = channel;                  // Assign the RabbitMQ channel (already created and registered).
            _opt = opt.Value;                    // Extract RabbitMQ configuration values (host, exchange, queues, etc.).

            // Ensure the exchange, queues, and bindings exist before publishing.
            // This avoids publishing to a non-existent exchange/queue.
            RabbitTopology.EnsureAll(_channel, _opt);
        }

        // Publishes an OrderPlacedEvent message to RabbitMQ.
        // correlationId helps trace the message across multiple microservices.
        public Task PublishOrderPlacedAsync(OrderPlacedEvent evt, string? correlationId = null)
        {
            // If no correlationId was provided, use the one from the event.
            evt.CorrelationId = correlationId ?? evt.CorrelationId;

            // Serialize the event object into JSON UTF-8 bytes (efficient for transport).
            var body = JsonSerializer.SerializeToUtf8Bytes(evt);

            // Create RabbitMQ message properties (metadata for the message).
            var props = _channel.CreateBasicProperties();
            props.Persistent = true;             // Asks the broker to write the message to disk; it survives a restart only if the queue is durable too.
            props.CorrelationId = evt.CorrelationId;  // Set correlationId for traceability.

            // Publish the message to RabbitMQ:
            // - Exchange: taken from configuration (e.g., "ecommerce.topic").
            // - RoutingKey: "order.placed" ensures it is delivered to bound queues (product & notification).
            // - Mandatory = true: if no queue is bound for the routing key, the broker returns the message
            //   to the publisher (raised as the channel's BasicReturn event). It is not dead-lettered,
            //   and it is silently dropped unless a BasicReturn handler is attached.
            // - Properties: message metadata (persistence, correlationId).
            // - Body: the serialized OrderPlacedEvent payload.
            _channel.BasicPublish(_opt.ExchangeName, "order.placed", true, props, body);

            // Return a completed task (since this is a fire-and-forget publish).
            return Task.CompletedTask;
        }
    }
}

Order Microservice Application Layer:

Now, from the OrderService class in the OrderAppService.Application layer, we need to call the publisher after a successful order placement.

Inject IOrderEventPublisher:

So, please inject IOrderEventPublisher as follows into the OrderService class. Using this instance, we will publish the Order Placed event when the order is confirmed.

public class OrderService : IOrderService
{
    private readonly IOrderRepository _orderRepository;
    private readonly IUserServiceClient _userServiceClient;
    private readonly IProductServiceClient _productServiceClient;
    private readonly IPaymentServiceClient _paymentServiceClient;
    private readonly INotificationServiceClient _notificationServiceClient;
    private readonly IMapper _mapper;
    private readonly IMasterDataRepository _masterDataRepository;
    private readonly IConfiguration _configuration;
    private readonly IOrderEventPublisher _publisher;

    public OrderService(
        IOrderRepository orderRepository,
        IUserServiceClient userServiceClient,
        IProductServiceClient productServiceClient,
        IPaymentServiceClient paymentServiceClient,
        INotificationServiceClient notificationServiceClient,
        IMasterDataRepository masterDataRepository,
        IMapper mapper,
        IConfiguration configuration,
        IOrderEventPublisher publisher)
    {
        // Initialize dependencies with null checks for safe injection
        _orderRepository = orderRepository ?? throw new ArgumentNullException(nameof(orderRepository));
        _userServiceClient = userServiceClient ?? throw new ArgumentNullException(nameof(userServiceClient));
        _productServiceClient = productServiceClient ?? throw new ArgumentNullException(nameof(productServiceClient));
        _paymentServiceClient = paymentServiceClient ?? throw new ArgumentNullException(nameof(paymentServiceClient));
        _notificationServiceClient = notificationServiceClient ?? throw new ArgumentNullException(nameof(notificationServiceClient));
        _masterDataRepository = masterDataRepository ?? throw new ArgumentNullException(nameof(masterDataRepository));
        _mapper = mapper ?? throw new ArgumentNullException(nameof(mapper));
        _configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));
        _publisher = publisher ?? throw new ArgumentNullException(nameof(publisher));
    }

    //Existing Code
}

Modifying the CreateOrderAsync Method:

Please modify the CreateOrderAsync method as follows:

public async Task<OrderResponseDTO> CreateOrderAsync(CreateOrderRequestDTO request, string accessToken)
{
    if (request == null)
        throw new ArgumentNullException(nameof(request));

    if (request.Items == null || !request.Items.Any())
        throw new ArgumentException("Order must have at least one item.");

    // Validate that the user exists via User Microservice
    var user = await _userServiceClient.GetUserByIdAsync(request.UserId, accessToken);
    if (user == null)
        throw new InvalidOperationException("User does not exist.");

    // Resolve Shipping Address ID, either provided or created newly via User Microservice
    Guid? shippingAddressId = null;
    if (request.ShippingAddressId != null)
    {
        shippingAddressId = request.ShippingAddressId;
    }
    else if (request.ShippingAddress != null)
    {
        request.ShippingAddress.UserId = request.UserId;
        shippingAddressId = await _userServiceClient.SaveOrUpdateAddressAsync(request.ShippingAddress, accessToken);
    }

    // Resolve Billing Address ID, either provided or created newly
    Guid? billingAddressId = null;
    if (request.BillingAddressId != null)
    {
        billingAddressId = request.BillingAddressId;
    }
    else if (request.BillingAddress != null)
    {
        request.BillingAddress.UserId = request.UserId;
        billingAddressId = await _userServiceClient.SaveOrUpdateAddressAsync(request.BillingAddress, accessToken);
    }

    // Validate presence of both addresses
    if (shippingAddressId == null || billingAddressId == null)
        throw new ArgumentException("Both ShippingAddressId and BillingAddressId must be provided or created.");

    // Validate product stock availability but do not reduce stock yet
    var stockCheckRequests = request.Items.Select(i => new ProductStockVerificationRequestDTO
    {
        ProductId = i.ProductId,
        Quantity = i.Quantity
    }).ToList();

    var stockValidation = await _productServiceClient.CheckProductsAvailabilityAsync(stockCheckRequests, accessToken);
    if (stockValidation == null || stockValidation.Any(x => !x.IsValidProduct || !x.IsQuantityAvailable))
        throw new InvalidOperationException("One or more products are invalid or out of stock.");

    // Retrieve latest product info for accurate pricing and discount
    var productIds = request.Items.Select(i => i.ProductId).ToList();
    var products = await _productServiceClient.GetProductsByIdsAsync(productIds, accessToken);
    if (products == null || products.Count != productIds.Count)
        throw new InvalidOperationException("Failed to retrieve product details for all items.");

    try
    {
        // Fetch policies (example placeholders, adjust with your actual logic)
        int? cancellationPolicyId = null;
        int? returnPolicyId = null;

        // Example: fetch cancellation policy based on user or other criteria
        var cancellationPolicy = await _masterDataRepository.GetActiveCancellationPolicyAsync();
        if (cancellationPolicy != null)
            cancellationPolicyId = cancellationPolicy.Id;

        var returnPolicy = await _masterDataRepository.GetActiveReturnPolicyAsync();
        if (returnPolicy != null)
            returnPolicyId = returnPolicy.Id;

        var orderId = Guid.NewGuid();
        var orderNumber = GenerateOrderNumberFromGuid(orderId);
        var now = DateTime.UtcNow;

        var initialStatus = request.PaymentMethod == PaymentMethodEnum.COD
            ? OrderStatusEnum.Confirmed   // COD orders confirmed immediately
            : OrderStatusEnum.Pending;    // Online payment orders start as pending

        // Create order entity
        var order = new Order
        {
            Id = orderId,
            OrderNumber = orderNumber,
            UserId = request.UserId,
            ShippingAddressId = shippingAddressId.Value,
            BillingAddressId = billingAddressId.Value,
            PaymentMethod = request.PaymentMethod.ToString(),
            OrderStatusId = (int)initialStatus,
            CreatedAt = now,
            OrderDate = now,
            CancellationPolicyId = cancellationPolicyId,
            ReturnPolicyId = returnPolicyId,
            OrderItems = new List<OrderItem>()
        };

        // Add order items with fresh product data
        foreach (var item in request.Items)
        {
            var product = products.First(p => p.Id == item.ProductId);
            order.OrderItems.Add(new OrderItem
            {
                Id = Guid.NewGuid(),
                OrderId = order.Id,
                ProductId = product.Id,
                ProductName = product.Name,
                PriceAtPurchase = product.Price,
                DiscountedPrice = product.DiscountedPrice,
                Quantity = item.Quantity,
                ItemStatusId = (int)initialStatus
            });
        }

        // Calculate order totals: subtotal, discount, tax, shipping, and final amount
        order.SubTotalAmount = Math.Round(order.OrderItems.Sum(i => i.PriceAtPurchase * i.Quantity), 2, MidpointRounding.AwayFromZero);
        order.DiscountAmount = Math.Round(await CalculateDiscountAmountAsync(order.OrderItems), 2, MidpointRounding.AwayFromZero);
        order.TaxAmount = Math.Round(await CalculateTaxAmountAsync(order.SubTotalAmount - order.DiscountAmount), 2, MidpointRounding.AwayFromZero);
        order.ShippingCharges = Math.Round(CalculateShippingCharges(order.SubTotalAmount - order.DiscountAmount), 2, MidpointRounding.AwayFromZero);
        order.TotalAmount = Math.Round(order.SubTotalAmount - order.DiscountAmount + order.TaxAmount + order.ShippingCharges, 2, MidpointRounding.AwayFromZero);

        // Save order to repository
        var addedOrder = await _orderRepository.AddAsync(order);
        if (addedOrder == null)
            throw new InvalidOperationException("Failed to create order.");

        // Initiate payment via Payment Service
        var paymentRequest = new CreatePaymentRequestDTO
        {
            OrderId = order.Id,
            UserId = order.UserId,
            Amount = order.TotalAmount,
            PaymentMethod = request.PaymentMethod
        };

        var paymentResponse = await _paymentServiceClient.InitiatePaymentAsync(paymentRequest, accessToken);
        if (paymentResponse == null)
            throw new InvalidOperationException("Payment initiation failed.");

        // For COD, immediately reserve stock and send notification
        if (request.PaymentMethod == PaymentMethodEnum.COD)
        {
            #region Event Publishing to RabbitMQ
            // Construct the integration event (OrderPlacedEvent)
            var orderPlacedEvent = new OrderPlacedEvent
            {
                OrderId = order.Id,
                OrderNumber = order.OrderNumber,
                UserId = order.UserId,
                CustomerName = user.FullName,
                CustomerEmail = user.Email,
                PhoneNumber = user.PhoneNumber,
                TotalAmount = order.TotalAmount,
                Items = order.OrderItems.Select(i => new OrderItemLine
                {
                    ProductId = i.ProductId,
                    Quantity = i.Quantity,
                    UnitPrice = i.PriceAtPurchase
                }).ToList()
            };

            // Publish the event to RabbitMQ using the shared publisher.
            // This message will be routed to:
            //    - ProductService (to decrease stock)
            //    - NotificationService (to insert notification record).
            // correlationId is set for traceability across logs and microservices.
            await _publisher.PublishOrderPlacedAsync(orderPlacedEvent, Guid.NewGuid().ToString());
            #endregion

            // Map and return order DTO with confirmed status and no payment URL
            var orderDto = _mapper.Map<OrderResponseDTO>(order);
            orderDto.OrderStatus = OrderStatusEnum.Confirmed;
            orderDto.PaymentMethod = PaymentMethodEnum.COD;
            orderDto.PaymentUrl = null;
            return orderDto;
        }
        else
        {
            // Map and return order DTO with pending status and payment URL
            var orderDto = _mapper.Map<OrderResponseDTO>(order);
            orderDto.OrderStatus = OrderStatusEnum.Pending;
            orderDto.PaymentMethod = request.PaymentMethod;
            orderDto.PaymentUrl = paymentResponse.PaymentUrl;
            return orderDto;
        }
    }
    catch (Exception ex)
    {
        //Log the Exception
        Console.WriteLine(ex.Message);
        throw;
    }
}
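
The totals in CreateOrderAsync are rounded with MidpointRounding.AwayFromZero rather than the default mode. The following is a minimal sketch of why that matters, assuming the amount fields are decimal (the entity definition is not shown in this article) and using a made-up unit price:

```csharp
using System;
using System.Globalization;

// Hypothetical line: 3 units at 19.995 each gives a subtotal that sits exactly on a midpoint.
decimal subTotal = 19.995m * 3;   // 59.985

// The mode used in CreateOrderAsync: midpoints move away from zero.
decimal awayFromZero = Math.Round(subTotal, 2, MidpointRounding.AwayFromZero);

// The default mode (MidpointRounding.ToEven, "banker's rounding"): midpoints move to the even digit.
decimal toEven = Math.Round(subTotal, 2);

Console.WriteLine(awayFromZero.ToString(CultureInfo.InvariantCulture));   // 59.99
Console.WriteLine(toEven.ToString(CultureInfo.InvariantCulture));         // 59.98
```

Whichever mode is chosen, applying the same one to every amount keeps the total consistent with its parts.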
Modifying the ConfirmOrderAsync Method:

Please modify the ConfirmOrderAsync method as follows:

public async Task<bool> ConfirmOrderAsync(Guid orderId, string accessToken)
{
    // Retrieve order
    var order = await _orderRepository.GetByIdAsync(orderId);
    if (order == null)
        throw new KeyNotFoundException("Order not found.");

    // Only allow confirmation if order is pending
    if (order.OrderStatusId != (int)OrderStatusEnum.Pending)
        throw new InvalidOperationException("Order is not in a pending state.");

    // Retrieve payment info from Payment Service
    var paymentInfo = await _paymentServiceClient.GetPaymentInfoAsync(
        new PaymentInfoRequestDTO { OrderId = orderId }, accessToken);
    if (paymentInfo == null)
        throw new InvalidOperationException("Payment information not found for this order.");

    if (paymentInfo.PaymentStatus != PaymentStatusEnum.Completed)
        throw new InvalidOperationException("Payment is not successful.");

    var user = await _userServiceClient.GetUserByIdAsync(order.UserId, accessToken);
    if (user == null)
        throw new InvalidOperationException("User does not exist.");

    try
    {
        // Change order status to Confirmed
        bool statusChanged = await _orderRepository.ChangeOrderStatusAsync(
            orderId, OrderStatusEnum.Confirmed, "PaymentService", "Payment successful, order confirmed.");
        if (!statusChanged)
            throw new InvalidOperationException("Failed to update order status.");

        // Now that the order is confirmed, publish an integration event
        // Create the event payload that downstream services need
        var orderPlacedEvent = new OrderPlacedEvent
        {
            OrderId = order.Id,
            OrderNumber = order.OrderNumber,   // Needed by NotificationService for the notification template
            UserId = order.UserId,
            CustomerName = user.FullName,
            CustomerEmail = user.Email,
            PhoneNumber = user.PhoneNumber,
            TotalAmount = order.TotalAmount,
            Items = order.OrderItems.Select(i => new OrderItemLine
            {
                ProductId = i.ProductId,
                Quantity = i.Quantity,
                UnitPrice = i.PriceAtPurchase
            }).ToList()
        };

        // Publish the event to RabbitMQ
        // - _publisher abstracts RabbitMQ communication
        // - The message is sent to exchange "ecommerce.topic" with routing key "order.placed"
        // - ProductService will consume this event to reduce stock
        // - NotificationService will consume this event to insert a notification
        // - correlationId (Guid.NewGuid().ToString()) helps trace this message across logs and services
        await _publisher.PublishOrderPlacedAsync(orderPlacedEvent, Guid.NewGuid().ToString());
        return true;
    }
    catch (Exception ex)
    {
        //Log the Exception
        Console.WriteLine(ex.Message);
        throw;
    }
}

OrderService appsettings.json

Please add the following to the appsettings.json file. The RabbitMQ section in the appsettings.json file defines the configuration for connecting and working with RabbitMQ. It includes connection details like hostname, port, username, password, and virtual host, along with the main exchange (ecommerce.topic) used for publishing messages. It also specifies a dead-letter exchange and queue (ecommerce.dlx and ecommerce.dlq) for handling failed messages, and two queues (product.order_placed and notification.order_placed) where the Product and Notification services will receive order placed events.

"RabbitMq": {
  "HostName": "localhost",
  "Port": 5672,
  "UserName": "ecommerce_user",
  "Password": "Test@1234",
  "VirtualHost": "ecommerce_vhost",
  "ExchangeName": "ecommerce.topic",
  "DlxExchangeName": "ecommerce.dlx",
  "DlxQueueName": "ecommerce.dlq",
  "ProductOrderPlacedQueue": "product.order_placed",
  "NotificationOrderPlacedQueue": "notification.order_placed"
}
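
To make the mapping between these keys and the options class concrete, here is a minimal sketch. The RabbitMqOptionsSketch class is a hypothetical stand-in for the RabbitMqOptions type in Messaging.Common (its real definition comes from the previous article and may differ), with property names taken from the keys above. System.Text.Json is used only to keep the example self-contained; the services themselves bind the section through builder.Configuration.

```csharp
using System;
using System.Text.Json;

// The same keys as the "RabbitMq" section, written as a standalone JSON object.
const string json = @"{
  ""HostName"": ""localhost"",
  ""Port"": 5672,
  ""UserName"": ""ecommerce_user"",
  ""Password"": ""Test@1234"",
  ""VirtualHost"": ""ecommerce_vhost"",
  ""ExchangeName"": ""ecommerce.topic"",
  ""DlxExchangeName"": ""ecommerce.dlx"",
  ""DlxQueueName"": ""ecommerce.dlq"",
  ""ProductOrderPlacedQueue"": ""product.order_placed"",
  ""NotificationOrderPlacedQueue"": ""notification.order_placed""
}";

// Each JSON key is matched to the property with the same name.
var options = JsonSerializer.Deserialize<RabbitMqOptionsSketch>(json)!;

Console.WriteLine($"{options.UserName}@{options.HostName}:{options.Port}/{options.VirtualHost}");
// ecommerce_user@localhost:5672/ecommerce_vhost
Console.WriteLine(options.ProductOrderPlacedQueue);   // product.order_placed

// Hypothetical shape of the options class (assumption, not the Messaging.Common source).
public sealed class RabbitMqOptionsSketch
{
    public string HostName { get; set; } = "";
    public int Port { get; set; }
    public string UserName { get; set; } = "";
    public string Password { get; set; } = "";
    public string VirtualHost { get; set; } = "";
    public string ExchangeName { get; set; } = "";
    public string DlxExchangeName { get; set; } = "";
    public string DlxQueueName { get; set; } = "";
    public string ProductOrderPlacedQueue { get; set; } = "";
    public string NotificationOrderPlacedQueue { get; set; } = "";
}
```

A key that is misspelled in appsettings.json would simply leave the matching property at its default value, so it is worth checking the names carefully.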
Order Microservice API Layer Program.cs

Please add the following code to the Program class. The code is self-explanatory, so please read the comment lines for a better understanding.

// Register the "RabbitMq" configuration section with the .NET Options pattern.
// After this, IOptions<RabbitMqOptions> (or IOptionsMonitor<RabbitMqOptions>) can be injected into any service
// to access the RabbitMQ configuration (hostname, username, vhost, etc.) through DI.
builder.Services.Configure<RabbitMqOptions>(builder.Configuration.GetSection("RabbitMq"));

// Read the RabbitMqOptions values directly from configuration (appsettings.json).
// This is needed because the settings are used immediately, during service registration.
var mq = builder.Configuration.GetSection("RabbitMq").Get<RabbitMqOptions>()!;

// Register a RabbitMQ connection/channel with the DI container.
// AddRabbitMq is a custom extension method (from Messaging.Common.Extensions) that:
// - Creates a persistent RabbitMQ connection
// - Creates an IModel (channel)
// - Registers them as singletons in the DI container
// This ensures every component in this service reuses the same expensive RabbitMQ connection.
builder.Services.AddRabbitMq(mq.HostName, mq.UserName, mq.Password, mq.VirtualHost);

// Register the event publisher implementation as a singleton.
// IOrderEventPublisher is the contract (interface).
// RabbitMqOrderEventPublisher is the concrete implementation that publishes OrderPlacedEvent to RabbitMQ.
// Singleton lifetime fits because the publisher reuses the same RabbitMQ channel for all messages.
// Note: an IModel should not be used by several threads at once, so concurrent publishes
// through this singleton need to be serialized (for example, with a lock).
builder.Services.AddSingleton<IOrderEventPublisher, RabbitMqOrderEventPublisher>();
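
Because the publisher is a singleton that shares one channel, several HTTP requests may try to publish at the same time, and the RabbitMQ .NET client documentation advises against using a single IModel from multiple threads concurrently. The following is a minimal sketch of one way to guard against that, not the article's implementation: LockedPublisher is a hypothetical wrapper, and a plain List<string> stands in for the channel so the example runs without a broker.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Stand-in for the channel: like IModel, List<T> is not safe for concurrent writers.
var sent = new List<string>();
var publisher = new LockedPublisher(message => sent.Add(message));

// Simulate many concurrent requests publishing through the one singleton.
Parallel.For(0, 1000, i => publisher.Publish($"order-{i}"));

Console.WriteLine(sent.Count);   // 1000

// Hypothetical wrapper: only one thread at a time may run the publish delegate.
public sealed class LockedPublisher
{
    private readonly object _gate = new();
    private readonly Action<string> _publish;

    public LockedPublisher(Action<string> publish)
    {
        _publish = publish ?? throw new ArgumentNullException(nameof(publish));
    }

    public void Publish(string message)
    {
        lock (_gate)
        {
            _publish(message);
        }
    }
}
```

In RabbitMqOrderEventPublisher, the same idea would mean wrapping the CreateBasicProperties and BasicPublish calls in one lock. Creating a separate channel per publishing thread is another common option.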
ProductService (Consumer)

ProductService consumes the event through a simple background consumer (a BackgroundService). First, create a folder named Messaging within the ProductService.Infrastructure layer project.

Product Microservice Contract Layer: Consumer Handler

First, create a folder named Messaging at the root level of ProductService.Contracts project. Then, create a class file named IOrderPlacedHandler.cs within the Messaging folder, and copy-paste the following code.

using Messaging.Common.Events;
namespace ProductService.Contracts.Messaging
{
    public interface IOrderPlacedHandler
    {
        Task HandleAsync(OrderPlacedEvent evt);
    }
}

Product Microservice Application Layer: Consumer Handler Implementation

First, create a folder named Messaging at the root level of ProductService.Application project. Then, create a class file named OrderPlacedHandler.cs within the Messaging folder, and copy-paste the following code. This class handles “OrderPlacedEvent” messages consumed from RabbitMQ. It implements IOrderPlacedHandler, so it defines how the ProductService reacts when an order is placed.

using Messaging.Common.Events;
using ProductService.Application.DTOs;
using ProductService.Application.Interfaces;
using ProductService.Contracts.Messaging;

namespace ProductService.Application.Messaging
{
    public class OrderPlacedHandler : IOrderPlacedHandler
    {
        private readonly IInventoryService _inventory;  // Dependency: Inventory service used to update product stock.

        // Constructor: injects IInventoryService via Dependency Injection.
        // This allows OrderPlacedHandler to call inventory logic without being tightly coupled.
        public OrderPlacedHandler(IInventoryService inventory)
        {
            _inventory = inventory;
        }

        // HandleAsync: This method is triggered whenever an OrderPlacedEvent is received from RabbitMQ.
        public async Task HandleAsync(OrderPlacedEvent evt)
        {
            // Map event items into DTOs expected by the InventoryService.
            // Each order item (product + quantity) becomes an InventoryUpdateDTO.
            var stockUpdates = evt.Items.Select(i => new InventoryUpdateDTO
            {
                ProductId = i.ProductId,   // Product to update
                Quantity = i.Quantity      // Quantity to reduce
            }).ToList();

            // Call the inventory service to decrease stock for all products in bulk.
            // This ensures product quantities are reduced in the database after the order is confirmed.
            await _inventory.DecreaseStockBulkAsync(stockUpdates);
        }
    }
}

Product Microservice Infrastructure Layer: Consumer

First, create a folder named Messaging at the root level of ProductService.Infrastructure project. Create a class file named OrderPlacedConsumer.cs within the Messaging folder, and copy-paste the following code. This consumer listens for “order.placed” events from RabbitMQ and passes them to the ProductService to update inventory.

using Messaging.Common.Events;
using Messaging.Common.Options;
using Messaging.Common.Topology;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ProductService.Contracts.Messaging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text.Json;

namespace ProductService.Infrastructure.Messaging
{
    public sealed class OrderPlacedConsumer : BackgroundService
    {
        private readonly ILogger<OrderPlacedConsumer> _logger;     // Logging for diagnostics.
        private readonly IModel _channel;                          // RabbitMQ channel used to consume messages.
        private readonly RabbitMqOptions _opt;                     // RabbitMQ configuration values (queues, exchange, etc.).
        private readonly IServiceScopeFactory _scopeFactory;       // Creates DI scopes for resolving scoped services.

        // Constructor: dependencies are injected by DI container.
        public OrderPlacedConsumer(
            ILogger<OrderPlacedConsumer> logger,
            IModel channel,
            IOptions<RabbitMqOptions> opt,
            IServiceScopeFactory scopeFactory)
        {
            _logger = logger;
            _channel = channel;
            _opt = opt.Value;
            _scopeFactory = scopeFactory;

            // Ensure exchange, queues, and bindings exist (idempotent).
            RabbitTopology.EnsureAll(_channel, _opt);
        }

        // BackgroundService requires ExecuteAsync.
        // This runs when the host starts and keeps listening until shutdown.
        protected override Task ExecuteAsync(CancellationToken stoppingToken)
        {
            // QoS (Quality of Service) in RabbitMQ defines how many messages a consumer can receive and hold “in-flight” before acknowledging them.
            // Configure QoS: Allow max 10 unacknowledged messages per consumer at a time.
            //      prefetchSize = 0 → no size limit (we don’t restrict by bytes).
            //      prefetchCount = 10 → the consumer will receive at most 10 unacknowledged messages at once.
            //      global = false → this limit applies per consumer, not across the entire channel.
            _channel.BasicQos(0, 10, false);

            // Create an async RabbitMQ consumer to handle messages.
            // Note: with RabbitMQ.Client 6.x, async consumers only receive messages when the connection
            // was created with DispatchConsumersAsync = true (assumed to be set in Messaging.Common).
            var consumer = new AsyncEventingBasicConsumer(_channel);

            // Define what happens when a message is received.
            consumer.Received += async (_, ea) =>
            {
                try
                {
                    // Deserialize the event payload into an OrderPlacedEvent object.
                    var evt = JsonSerializer.Deserialize<OrderPlacedEvent>(ea.Body.Span);

                    // Deserialize returns null only when the payload is the JSON literal null.
                    // Malformed JSON throws a JsonException, which the catch block below handles.
                    if (evt == null)
                    {
                        // This tells RabbitMQ:
                        //      I can’t read this message.
                        //      Don’t keep it in the queue.
                        //      Send it to the Dead Letter Queue (DLQ), if one is configured.
                        _channel.BasicNack(ea.DeliveryTag, false, false);
                        return;
                    }

                    // Create a new DI scope to resolve scoped services (e.g., DbContext, handler).
                    using var scope = _scopeFactory.CreateScope();
                    var handler = scope.ServiceProvider.GetRequiredService<IOrderPlacedHandler>();

                    // Pass the event to the handler (business logic to decrease stock).
                    await handler.HandleAsync(evt);

                    // Acknowledge the message (tell RabbitMQ it was processed successfully),
                    // so that it can be removed from the queue.
                    // Without this, RabbitMQ keeps the message “unacknowledged” and may redeliver it.
                    // Think of it like clicking “Mark as Done” after finishing a task.
                    _channel.BasicAck(ea.DeliveryTag, false);
                }
                catch (Exception ex)
                {
                    // Log any errors and reject the message (move to DLQ if configured).
                    _logger.LogError(ex, "Error handling order.placed");

                    // This tells RabbitMQ: I couldn’t process this message. Don’t mark it as done.
                    // The three arguments mean:
                    //      ea.DeliveryTag → which exact message you’re talking about.
                    //      false → only this single message (not multiple).
                    //      false → don’t put it back in the same queue (send it to DLQ if configured).
                    _channel.BasicNack(ea.DeliveryTag, false, false);
                }
            };

            // Start consuming messages from the configured queue (ProductOrderPlacedQueue).
            // autoAck = false means manual acknowledgment: RabbitMQ does not mark a message as done
            // on delivery, so we control when to Ack/Nack. This is why BasicAck is needed above.
            // If processing fails, we send a Nack instead, and RabbitMQ can requeue or dead-letter the message.
            _channel.BasicConsume(
                queue: _opt.ProductOrderPlacedQueue,
                autoAck: false,
                consumer: consumer);

            // Return a completed task: the client library keeps invoking the Received handler
            // in the background for as long as the channel stays open.
            return Task.CompletedTask;
        }
    }
}
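
The consumer above deserializes with default options, which works because the publisher also serializes with defaults. The following is a minimal sketch of how System.Text.Json behaves in the cases the consumer has to handle. OrderPlacedEventSketch is a hypothetical, trimmed-down stand-in for OrderPlacedEvent, and the camelCase payload is an assumed example of a producer configured differently.

```csharp
using System;
using System.Text;
using System.Text.Json;

var evt = new OrderPlacedEventSketch { OrderNumber = "ORD-1001", Quantity = 2 };

// Publisher side: default options keep the PascalCase property names.
byte[] body = JsonSerializer.SerializeToUtf8Bytes(evt);

// Consumer side: the same defaults round-trip the payload.
var same = JsonSerializer.Deserialize<OrderPlacedEventSketch>(body);
Console.WriteLine(same!.OrderNumber);             // ORD-1001

// A camelCase payload is NOT matched by default (property names are case-sensitive)...
byte[] camel = Encoding.UTF8.GetBytes("{\"orderNumber\":\"ORD-1001\",\"quantity\":2}");
var strict = JsonSerializer.Deserialize<OrderPlacedEventSketch>(camel);
Console.WriteLine(strict!.OrderNumber is null);   // True

// ...unless PropertyNameCaseInsensitive is enabled.
var relaxed = JsonSerializer.Deserialize<OrderPlacedEventSketch>(
    camel, new JsonSerializerOptions { PropertyNameCaseInsensitive = true });
Console.WriteLine(relaxed!.OrderNumber);          // ORD-1001

// The JSON literal null deserializes to null; malformed JSON throws instead.
var nothing = JsonSerializer.Deserialize<OrderPlacedEventSketch>("null");
bool threw = false;
try { JsonSerializer.Deserialize<OrderPlacedEventSketch>("{ not json"); }
catch (JsonException) { threw = true; }
Console.WriteLine(nothing is null && threw);      // True

// Hypothetical event shape used only for this illustration.
public sealed class OrderPlacedEventSketch
{
    public string? OrderNumber { get; set; }
    public int Quantity { get; set; }
}
```

This is why the consumer needs both the null check and the catch block: each covers a different kind of unreadable message.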
Product Microservice API Layer Program.cs

Please add the following code to the Program class file:

builder.Services.Configure<RabbitMqOptions>(builder.Configuration.GetSection("RabbitMq"));
var mq = builder.Configuration.GetSection("RabbitMq").Get<RabbitMqOptions>()!;
builder.Services.AddRabbitMq(mq.HostName, mq.UserName, mq.Password, mq.VirtualHost);
builder.Services.AddScoped<IOrderPlacedHandler, OrderPlacedHandler>();
builder.Services.AddHostedService<OrderPlacedConsumer>();

ProductService appsettings.json

Please add the following to the appsettings.json file:

"RabbitMq": {
  "HostName": "localhost",
  "Port": 5672,
  "UserName": "ecommerce_user",
  "Password": "Test@1234",
  "VirtualHost": "ecommerce_vhost",
  "ExchangeName": "ecommerce.topic",
  "DlxExchangeName": "ecommerce.dlx",
  "DlxQueueName": "ecommerce.dlq",
  "ProductOrderPlacedQueue": "product.order_placed"
}

Notification Service (Consumer)

Notification Microservice Contract Layer: Consumer Handler

First, create a folder named Messaging at the root level of NotificationService.Contracts project. Then, create a class file named IOrderPlacedHandler.cs within the Messaging folder, and copy-paste the following code.

using Messaging.Common.Events;
namespace NotificationService.Contracts.Messaging
{
    public interface IOrderPlacedHandler
    {
        Task HandleAsync(OrderPlacedEvent evt);
    }
}

Notification Microservice Application Layer: Consumer Handler Implementation

First, create a folder named Messaging at the root level of NotificationService.Application project. Then, create a class file named OrderPlacedHandler.cs within the Messaging folder, and copy-paste the following code. This handler listens to “OrderPlacedEvent” messages from RabbitMQ and converts them into notification requests inside the NotificationService.

using Messaging.Common.Events;
using NotificationService.Application.DTOs;
using NotificationService.Application.Interfaces;
using NotificationService.Contracts.Messaging;
using NotificationService.Domain.Enums;
using System.Text.Json;

namespace NotificationService.Application.Messaging
{
    public class OrderPlacedHandler : IOrderPlacedHandler
    {
        private readonly INotificationService _notificationService;

        // Injects INotificationService via DI so we can call business logic.
        public OrderPlacedHandler(INotificationService notificationService)
        {
            _notificationService = notificationService;
        }

        // HandleAsync is called whenever an OrderPlacedEvent is consumed from RabbitMQ.
        public async Task HandleAsync(OrderPlacedEvent evt)
        {
            // Build structured Items array (Name, Quantity, Price)
            var items = evt.Items.Select(i => new
            {
                Name = i.ProductId.ToString(),  // If you have product name, use that instead
                Quantity = i.Quantity,
                Price = i.UnitPrice
            }).ToList();

            // Serialize items into JSON so TemplateRenderer will recognize it as JsonElement
            var itemsJson = JsonSerializer.Serialize(items);

            // Build template data dictionary (keys must match placeholders in template)
            var templateData = new Dictionary<string, object>
            {
                { "CustomerName", evt.CustomerName },
                { "OrderNumber", evt.OrderNumber?.ToString() ?? string.Empty },
                { "Amount", evt.TotalAmount },
                { "Items", JsonDocument.Parse(itemsJson).RootElement } // passes structured JSON
            };

            var request = new CreateNotificationRequestDTO
            {
                UserId = evt.UserId,
                TypeId = 1, // "OrderPlaced" type
                Channel = NotificationChannelEnum.Email,
                TemplateVersion = 1,
                TemplateData = templateData,
                Recipients = new List<RecipientDTO>
                {
                    new RecipientDTO
                    {
                        RecipientTypeId = (int)RecipientTypeEnum.To,
                        Email = evt.CustomerEmail,
                        PhoneNumber = evt.PhoneNumber
                    }
                },
                Priority = NotificationPriorityEnum.Normal,
                ScheduledAtUtc = null,
                CreatedBy = "OrderPlacedHandler"
            };

            // Persist notification
            await _notificationService.CreateAsync(request);
        }
    }
}
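
The handler above turns the item list into a JsonElement by serializing it to a string and parsing it back. The following is a minimal sketch of what that element looks like to a consumer such as a template renderer. It uses JsonSerializer.SerializeToElement (available from .NET 6) as a one-step alternative, and the product IDs and prices are made-up sample values.

```csharp
using System;
using System.Linq;
using System.Text.Json;

// Made-up order lines standing in for evt.Items.
var lines = new[]
{
    new { ProductId = Guid.NewGuid(), Quantity = 2, UnitPrice = 149.99m },
    new { ProductId = Guid.NewGuid(), Quantity = 1, UnitPrice = 35.00m }
};

// Same projection as in the handler: Name, Quantity, Price.
var items = lines.Select(l => new
{
    Name = l.ProductId.ToString(),
    Quantity = l.Quantity,
    Price = l.UnitPrice
}).ToList();

// One step instead of Serialize + JsonDocument.Parse.
JsonElement element = JsonSerializer.SerializeToElement(items);

Console.WriteLine(element.ValueKind);          // Array
Console.WriteLine(element.GetArrayLength());   // 2

// A renderer can walk the array and read each property by name.
foreach (JsonElement item in element.EnumerateArray())
{
    string? name = item.GetProperty("Name").GetString();
    int quantity = item.GetProperty("Quantity").GetInt32();
    Console.WriteLine($"{name} x{quantity}");
}
```

Either approach produces the same structure; the property names in the projection are what the template placeholders have to match.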
Notification Microservice Infrastructure Layer: Consumer

First, create a folder named Messaging at the root level of NotificationService.Infrastructure project. Create a class file named OrderPlacedConsumer.cs within the Messaging folder, and copy-paste the following code. This consumer listens for “OrderPlacedEvent” messages in the NotificationService. It is registered as a background service that runs with the host.

using Messaging.Common.Events;
using Messaging.Common.Options;
using Messaging.Common.Topology;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using NotificationService.Contracts.Messaging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text.Json;
namespace NotificationService.Infrastructure.Messaging.Consumers
{
    public sealed class OrderPlacedConsumer : BackgroundService
    {
        // Shared serializer options, created once and reused for every message.
        // PropertyNameCaseInsensitive = true allows both camelCase and PascalCase JSON.
        private static readonly JsonSerializerOptions JsonOptions = new JsonSerializerOptions
        {
            PropertyNameCaseInsensitive = true
        };
        private readonly ILogger<OrderPlacedConsumer> _logger;   // Logging for diagnostics.
        private readonly IModel _channel;                        // RabbitMQ channel to consume messages.
        private readonly RabbitMqOptions _options;               // RabbitMQ configuration values (queues, exchange, etc.).
        private readonly IServiceScopeFactory _scopeFactory;     // Used to resolve scoped dependencies inside a singleton consumer.
        // Constructor: dependencies are injected by DI.
        public OrderPlacedConsumer(
            ILogger<OrderPlacedConsumer> logger,                 // Logging instance
            IModel channel,                                      // RabbitMQ channel
            IOptions<RabbitMqOptions> options,                   // Injected RabbitMQ options from appsettings.json
            IServiceScopeFactory scopeFactory)                   // Scope factory for resolving scoped services (like DbContext, handlers)
        {
            _logger = logger;
            _channel = channel;
            _options = options.Value;                            // Extract actual RabbitMQ settings
            _scopeFactory = scopeFactory;
            // Ensure exchange, queues, and bindings exist before consuming.
            RabbitTopology.EnsureAll(_channel, _options);
        }
        // BackgroundService entry point → runs when the host starts.
        protected override Task ExecuteAsync(CancellationToken stoppingToken)
        {
            // Configure QoS: max 10 unacknowledged messages per consumer.
            // Prevents flooding one consumer with too many messages at once.
            _channel.BasicQos(0, 10, false);
            // Create an async consumer for RabbitMQ.
            // Note: in RabbitMQ.Client 6.x and earlier, AsyncEventingBasicConsumer requires
            // DispatchConsumersAsync = true on the ConnectionFactory; verify that AddRabbitMq
            // in Messaging.Common sets it.
            var consumer = new AsyncEventingBasicConsumer(_channel);
            // Event handler: triggered when a new message is received
            consumer.Received += async (_, ea) =>
            {
                try
                {
                    // Deserialize the raw message body into an OrderPlacedEvent.
                    var evt = JsonSerializer.Deserialize<OrderPlacedEvent>(ea.Body.Span, JsonOptions);
                    // If the payload deserialized to null, reject the message and send to DLQ.
                    if (evt == null)
                    {
                        _logger.LogWarning("Received null or invalid OrderPlacedEvent.");
                        _channel.BasicNack(ea.DeliveryTag, false, false); // NACK → reject without requeue
                        return;
                    }
                    // Create a new DI scope so we can resolve scoped services (like IOrderPlacedHandler).
                    using var scope = _scopeFactory.CreateScope();
                    var handler = scope.ServiceProvider.GetRequiredService<IOrderPlacedHandler>();
                    // Pass the event to the handler (business logic: create notification entry).
                    await handler.HandleAsync(evt);
                    // Acknowledge the message → remove from queue after successful processing.
                    _channel.BasicAck(ea.DeliveryTag, false);
                    // Log success with the OrderId for traceability.
                    _logger.LogInformation("Processed OrderPlacedEvent for Order {OrderId}", evt.OrderId);
                }
                catch (Exception ex)
                {
                    // Log any failure (including malformed JSON), reject the message (send to DLQ).
                    _logger.LogError(ex, "Failed to process OrderPlacedEvent");
                    _channel.BasicNack(ea.DeliveryTag, false, false);
                }
            };
            // Start consuming messages from the Notification queue.
            // autoAck = false → we manually Ack/Nack messages after processing.
            _channel.BasicConsume(
                queue: _options.NotificationOrderPlacedQueue,
                autoAck: false,
                consumer: consumer);
            // Return completed task → the consumer keeps receiving messages via the channel's callbacks.
            return Task.CompletedTask;
        }
    }
}
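
The consumer above has two distinct failure paths for a bad payload: the JSON literal null deserializes to a null reference (the evt == null branch), while malformed JSON throws a JsonException (the catch branch). The following is a minimal sketch of that behaviour using only System.Text.Json. SampleOrderPlaced is a hypothetical stand-in for OrderPlacedEvent, and the payload values are made up for illustration.

```csharp
using System;
using System.Text.Json;

// One options instance, reused for every call.
var options = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };

// A camelCase payload still binds to PascalCase properties.
var ok = JsonSerializer.Deserialize<SampleOrderPlaced>(
    "{\"orderId\":\"7f9c2b7e-3d0a-4a55-9d7b-2f1f5f0c9a11\",\"customerName\":\"Asha\"}",
    options);
Console.WriteLine(ok!.CustomerName); // prints "Asha"

// The JSON literal null yields a null reference: the "evt == null" branch.
var nothing = JsonSerializer.Deserialize<SampleOrderPlaced>("null", options);

// Malformed JSON throws JsonException: the "catch" branch.
var threw = false;
try
{
    JsonSerializer.Deserialize<SampleOrderPlaced>("{ not json", options);
}
catch (JsonException)
{
    threw = true;
}

// Hypothetical stand-in for OrderPlacedEvent (assumption, not the real event class).
public sealed class SampleOrderPlaced
{
    public Guid OrderId { get; set; }
    public string? CustomerName { get; set; }
}
```

In both cases the consumer ends up calling BasicNack with requeue set to false, so the message is dead-lettered rather than redelivered in a loop.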
NotificationService Program.cs

Please add the following code to the Program class file:

// Bind RabbitMQ config
builder.Services.Configure<RabbitMqOptions>(builder.Configuration.GetSection("RabbitMq"));
var mq = builder.Configuration.GetSection("RabbitMq").Get<RabbitMqOptions>()!;
// Register RabbitMQ channel (you likely already have AddRabbitMq in Messaging.Common)
builder.Services.AddRabbitMq(mq.HostName, mq.UserName, mq.Password, mq.VirtualHost);
// Register handler
builder.Services.AddScoped<IOrderPlacedHandler, OrderPlacedHandler>();
// Register consumer
builder.Services.AddHostedService<OrderPlacedConsumer>();
NotificationService appsettings.json

Please add the following to the appsettings.json file:

"RabbitMq": {
    "HostName": "localhost",
    "Port": 5672,
    "UserName": "ecommerce_user",
    "Password": "Test@1234",
    "VirtualHost": "ecommerce_vhost",
    "ExchangeName": "ecommerce.topic",
    "DlxExchangeName": "ecommerce.dlx",
    "DlxQueueName": "ecommerce.dlq",
    "NotificationOrderPlacedQueue": "notification.order_placed"
}
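
To see how these keys map onto an options object, here is a small sketch that binds an in-memory copy of the section. It assumes the Microsoft.Extensions.Configuration and Microsoft.Extensions.Configuration.Binder packages, which an ASP.NET Core project already references. SampleRabbitMqOptions is a hypothetical, trimmed-down class; the real RabbitMqOptions lives in Messaging.Common. Because Get<T>() returns null when the section is missing, the sketch fails fast with a clear message, which may be easier to diagnose than a NullReferenceException later on.

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;

// In-memory stand-in for appsettings.json; ":" separates the section from the key.
var configuration = new ConfigurationBuilder()
    .AddInMemoryCollection(new Dictionary<string, string?>
    {
        ["RabbitMq:HostName"] = "localhost",
        ["RabbitMq:Port"] = "5672",
        ["RabbitMq:NotificationOrderPlacedQueue"] = "notification.order_placed"
    })
    .Build();

// Bind the section; throw a descriptive error if it is absent.
var mq = configuration.GetSection("RabbitMq").Get<SampleRabbitMqOptions>()
         ?? throw new InvalidOperationException("Missing 'RabbitMq' configuration section.");

Console.WriteLine($"{mq.HostName}:{mq.Port} -> {mq.NotificationOrderPlacedQueue}");
// prints "localhost:5672 -> notification.order_placed"

// Hypothetical options class for this sketch only (assumption).
public sealed class SampleRabbitMqOptions
{
    public string HostName { get; set; } = string.Empty;
    public int Port { get; set; }
    public string NotificationOrderPlacedQueue { get; set; } = string.Empty;
}
```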
Automating the Notification Service:

We want notifications to be processed automatically in the background without needing a manual controller call. To do this, we will create a BackgroundService that periodically checks for pending notifications and processes them.

Define the Contract (NotificationService.Contracts)

The Contracts project defines the interfaces and DTOs. Here, we define an interface, INotificationProcessor, that describes how notifications should be processed. First, create a folder named Interfaces within the NotificationService.Contracts project. Then, create a class file named INotificationProcessor.cs within the Interfaces folder, and copy-paste the following code. This ensures that other layers (like Infrastructure) depend only on contracts, not on Application internals.

namespace NotificationService.Contracts.Interfaces
{
    // Defines contract for processing queued notifications.
    public interface INotificationProcessor
    {
        // Process a batch of pending notifications from the queue.
        // take: Max number of notifications to process
        // skip: Number of notifications to skip (for pagination)
        Task ProcessQueueBatchAsync(int take, int skip);
    }
}
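
To make the take and skip semantics concrete, here is a minimal in-memory sketch. It assumes that a processed notification leaves the pending set, which is why an automated caller can keep passing skip = 0 and still advance through the queue. InMemoryNotificationProcessor and its integer ids are hypothetical. The interface is repeated only so that the sketch compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

var processor = new InMemoryNotificationProcessor(pendingCount: 5);
await processor.ProcessQueueBatchAsync(take: 2, skip: 0); // handles ids 1 and 2
await processor.ProcessQueueBatchAsync(take: 2, skip: 0); // handles ids 3 and 4
Console.WriteLine(string.Join(",", processor.Processed)); // prints "1,2,3,4"

// Local copy of the contract so the sketch is self-contained.
public interface INotificationProcessor
{
    Task ProcessQueueBatchAsync(int take, int skip);
}

// Hypothetical in-memory implementation (assumption, for illustration only).
public sealed class InMemoryNotificationProcessor : INotificationProcessor
{
    private readonly List<int> _pending;

    public List<int> Processed { get; } = new List<int>();

    public InMemoryNotificationProcessor(int pendingCount)
        => _pending = Enumerable.Range(1, pendingCount).ToList();

    public Task ProcessQueueBatchAsync(int take, int skip)
    {
        // Page through the pending items, then move each one to Processed.
        var batch = _pending.Skip(skip).Take(take).ToList();
        foreach (var id in batch)
        {
            _pending.Remove(id);
            Processed.Add(id);
        }
        return Task.CompletedTask;
    }
}
```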
Implement the Contract in the Application Layer

The Application Layer holds the business logic. Our existing NotificationService will implement this INotificationProcessor interface. The ProcessQueueBatchAsync method is already implemented in the service class. Therefore, we don’t need to reimplement it.

using NotificationService.Contracts.Interfaces;
namespace NotificationService.Application.Services
{
    public class NotificationService : INotificationService, INotificationProcessor
    {
        //Existing Code
    }
}
Create the Background Worker in the Infrastructure Layer

The Infrastructure Layer is responsible for hosting background jobs. We will create a NotificationWorker that periodically calls INotificationProcessor. First, create a folder named BackgroundJobs within the NotificationService.Infrastructure layer project. Then, create a class file named NotificationWorker.cs within the BackgroundJobs folder, and copy-paste the following code. This worker does not depend on the Application directly, only on INotificationProcessor from Contracts.

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using NotificationService.Contracts.Interfaces;
namespace NotificationService.Infrastructure.BackgroundJobs
{
    // Background worker that runs periodically and processes queued notifications.
    public sealed class NotificationWorker : BackgroundService
    {
        private readonly ILogger<NotificationWorker> _logger;       // For structured logging
        private readonly IServiceScopeFactory _scopeFactory;        // To create DI scopes for scoped services
        private readonly TimeSpan _interval = TimeSpan.FromSeconds(30); // Interval between runs (30s)
        // Constructor - dependencies are injected by the DI container
        public NotificationWorker(
            ILogger<NotificationWorker> logger,         // Logging dependency
            IServiceScopeFactory scopeFactory)          // Scope factory for resolving scoped services
        {
            _logger = logger;
            _scopeFactory = scopeFactory;
        }
        // This method is called when the host starts the background service.
        // It will loop until the application stops (or cancellation is requested).
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            _logger.LogInformation("NotificationWorker started.");
            // Continuous loop - keeps running until application shutdown
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    // Create a new DI scope (since BackgroundService is Singleton,
                    // but we need scoped services like DbContext or NotificationService)
                    using var scope = _scopeFactory.CreateScope();
                    // Resolve the processor (Application implementation of INotificationProcessor)
                    var processor = scope.ServiceProvider.GetRequiredService<INotificationProcessor>();
                    // Process pending notifications (batch of 50 per cycle, skip = 0 for automation)
                    await processor.ProcessQueueBatchAsync(50, 0);
                }
                catch (Exception ex)
                {
                    // Log the error but keep the worker running
                    _logger.LogError(ex, "Error while processing notifications.");
                }
                // Wait 30 seconds before checking again
                await Task.Delay(_interval, stoppingToken);
            }
        }
    }
}
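
One detail of the loop above is worth seeing in isolation: when the host shuts down, Task.Delay observes the cancelled stoppingToken and throws an OperationCanceledException, which is what ends ExecuteAsync. The sketch below simulates that with a plain CancellationTokenSource instead of a real host. The 20 ms interval and the counter are illustrative assumptions.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

using var cts = new CancellationTokenSource(); // stands in for the host's stoppingToken
var cycles = 0;
var stoppedCleanly = false;

try
{
    while (!cts.Token.IsCancellationRequested)
    {
        cycles++;                       // stands in for ProcessQueueBatchAsync(50, 0)
        if (cycles == 3) cts.Cancel();  // simulate host shutdown during the third cycle
        await Task.Delay(TimeSpan.FromMilliseconds(20), cts.Token);
    }
}
catch (OperationCanceledException)
{
    // Task.Delay observed the cancelled token; treat this as a normal stop.
    stoppedCleanly = true;
}

Console.WriteLine($"{cycles} cycles, stopped cleanly: {stoppedCleanly}");
// prints "3 cycles, stopped cleanly: True"
```

Because the delay is cancellable, the worker does not hold up shutdown for the remainder of the 30-second interval.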
Register Services in Program Class:

Please add the following code to the Program class file of the NotificationService.API layer project.

// Register Application service
builder.Services.AddScoped<INotificationProcessor, NotificationService.Application.Services.NotificationService>();
// Register Background Worker
builder.Services.AddHostedService<NotificationWorker>();
What is AddHostedService<T>()?
  • AddHostedService<T>() is an extension method in Microsoft.Extensions.DependencyInjection.
  • It tells the ASP.NET Core DI container to register a background worker (a class that inherits from BackgroundService or implements IHostedService).
  • Once registered, ASP.NET Core will automatically start and stop that service with the application’s lifecycle.

In our Example

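  • builder.Services.AddHostedService<NotificationWorker>() registers NotificationWorker as a hosted service.
  • When the API starts, ASP.NET Core:
    1. Creates a single instance of NotificationWorker.
    2. Calls its StartAsync() method, which in turn invokes ExecuteAsync().
    3. Keeps it running in the background until the application stops, at which point the stoppingToken is cancelled.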
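
The lifecycle described above can be tried outside the API with a generic host. This is a minimal sketch, assuming the Microsoft.Extensions.Hosting package is referenced. HeartbeatWorker is a hypothetical worker used only for illustration, and the number of ticks printed depends on timing.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

using var host = Host.CreateDefaultBuilder()
    .ConfigureServices(services => services.AddHostedService<HeartbeatWorker>())
    .Build();

await host.StartAsync();                           // the host starts every registered hosted service
await Task.Delay(TimeSpan.FromMilliseconds(250));  // let the worker run briefly
await host.StopAsync();                            // cancels stoppingToken and waits for the worker

// Hypothetical worker for this sketch (assumption).
public sealed class HeartbeatWorker : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            Console.WriteLine("tick");
            await Task.Delay(TimeSpan.FromMilliseconds(100), stoppingToken);
        }
    }
}
```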

In this post, I discussed how to implement End-to-end Order Placed Communication using RabbitMQ. In the next post, I will discuss how to implement the Saga Pattern using RabbitMQ in Microservices.
