Define Shared Messaging Infrastructure for Saga Pattern
The Messaging.Common project serves as the shared messaging backbone for all microservices participating in the RabbitMQ-based Saga orchestration. It defines a unified structure for events, connection handling, and topology management, ensuring that every microservice communicates using the same message format, routing conventions, and exchange configurations.
By centralizing these components, the solution promotes consistency, reliability, and maintainability across the distributed system while reducing code and configuration duplication.
Real-World Saga Flow (Success Path)
- OrderService → Publishes OrderPlacedEvent.
- OrchestratorService → Consumes OrderPlacedEvent and raises StockReservationRequestedEvent.
- ProductService → Consumes StockReservationRequestedEvent, reserves stock, raises StockReservedCompletedEvent.
- OrchestratorService → Consumes StockReservedCompletedEvent and raises OrderConfirmedEvent.
- OrderService → Consumes OrderConfirmedEvent and updates order status = Confirmed
- NotificationService → Consumes OrderConfirmedEvent and sends confirmation email/SMS to customer
Real-World Saga Flow (Failure Path)
- OrderService → Publishes OrderPlacedEvent
- OrchestratorService → Publishes StockReservationRequestedEvent
- ProductService → Fails to reserve → Publishes StockReservationFailedEvent
- OrchestratorService → Publishes OrderCancelledEvent
- OrderService → Marks order as Cancelled
- NotificationService → Sends cancellation email/SMS with failed item details
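Both flows reduce to one small routing decision inside the orchestrator: which event to publish in response to the one it just consumed. A minimal sketch of that decision follows. The SagaEvent enum and the SagaFlowSketch class are hypothetical names used only for illustration; they are not part of Messaging.Common, and the real orchestrator works with the event classes rather than an enum.

```csharp
#nullable enable
using System;

// Hypothetical enum naming the events from the two flows above.
public enum SagaEvent
{
    OrderPlaced,
    StockReservationRequested,
    StockReservedCompleted,
    StockReservationFailed,
    OrderConfirmed,
    OrderCancelled
}

public static class SagaFlowSketch
{
    // Returns the event the orchestrator publishes after consuming 'incoming',
    // or null when the orchestrator does not react to that event.
    public static SagaEvent? NextOrchestratorEvent(SagaEvent incoming) => incoming switch
    {
        SagaEvent.OrderPlaced => SagaEvent.StockReservationRequested,
        SagaEvent.StockReservedCompleted => SagaEvent.OrderConfirmed,     // success path
        SagaEvent.StockReservationFailed => SagaEvent.OrderCancelled,     // failure path
        _ => (SagaEvent?)null
    };
}
```

Events such as OrderConfirmed and OrderCancelled map to null here because, in the flows above, they are consumed by OrderService and NotificationService rather than by the orchestrator.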
Creating Models:
Models represent the Data Structures shared between different microservices. They define how event data looks, for example, order details, product items, or failed stock entries. They ensure that all services understand the same message structure, which is crucial for consistent data exchange.
- EventBase → common properties like EventId, Timestamp, and CorrelationId.
- OrderLineItem → details of each product in an order.
- FailedLineItem → details of failed stock reservations.
In short, Models are the blueprint for the data being sent or received through messages.
Models/EventBase.cs
EventBase is the root class for all events exchanged between microservices. It ensures that every event follows a consistent structure with a unique identifier (EventId), a creation timestamp (Timestamp), and an optional correlation ID (CorrelationId). This enables traceability and debugging across the entire distributed system, especially in a Saga pattern. We have already created the following model within the Models folder of the Messaging.Common project.
namespace Messaging.Common.Models
{
// The base class for all domain events in the system.
// Every event that travels across microservices (e.g., OrderPlacedEvent,
// StockReservedCompletedEvent, OrderCancelledEvent) inherits from this class.
// Purpose:
// - Provides consistent metadata for all events.
// - Enables tracking, logging, and distributed tracing across services.
// - Ensures each event is uniquely identifiable.
public abstract class EventBase
{
// ---------------------------------------------------------------------
// Unique Event Identifier
// ---------------------------------------------------------------------
// A unique GUID automatically assigned to each event when it is created.
// This helps distinguish between multiple instances of the same event type.
// Example:
// - Two separate OrderPlacedEvent messages will each have their own EventId.
// Used for:
// Deduplication checks
// Logging and debugging
// Tracking messages in monitoring tools
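// Note: 'init' is used instead of 'private set' so that a JSON deserializer can restore
// the publisher's EventId on the consumer side. System.Text.Json ignores private setters
// by default, which would leave the consumer with a freshly generated value.
public Guid EventId { get; init; } = Guid.NewGuid();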
// ---------------------------------------------------------------------
// Event Creation Timestamp
// ---------------------------------------------------------------------
// The exact UTC date and time when the event object was created.
// Note: 'init' (not 'private set') lets a JSON deserializer restore the original creation time.
public DateTime Timestamp { get; init; } = DateTime.UtcNow;
// ---------------------------------------------------------------------
// Correlation Identifier (Trace ID)
// ---------------------------------------------------------------------
// This optional string allows tracking of a single business transaction
// across multiple microservices.
// Example:
// - When a customer places an order, the order service assigns a CorrelationId.
// - All subsequent events (StockReservationRequested, StockReservedCompleted,
// OrderConfirmed, etc.) reuse the same CorrelationId.
// This makes it possible to trace the entire flow of a request end-to-end
// across different services in logs, monitoring tools (like Kibana or Application Insights),
// or distributed tracing systems (like OpenTelemetry or Jaeger).
public string? CorrelationId { get; set; }
}
}
Key Points:
- Provides a common contract for all event messages.
- Guarantees that each event is uniquely identifiable.
- Enables cross-service tracing through CorrelationId.
- Supports auditing and approximate ordering through Timestamp (clocks on different services can drift, so it is not a strict ordering guarantee).
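To make the CorrelationId idea concrete, the sketch below shows one way a follow-up event could reuse the CorrelationId of the event that triggered it while still getting its own EventId. All type names ending in Sketch are simplified, hypothetical stand-ins so the snippet compiles on its own. The fallback to the OrderId when no CorrelationId was supplied is an assumption, not something the tutorial prescribes.

```csharp
#nullable enable
using System;

// Simplified stand-in for EventBase.
public abstract class EventBaseSketch
{
    public Guid EventId { get; init; } = Guid.NewGuid();
    public DateTime Timestamp { get; init; } = DateTime.UtcNow;
    public string? CorrelationId { get; set; }
}

public sealed class OrderPlacedSketch : EventBaseSketch
{
    public Guid OrderId { get; set; }
}

public sealed class StockRequestedSketch : EventBaseSketch
{
    public Guid OrderId { get; set; }
}

public static class CorrelationSketch
{
    // Builds the next event in the Saga. The new event gets a fresh EventId
    // (from the base class initializer) but carries the same CorrelationId.
    public static StockRequestedSketch ToStockRequest(OrderPlacedSketch placed) => new()
    {
        OrderId = placed.OrderId,
        // Assumption: fall back to the OrderId if the publisher set no CorrelationId.
        CorrelationId = placed.CorrelationId ?? placed.OrderId.ToString()
    };
}
```

Filtering logs by that single CorrelationId value then returns every step of one order's workflow, regardless of which service wrote the log entry.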
Models/OrderLineItem.cs
OrderLineItem defines the structure of an individual product item within an order. It captures essential attributes like product ID, quantity, and price. Since multiple events (such as OrderPlacedEvent and StockReservedCompletedEvent) reference line items, this class ensures consistency across all such messages. Create a class file named OrderLineItem.cs within the Models folder of Messaging.Common project, and copy-paste the following code.
namespace Messaging.Common.Models
{
// Represents a single product line item within an order.
// Purpose:
// - Standardize how order items are represented in all event messages.
// - Avoid duplication of similar models in different services.
// - Ensure consistency between OrderService, ProductService, and NotificationService.
// Typical Usage:
// - Inside OrderPlacedEvent → carries all ordered products.
// - Inside StockReservationRequestedEvent → specifies which products need stock check.
// - Inside StockReservedCompletedEvent → confirms what was successfully reserved.
// - Inside OrderConfirmedEvent → included for final confirmation and email notifications.
public sealed class OrderLineItem
{
// --------------------------------------------------------------------
// Product Identity
// --------------------------------------------------------------------
// The unique identifier (GUID) of the product being ordered.
// Used for:
// - Matching inventory in ProductService.
// - Calculating stock levels.
// - Linking order details to product catalog data.
public Guid ProductId { get; set; }
// --------------------------------------------------------------------
// Quantity
// --------------------------------------------------------------------
// The total number of units of this product included in the order.
// Used for:
// - Stock validation and reservation.
// - Calculating the total line price (Quantity × UnitPrice).
// - Generating invoices and order summaries.
public int Quantity { get; set; }
// --------------------------------------------------------------------
// Pricing Information
// --------------------------------------------------------------------
// The price per unit of the product at the time of order placement.
// Important Notes:
// - Captures the price snapshot at order time (even if price later changes).
// - Used for financial calculations in OrderService and for displaying
// correct totals in notifications and invoices.
// Example:
// If Quantity = 3 and UnitPrice = 500, total line cost = 1500.
public decimal UnitPrice { get; set; } // Price per unit at time of order
}
}
Key Points:
- Represents a single product line within an order.
- Promotes data consistency across all event types.
- Used in multiple events like OrderPlacedEvent and OrderConfirmedEvent.
- Simplifies serialization and mapping of order data.
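The line-price rule mentioned in the comments (Quantity × UnitPrice) can be sketched as a small helper. OrderLineItem is repeated here only so the snippet compiles on its own; the OrderTotals class is a hypothetical helper and is not part of the tutorial's project.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Repeated from Models/OrderLineItem.cs so the sketch is self-contained.
public sealed class OrderLineItem
{
    public Guid ProductId { get; set; }
    public int Quantity { get; set; }
    public decimal UnitPrice { get; set; }
}

// Hypothetical helper, shown for illustration only.
public static class OrderTotals
{
    // Total for a single line: Quantity × UnitPrice.
    public static decimal LineTotal(OrderLineItem item) => item.Quantity * item.UnitPrice;

    // Total for the whole order: the sum of all line totals.
    public static decimal OrderTotal(IEnumerable<OrderLineItem> items) =>
        items.Sum(i => i.Quantity * i.UnitPrice);
}
```

With the example from the comments, a line with Quantity = 3 and UnitPrice = 500 yields a line total of 1500. Using decimal rather than double keeps monetary arithmetic exact.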
Models/FailedLineItem.cs
FailedLineItem captures detailed information about failed stock reservations: which product could not be reserved, how much was requested versus what was available, and the reason for the failure. This helps in detailed failure reporting and customer notification. Create a class file named FailedLineItem.cs within the Models folder of Messaging.Common project, and copy-paste the following code.
namespace Messaging.Common.Models
{
// Represents details about a specific product line that failed during
// the stock reservation process in the Product Microservice.
// Purpose:
// - Helps identify which products caused the stock reservation failure.
// - Provides item-level diagnostics for OrchestratorService and NotificationService.
// - Enables clear audit trails and customer communication (e.g., “Item X is out of stock”).
// Used In:
// - StockReservationFailedEvent (raised by ProductService)
// - OrderCancelledEvent (raised by OrchestratorService)
// This model gives granular information on which item failed, how much stock was available,
// and why it could not be reserved.
public sealed class FailedLineItem
{
// ---------------------------------------------------------------------
// Product Identifier
// ---------------------------------------------------------------------
// The unique ID of the product that failed during the reservation process.
// Corresponds to ProductId in the ProductService database.
// Used for:
// - Logging which product caused the issue.
// - Triggering compensating actions (like refunding or adjusting order status).
public Guid ProductId { get; set; }
// ---------------------------------------------------------------------
// Requested Quantity
// ---------------------------------------------------------------------
// The number of units that the order originally requested for this product.
// Example:
// If the customer ordered 5 units but only 2 were in stock,
// Requested = 5 and Available = 2.
// Used for:
// - Comparing against actual available stock.
// - Determining whether the failure is full or partial.
public int Requested { get; set; }
// ---------------------------------------------------------------------
// Available Quantity
// ---------------------------------------------------------------------
// The number of units that were actually available in stock at reservation time.
// If 0 → completely out of stock.
// If less than Requested → partial shortage.
// Helps the Orchestrator decide whether to:
// - Cancel the entire order (if multiple failures).
// - Proceed partially (if some items are still available).
public int Available { get; set; }
// ---------------------------------------------------------------------
// Failure Reason
// ---------------------------------------------------------------------
// Describes why this particular line item failed to reserve.
// Default = "Insufficient stock", but it can also be customized
// to represent other failure causes, such as:
// - "Product discontinued"
// - "Warehouse not reachable"
// - "Inventory sync error"
// This field is especially useful for audit logs and customer-facing messages.
public string Reason { get; set; } = "Insufficient stock";
}
}
Key Points:
- Records failure details at the product level.
- Enables granular failure reporting in stock operations.
- Used in StockReservationFailedEvent and OrderCancelledEvent.
- Provides meaningful error context for compensation and notifications.
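As an illustration of how such entries might be produced, the sketch below compares requested quantities with an in-memory stock table and emits one FailedLineItem per shortage. FailedLineItem is repeated so the snippet compiles on its own. ReservationCheckSketch, its tuple-based input, and the dictionary standing in for the ProductService database are assumptions made for this example.

```csharp
using System;
using System.Collections.Generic;

// Repeated from Models/FailedLineItem.cs so the sketch is self-contained.
public sealed class FailedLineItem
{
    public Guid ProductId { get; set; }
    public int Requested { get; set; }
    public int Available { get; set; }
    public string Reason { get; set; } = "Insufficient stock";
}

// Hypothetical helper; a real ProductService would query its database instead.
public static class ReservationCheckSketch
{
    public static List<FailedLineItem> FindShortages(
        IEnumerable<(Guid ProductId, int Quantity)> requested,
        IReadOnlyDictionary<Guid, int> stockByProduct)
    {
        var failed = new List<FailedLineItem>();
        foreach (var (productId, quantity) in requested)
        {
            // Unknown products are treated as having zero stock.
            var available = stockByProduct.TryGetValue(productId, out var units) ? units : 0;
            if (available < quantity)
            {
                failed.Add(new FailedLineItem
                {
                    ProductId = productId,
                    Requested = quantity,
                    Available = available
                });
            }
        }
        return failed;
    }
}
```

An empty result means every line can be reserved. A non-empty result is the list that would travel in a failure event.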
Creating Events:
Events are messages that describe something that has happened in a microservice. Each event carries information about that occurrence and is published to RabbitMQ for other services to consume. They enable communication between microservices without tight coupling. One service raises an event, and others react to it.
- OrderPlacedEvent → sent when a new order is created.
- StockReservationRequestedEvent → requests a stock reservation.
- OrderConfirmedEvent → confirms successful processing.
In short: Events are the “signals” or “notifications” that move data and trigger actions across microservices.
Events/OrderPlacedEvent.cs
This event is raised by the OrderService when a new order is successfully placed (status = Pending). It kickstarts the Saga orchestration process by notifying the Orchestrator service to initiate stock reservation. We have already created the following event within the Events folder of the Messaging.Common project.
using Messaging.Common.Models;
namespace Messaging.Common.Events
{
// This event is published by the Order Microservice after
// a new order is successfully created and saved to the database with Pending State
// Purpose:
// - This event marks the beginning of the Saga orchestration process.
// - It is published to RabbitMQ so that the Orchestrator Microservice
// can listen and trigger the next step — stock reservation.
// Consumers:
// - OrchestratorService (which then publishes StockReservationRequestedEvent)
// Flow:
// 1. OrderService places an order and raises this event.
// 2. OrchestratorService consumes this event.
// 3. OrchestratorService requests ProductService to reserve stock.
public sealed class OrderPlacedEvent : EventBase
{
// --------------------------------------------------------------------
// Event Metadata (Inherited from EventBase)
// --------------------------------------------------------------------
// EventBase includes:
// - EventId: Unique ID for each event instance.
// - Timestamp: UTC time when the event was created.
// - CorrelationId: Tracks this event across multiple microservices in the same workflow.
// Example: OrderPlacedEvent → StockReservationRequestedEvent → OrderConfirmedEvent
// All will share the same CorrelationId for traceability.
// --------------------------------------------------------------------
// Order Information
// --------------------------------------------------------------------
public Guid OrderId { get; set; }
public string OrderNumber { get; set; } = null!;
// --------------------------------------------------------------------
// Customer Details
// --------------------------------------------------------------------
public Guid UserId { get; set; }
public string CustomerName { get; set; } = null!;
public string CustomerEmail { get; set; } = null!;
public string PhoneNumber { get; set; } = null!;
// --------------------------------------------------------------------
// Order Amount
// --------------------------------------------------------------------
public decimal TotalAmount { get; set; }
// --------------------------------------------------------------------
// Ordered Items
// --------------------------------------------------------------------
// Shared Structure:
// - ProductId → Identifies which product
// - Quantity → Number of units ordered
// - UnitPrice → Price per unit at the time of order
public List<OrderLineItem> Items { get; set; } = new();
}
}
Key Points:
- Trigger point for the Saga workflow.
- Carries Complete Order Details and Customer Info.
- Consumed by OrchestratorService.
- Leads to the publishing of StockReservationRequestedEvent.
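Before an event can be published, it has to be turned into a message body. The sketch below shows one possible approach using System.Text.Json. The tutorial text here does not state which serializer is used, so treat that choice as an assumption. OrderPlacedSketch and LineItemSketch are trimmed-down, hypothetical stand-ins for the real classes.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Text.Json;

public sealed class LineItemSketch
{
    public Guid ProductId { get; set; }
    public int Quantity { get; set; }
    public decimal UnitPrice { get; set; }
}

// Trimmed-down stand-in for OrderPlacedEvent (base-class members inlined).
public sealed class OrderPlacedSketch
{
    public Guid EventId { get; init; } = Guid.NewGuid();
    public string? CorrelationId { get; set; }
    public Guid OrderId { get; set; }
    public string OrderNumber { get; set; } = "";
    public decimal TotalAmount { get; set; }
    public List<LineItemSketch> Items { get; set; } = new();
}

public static class EventJsonSketch
{
    // Publisher side: event object -> UTF-8 JSON bytes for the message body.
    public static byte[] ToBody(OrderPlacedSketch evt) =>
        JsonSerializer.SerializeToUtf8Bytes(evt);

    // Consumer side: message body -> event object.
    public static OrderPlacedSketch FromBody(byte[] body) =>
        JsonSerializer.Deserialize<OrderPlacedSketch>(body)
        ?? throw new InvalidOperationException("Message body was JSON null.");
}
```

Because EventId uses an init accessor in this sketch, the consumer sees the same EventId the publisher generated, which is what deduplication checks rely on.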
Events/StockReservationRequestedEvent.cs
Raised by the OrchestratorService, this event requests the ProductService to reserve stock for a given order. It’s the bridge between order placement and stock validation. So, create a class file named StockReservationRequestedEvent.cs within the Events folder of Messaging.Common project, and copy-paste the following code.
using Messaging.Common.Models;
namespace Messaging.Common.Events
{
// Represents an event raised by the Orchestrator Microservice to request
// stock reservation from the Product Microservice.
// Purpose:
// - Acts as the "next step" in the Saga flow after an order is placed.
// - Tells the ProductService to check inventory and reserve stock
// for all items in the order.
// Flow:
// 1. OrderService publishes OrderPlacedEvent.
// 2. OrchestratorService consumes that event.
// 3. OrchestratorService raises StockReservationRequestedEvent → ProductService.
// 4. ProductService processes the request and:
// - If successful → raises StockReservedCompletedEvent.
// - If failed → raises StockReservationFailedEvent.
// Published By:
// - OrchestratorService
// Consumed By:
// - ProductService
public sealed class StockReservationRequestedEvent : EventBase
{
// --------------------------------------------------------------------
// Event Metadata (Inherited from EventBase)
// --------------------------------------------------------------------
// EventBase provides:
// - EventId → Unique ID for this event instance.
// - Timestamp → Time when this event was created.
// - CorrelationId → Shared across all related events in the same Saga transaction.
// This ensures that every event in the Order workflow
// (OrderPlaced, StockRequested, StockReserved, etc.)
// can be traced end-to-end across all microservices.
// --------------------------------------------------------------------
// Order and Customer Identification
// --------------------------------------------------------------------
public Guid OrderId { get; set; }
public Guid UserId { get; set; }
// --------------------------------------------------------------------
// Items for Stock Reservation
// --------------------------------------------------------------------
// A list of all the products included in this order
// that require stock reservation.
// Each item is represented by an OrderLineItem model, which includes:
// - ProductId → Identifies which product to reserve.
// - Quantity → Number of units needed.
// - UnitPrice → Price at the time of ordering (for reference or logging).
// The ProductService will:
// - Check the inventory for each ProductId.
// - Reserve the requested Quantity if available.
// - Return a success or failure event accordingly.
public List<OrderLineItem> Items { get; set; } = new();
}
}
Key Points:
- Initiates the Inventory Reservation step.
- Consumed by ProductService.
- On success, ProductService raises StockReservedCompletedEvent.
- On failure, ProductService raises StockReservationFailedEvent.
Events/StockReservedCompletedEvent.cs
This event is published by the ProductService once stock is successfully reserved for an order. It informs the Orchestrator that the order can be confirmed. Create a class file named StockReservedCompletedEvent.cs within the Events folder of Messaging.Common project, and copy-paste the following code.
using Messaging.Common.Models;
namespace Messaging.Common.Events
{
// Represents an event raised by the Product Microservice
// when stock reservation is successfully completed for an order.
// Purpose:
// - Indicates that the inventory check and stock reservation
// have been successfully processed.
// - Serves as a "Success Signal" back to the Orchestrator Microservice
// to continue the Saga flow and confirm the order.
// Flow:
// 1. OrchestratorService → Publishes StockReservationRequestedEvent.
// 2. ProductService → Consumes that request and checks inventory.
// 3. If all items have sufficient stock, ProductService → Publishes StockReservedCompletedEvent.
// 4. OrchestratorService → Consumes this event and publishes OrderConfirmedEvent.
// Published By:
// - ProductService
// Consumed By:
// - OrchestratorService
public sealed class StockReservedCompletedEvent : EventBase
{
// --------------------------------------------------------------------
// Order and Customer Information
// --------------------------------------------------------------------
public Guid OrderId { get; set; }
public Guid UserId { get; set; }
// --------------------------------------------------------------------
// Reserved Product Items
// --------------------------------------------------------------------
// The list of items (products) for which stock has been successfully reserved.
// Each item in this list contains:
// - ProductId → Which product was reserved.
// - Quantity → How many units were locked.
// - UnitPrice → The price at which it was ordered (for context, not validation).
// The OrchestratorService includes these items when raising
// the next event (OrderConfirmedEvent), ensuring all services
// share a consistent view of what was reserved.
public List<OrderLineItem> Items { get; set; } = new();
}
}
Key Points:
- Indicates Successful Stock Lock.
- Consumed by OrchestratorService.
- Leads to the publishing of OrderConfirmedEvent.
- Ensures Order Progression in the success path.
Events/StockReservationFailedEvent.cs
This event signals a stock reservation failure. It includes a high-level reason and a detailed list of failed items. The Orchestrator uses this to trigger a compensation flow by cancelling the order. Create a class file named StockReservationFailedEvent.cs within the Events folder of Messaging.Common project, and copy-paste the following code.
using Messaging.Common.Models;
namespace Messaging.Common.Events
{
// Represents an event raised by the Product Microservice when
// stock reservation for an order fails — either partially or completely.
// Purpose:
// - Notifies the Orchestrator Microservice that one or more items
// in the order could not be reserved (e.g., out of stock, discontinued, etc.).
// - Triggers the Orchestrator to start the **compensation flow** by
// publishing an OrderCancelledEvent.
// Flow:
// 1. OrchestratorService → Publishes StockReservationRequestedEvent.
// 2. ProductService → Consumes that event and tries to reserve stock.
// 3. If any item cannot be reserved → Publishes StockReservationFailedEvent.
// 4. OrchestratorService → Consumes this event and publishes OrderCancelledEvent.
// Published By:
// - ProductService
// Consumed By:
// - OrchestratorService
public sealed class StockReservationFailedEvent : EventBase
{
// --------------------------------------------------------------------
// Order & User Information
// --------------------------------------------------------------------
public Guid OrderId { get; set; }
public Guid UserId { get; set; }
// --------------------------------------------------------------------
// Failure Information
// --------------------------------------------------------------------
// A short, high-level reason describing why stock reservation failed.
// Examples:
// - "Insufficient stock"
// - "Product discontinued"
// - "Inventory service timeout"
// Used mainly for logs, alerts, or customer-facing notifications.
public string Reason { get; set; } = "Insufficient stock";
// --------------------------------------------------------------------
// Failed Items Details
// --------------------------------------------------------------------
// Detailed information about which specific items in the order failed
// and the reason for each failure.
// Each FailedLineItem entry typically contains:
// - ProductId → Identifies the product that could not be reserved.
// - Requested → Number of units the order attempted to reserve.
// - Available → Number of units that were actually available at the time.
// - Reason → Human-readable explanation (e.g., "Only 1 left in stock").
// This allows the OrchestratorService or NotificationService to
// clearly communicate item-level failure reasons to users or logs.
public List<FailedLineItem> FailedItems { get; set; } = new();
}
}
Key Points:
- Communicates Failure in Stock Reservation.
- Includes reasons and product-level details.
- Consumed by OrchestratorService.
- Leads to the publishing of OrderCancelledEvent.
Events/OrderConfirmedEvent.cs
Raised by the OrchestratorService after the stock reservation succeeds. It notifies the OrderService to update the order status to Confirmed and the NotificationService to send a confirmation email/SMS to the customer. Create a class file named OrderConfirmedEvent.cs within the Events folder of Messaging.Common project, and copy-paste the following code.
using Messaging.Common.Models;
namespace Messaging.Common.Events
{
// Represents an event raised by the Orchestrator Microservice
// when stock reservation succeeds for an order.
// This event is generated in response to the StockReservedCompletedEvent
// raised by the Product Microservice.
// Purpose:
// - Confirms that all products in the order have been successfully reserved.
// - Signals downstream microservices to proceed with post-reservation actions.
// Consumed By:
// - Order Microservice → Marks the order as "Confirmed" in the database.
// - Notification Microservice → Sends "Order Confirmation" email/SMS to the customer.
// Flow:
// 1. ProductService → Publishes StockReservedCompletedEvent (success).
// 2. OrchestratorService → Consumes it and raises OrderConfirmedEvent.
// 3. OrderService & NotificationService → Consume this event for their respective actions.
public sealed class OrderConfirmedEvent : EventBase
{
// --------------------------------------------------------------------
// Order Details
// --------------------------------------------------------------------
public Guid OrderId { get; set; }
public string OrderNumber { get; set; } = null!;
// --------------------------------------------------------------------
// Customer Details
// --------------------------------------------------------------------
public Guid UserId { get; set; }
public string CustomerName { get; set; } = null!;
public string CustomerEmail { get; set; } = null!;
public string PhoneNumber { get; set; } = null!;
// --------------------------------------------------------------------
// Order Amount
// --------------------------------------------------------------------
public decimal TotalAmount { get; set; }
// --------------------------------------------------------------------
// Ordered Products
// --------------------------------------------------------------------
// List of all products included in the confirmed order.
// Each OrderLineItem contains:
// - ProductId → The ID of the reserved product.
// - Quantity → Number of units ordered.
// - UnitPrice → The price per unit when the order was placed.
// This ensures every consumer (Order, Notification, Payment, etc.)
// has a consistent view of the confirmed order contents.
public List<OrderLineItem> Items { get; set; } = new();
}
}
Key Points:
- Represents Happy-Path Completion.
- Consumed by OrderService and NotificationService.
- Confirms successful order placement.
- Ensures customer communication is triggered.
Events/OrderCancelledEvent.cs
Raised by the OrchestratorService when stock reservation fails or times out. It’s used to compensate the order by marking it as cancelled and notifying the customer. Create a class file named OrderCancelledEvent.cs within the Events folder of Messaging.Common project, and copy-paste the following code.
using Messaging.Common.Models;
namespace Messaging.Common.Events
{
// Represents an event raised by the Orchestrator Microservice when
// stock reservation fails for an order.
// This event is created in response to the StockReservationFailedEvent
// raised by the Product Microservice.
// Purpose:
// - Notifies downstream services that the order could not be confirmed.
// - Informs the Order Microservice to mark the order as "Cancelled".
// - Informs the Notification Microservice to send a "Cancellation Email" to the user.
// Flow:
// 1. ProductService → Publishes StockReservationFailedEvent.
// 2. OrchestratorService → Consumes it and raises OrderCancelledEvent.
// 3. OrderService → Updates order status to "Cancelled".
// 4. NotificationService → Sends cancellation message to customer.
// Published By:
// - OrchestratorService
// Consumed By:
// - OrderService
// - NotificationService
public sealed class OrderCancelledEvent : EventBase
{
// --------------------------------------------------------------------
// Order Information
// --------------------------------------------------------------------
public Guid OrderId { get; set; }
public string OrderNumber { get; set; } = null!;
// --------------------------------------------------------------------
// Customer Details
// --------------------------------------------------------------------
public Guid UserId { get; set; }
public string CustomerName { get; set; } = null!;
public string CustomerEmail { get; set; } = null!;
public string PhoneNumber { get; set; } = null!;
// --------------------------------------------------------------------
// Order Amount
// --------------------------------------------------------------------
public decimal TotalAmount { get; set; }
// --------------------------------------------------------------------
// Cancellation Reason
// --------------------------------------------------------------------
// A high-level description of why the order was cancelled.
// Examples:
// - "Stock Reservation Failed"
// - "One or more items are unavailable"
public string Reason { get; set; } = "Stock Reservation Failed";
// --------------------------------------------------------------------
// Failed Item Details
// --------------------------------------------------------------------
// Contains detailed information about which items failed during
// the stock reservation process, leading to this order being cancelled.
// Each FailedLineItem entry includes:
// - ProductId → Which product caused the failure.
// - Requested → Quantity originally ordered.
// - Available → Quantity actually available in stock.
// - Reason → Specific reason for failure (e.g., "Insufficient stock").
// This list allows NotificationService to communicate exactly which
// products failed and helps OrderService for auditing or analytics.
public List<FailedLineItem> Items { get; set; } = new();
}
}
Key Points:
- Handles the Failure/Rollback scenario of the Saga.
- Consumed by OrderService (for status update) and NotificationService (for email).
- Contains reason and order details.
- Keeps the order status consistent across services after a failed reservation.
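One detail worth noting is that StockReservationFailedEvent carries only the OrderId, UserId, and failure details, while OrderCancelledEvent also needs the order number and customer contact data. The sketch below assumes the orchestrator kept a small snapshot of the order when it consumed OrderPlacedEvent. That snapshot, and every type name ending in Sketch, is a hypothetical illustration rather than the tutorial's actual implementation.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

public sealed class FailedItemSketch
{
    public Guid ProductId { get; set; }
    public int Requested { get; set; }
    public int Available { get; set; }
}

// Stand-in for StockReservationFailedEvent.
public sealed class StockFailedSketch
{
    public Guid OrderId { get; set; }
    public string? CorrelationId { get; set; }
    public string Reason { get; set; } = "Insufficient stock";
    public List<FailedItemSketch> FailedItems { get; set; } = new();
}

// Assumption: order data the orchestrator saved when it saw OrderPlacedEvent.
public sealed class OrderSnapshotSketch
{
    public Guid OrderId { get; set; }
    public string OrderNumber { get; set; } = "";
    public string CustomerEmail { get; set; } = "";
}

// Stand-in for OrderCancelledEvent.
public sealed class OrderCancelledSketch
{
    public Guid OrderId { get; set; }
    public string OrderNumber { get; set; } = "";
    public string CustomerEmail { get; set; } = "";
    public string? CorrelationId { get; set; }
    public string Reason { get; set; } = "Stock Reservation Failed";
    public List<FailedItemSketch> Items { get; set; } = new();
}

public static class CompensationSketch
{
    // Combines the failure event with the saved snapshot to build the cancellation.
    public static OrderCancelledSketch ToCancellation(StockFailedSketch failed, OrderSnapshotSketch snapshot)
    {
        if (failed.OrderId != snapshot.OrderId)
            throw new ArgumentException("Snapshot belongs to a different order.", nameof(snapshot));

        return new OrderCancelledSketch
        {
            OrderId = failed.OrderId,
            OrderNumber = snapshot.OrderNumber,
            CustomerEmail = snapshot.CustomerEmail,
            CorrelationId = failed.CorrelationId, // same Saga, same CorrelationId
            Reason = failed.Reason,
            Items = new List<FailedItemSketch>(failed.FailedItems)
        };
    }
}
```

Copying the failed items into the cancellation event is what lets NotificationService name the exact products that could not be reserved.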
Creating RabbitMQ Options
RabbitMqOptions is a Configuration Class that stores all connection details, queue names, exchange names, and routing keys for RabbitMQ. It ensures that every microservice uses the Same Configuration and Naming Conventions, avoiding mismatches and runtime errors.
- HostName, UserName, Password → connection details.
- ExchangeName, DlxExchangeName → exchange setup.
- Routing Keys → define message routes like order.placed or order.cancelled.
In short, RabbitMQ Options act as a central configuration guide for all messaging components.
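To show how the connection settings fit together, the sketch below assembles them into a standard AMQP URI of the form amqp://user:password@host:port/vhost. ConnectionSettingsSketch is a hypothetical stand-in that mirrors only the connection-related properties, and AmqpUriSketch is an illustrative helper. The tutorial itself may configure the connection property by property instead.

```csharp
using System;

// Hypothetical stand-in mirroring the connection-related option properties.
public sealed class ConnectionSettingsSketch
{
    public string HostName { get; set; } = "localhost";
    public int Port { get; set; } = 5672;
    public string VirtualHost { get; set; } = "ecommerce_vhost";
    public string UserName { get; set; } = "ecommerce_user";
    public string Password { get; set; } = "Test@1234";
}

public static class AmqpUriSketch
{
    // Builds amqp://user:password@host:port/vhost.
    // User name, password, and vhost are percent-encoded so that characters
    // such as '@' or '/' cannot be mistaken for URI delimiters.
    public static string Build(ConnectionSettingsSketch o) =>
        "amqp://" +
        Uri.EscapeDataString(o.UserName) + ":" +
        Uri.EscapeDataString(o.Password) + "@" +
        o.HostName + ":" + o.Port + "/" +
        Uri.EscapeDataString(o.VirtualHost);
}
```

The escaping step matters for the sample password: the '@' in "Test@1234" becomes "%40", so it is not read as the separator between credentials and host. In a real deployment the password would come from an environment variable or a secret store rather than a default value in code.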
Options/RabbitMqOptions.cs
Defines the Central Configuration for RabbitMQ. All microservices use this configuration structure to ensure consistent naming for Exchange, Queue, and Routing Key across the ecosystem. It encapsulates all connection details and message routing conventions.
We have already created the following RabbitMqOptions class within the Options folder of the Messaging.Common project. Please update the RabbitMqOptions.cs file as follows.
namespace Messaging.Common.Options
{
// RabbitMqOptions defines all configurable settings used by RabbitMQ
// across the microservices in the e-commerce system.
// Every service reads these values (usually from appsettings.json) so that
// exchange names, queue names, routing keys, and connection parameters stay consistent.
// Think of this as the "single source of truth" for RabbitMQ configuration.
public sealed class RabbitMqOptions
{
// --------------------------------------------------------------------
// Connection Settings
// --------------------------------------------------------------------
// These properties define how the application connects to the RabbitMQ broker.
// They remain consistent across all microservices so each service
// can connect to the same RabbitMQ instance securely and reliably.
// The hostname or IP address of the RabbitMQ broker.
// Example: "localhost" for local dev or a server name in production.
public string HostName { get; set; } = "localhost";
// The default port for AMQP (5672).
// You can change it if RabbitMQ runs on a different port.
public int Port { get; set; } = 5672;
// The virtual host (vhost) name, used for logical isolation within RabbitMQ.
// Think of a vhost as a "namespace" for your queues and exchanges.
public string VirtualHost { get; set; } = "ecommerce_vhost";
// The username used to authenticate with the RabbitMQ broker.
// Ensure this account has permissions for the specified virtual host.
public string UserName { get; set; } = "ecommerce_user";
// The password for the RabbitMQ user account.
// The default below is for local development only; in real deployments,
// supply it from environment variables or a secret store.
public string Password { get; set; } = "Test@1234";
// --------------------------------------------------------------------
// Exchanges & Dead-Letter Configuration
// --------------------------------------------------------------------
// Exchanges are the entry points for messages in RabbitMQ.
// The main exchange routes business events (like order placed, stock reserved, etc.),
// while the DLX (Dead Letter Exchange) captures failed or rejected messages.
// The main topic exchange where all domain events are published (order placed, stock reserved, etc.).
public string ExchangeName { get; set; } = "ecommerce.topic";
// The Dead Letter Exchange (DLX) that receives messages which
// cannot be processed or are explicitly rejected by consumers.
public string? DlxExchangeName { get; set; } = "ecommerce.dlx";
// The Dead Letter Queue (DLQ) bound to the DLX.
// All failed or unprocessed messages are stored here for later inspection.
public string? DlxQueueName { get; set; } = "ecommerce.dlq";
// --------------------------------------------------------------------
// Routing Keys
// --------------------------------------------------------------------
// Routing keys define the "address" or "topic" of each message.
// They help RabbitMQ determine which queues should receive a message.
// Each key corresponds to a specific event in the Saga flow.
// Routing key used when an order is placed by the OrderService.
// Fired by OrderService → Consumed by Orchestrator
public string RkOrderPlaced { get; set; } = "order.placed";
// Routing key used when the Orchestrator requests stock reservation from the ProductService.
// Fired by Orchestrator → Consumed by ProductService
public string RkStockReservationRequested { get; set; } = "stock.reservation.requested";
// Routing key used when ProductService confirms that stock was successfully reserved.
// Fired by ProductService → Consumed by Orchestrator (success path)
public string RkStockReserved { get; set; } = "stock.reserved";
// Routing key used when ProductService fails to reserve stock (e.g., insufficient quantity).
// Fired by ProductService → Consumed by Orchestrator (failure path)
public string RkStockFailed { get; set; } = "stock.reservation_failed";
// Routing key used when Orchestrator confirms an order after successful stock reservation.
// Fired by Orchestrator → Consumed by OrderService & NotificationService (success)
public string RkOrderConfirmed { get; set; } = "order.confirmed";
// Routing key used when Orchestrator cancels an order due to stock failure.
// Fired by Orchestrator → Consumed by OrderService & NotificationService (failure)
public string RkOrderCancelled { get; set; } = "order.cancelled";
// --------------------------------------------------------------------
// Queue Names
// --------------------------------------------------------------------
// Queues are where consumers (services) actually listen for messages.
// Each microservice has its own dedicated queues based on the events it handles.
// Queues are bound to routing keys to receive relevant messages.
// Naming convention: [service].[event_purpose]
// Queue where the OrchestratorService listens for "order.placed" events.
// This is the entry point for the Saga process.
public string QOrchestratorOrderPlaced { get; set; } = "orchestrator.order_placed";
// Queue where the ProductService listens for stock reservation requests.
public string QProductStockReservationRequested { get; set; } = "product.stock_reservation_requested";
// Queue where the OrchestratorService listens for "stock.reserved" events
// (successful reservation confirmation).
public string QOrchestratorStockReserved { get; set; } = "orchestrator.stock_reserved";
// Queue where the OrchestratorService listens for "stock.reservation_failed" events
// (failed stock reservation).
public string QOrchestratorStockFailed { get; set; } = "orchestrator.stock_failed";
// Queue where the NotificationService listens for "order.confirmed" events
// to send confirmation messages to customers.
public string QNotificationOrderConfirmed { get; set; } = "notification.order_confirmed";
// Queue where the NotificationService listens for "order.cancelled" events
// to send cancellation messages to customers.
public string QNotificationOrderCancelled { get; set; } = "notification.order_cancelled";
// Queue where the OrderService listens for "order.cancelled" events
// to perform compensation logic (rollback or status update).
public string QOrderCompensationCancelled { get; set; } = "order.compensation_cancelled";
}
}
Key Points:
- Stores RabbitMQ Connection and Routing Details.
- Guarantees Consistent Naming across all microservices.
- Supports DLX (Dead Letter Exchange) configuration.
- Used by both Topology and Publisher classes.
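The comment on the Password property recommends keeping the real value out of source code. One way to do that is sketched below: a small helper that prefers an environment variable and falls back to the configured default. The SecretResolver class and the RABBITMQ_PASSWORD variable name are assumptions made for illustration; they are not part of Messaging.Common.

```csharp
using System;

// Hypothetical helper (not part of Messaging.Common): prefers a value from an
// environment variable and falls back to the supplied default when the
// variable is missing or blank.
public static class SecretResolver
{
    public static string Resolve(string variableName, string fallback)
    {
        var value = Environment.GetEnvironmentVariable(variableName);
        return string.IsNullOrWhiteSpace(value) ? fallback : value;
    }
}
```

A service could then call options.Password = SecretResolver.Resolve("RABBITMQ_PASSWORD", options.Password); after loading RabbitMqOptions, so the development default is used only when no environment value is present.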
Creating RabbitMQ Topology
RabbitTopology defines how RabbitMQ is structured; it declares exchanges, queues, and bindings between them. It ensures that all required messaging infrastructure is in place before the application starts sending or receiving messages.
- Declares topic exchanges and dead-letter queues.
- Binds routing keys to queues.
- Ensures idempotent setup (safe to run multiple times).
In short, Topology is the blueprint of RabbitMQ’s setup, ensuring that all queues and exchanges are ready to handle events.
Topology/RabbitTopology.cs
This class programmatically ensures that all RabbitMQ components (Exchanges, Queues, DLQs) exist before use. It is called during service startup, and because re-declaring an existing exchange or queue with the same settings has no effect, services can start in any order and still work correctly. Note that re-declaring an existing queue with different arguments fails with a channel error, so every service must use the same options. We have already created the following RabbitTopology class within the Topology folder of the Messaging.Common project. Please update the RabbitTopology.cs file as follows.
using Messaging.Common.Options;
using RabbitMQ.Client;
namespace Messaging.Common.Topology
{
// The RabbitTopology class ensures that all required exchanges, queues,
// and bindings exist before any microservice starts publishing or consuming messages.
// It is idempotent — meaning if the objects already exist, it won’t recreate them.
public static class RabbitTopology
{
// Declares and binds all exchanges and queues required across microservices.
// This method is typically called once during service startup.
// Parameters:
// ch: An active RabbitMQ channel (IModel) used to declare exchanges and queues.
// opt: Configuration options containing exchange, queue, and routing key names
public static void EnsureAll(IModel ch, RabbitMqOptions opt)
{
// -------------------------------------------------------------
// 1. Declare Main Business Exchange
// --------------------------------------------------------------------
// This is the central topic exchange where all domain events are published.
// Services will publish or subscribe to routing keys on this exchange.
// Using 'durable: true' ensures that the exchange survives RabbitMQ restarts.
ch.ExchangeDeclare(exchange: opt.ExchangeName, type: ExchangeType.Topic, durable: true, autoDelete: false);
// -------------------------------------------------------------
// 2. Declare Dead Letter Exchange (DLX) and Queue
// --------------------------------------------------------------------
// The DLX handles messages that are rejected, expired, or failed to be processed.
// A fanout exchange broadcasts all dead messages to the DLQ (Dead Letter Queue).
ch.ExchangeDeclare(exchange: opt.DlxExchangeName, type: ExchangeType.Fanout, durable: true, autoDelete: false);
// Create the DLQ (Dead Letter Queue) to store failed messages for later inspection.
ch.QueueDeclare(queue: opt.DlxQueueName, durable: true, exclusive: false, autoDelete: false);
// Bind DLQ to DLX (so that dead messages are redirected here).
ch.QueueBind(queue: opt.DlxQueueName, exchange: opt.DlxExchangeName, routingKey: "");
// -------------------------------------------------------------
// 3. Attach DLX Settings to Business Queues
// -------------------------------------------------------------
// These arguments attach the DLX to all main business queues.
// It ensures that if a consumer rejects a message, RabbitMQ automatically
// sends it to the Dead Letter Exchange (DLX) for safe storage and inspection.
var qargs = new Dictionary<string, object>
{
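["x-dead-letter-exchange"] = opt.DlxExchangeName!, // Where dead-lettered messages are sent
["x-max-length"] = 1000, // Max messages held in the queue
["x-message-ttl"] = 300000, // 5-minute lifespan (in milliseconds)
// Note: classic queues have no built-in limit on delivery attempts
// (quorum queues offer "x-delivery-limit" for that).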
};
// -------------------------------------------------------------
// 4. Declare & Bind Queues for Each Microservice
// -------------------------------------------------------------
// ORCHESTRATOR → Listens for "order.placed" events published by OrderService.
// This is where the Saga begins. Once an order is placed, the orchestrator takes over.
ch.QueueDeclare(queue: opt.QOrchestratorOrderPlaced, durable: true, exclusive: false, autoDelete: false, arguments: qargs);
// This means:
// Durable → queue survives broker restarts.
// Exclusive = false → multiple microservices or consumers can listen to the same queue.
// AutoDelete = false → queue stays alive until manually deleted or the broker is reset.
ch.QueueBind(queue: opt.QOrchestratorOrderPlaced, exchange: opt.ExchangeName, routingKey: opt.RkOrderPlaced);
// PRODUCT SERVICE → Listens for "stock.reservation.requested" events.
// The orchestrator requests the ProductService to reserve stock for an order.
ch.QueueDeclare(queue: opt.QProductStockReservationRequested, durable: true, exclusive: false, autoDelete: false, arguments: qargs);
ch.QueueBind(queue: opt.QProductStockReservationRequested, exchange: opt.ExchangeName, routingKey: opt.RkStockReservationRequested);
// ORCHESTRATOR listens to "stock.reserved" events (from ProductService)
// On success, Orchestrator will confirm the order.
ch.QueueDeclare(queue: opt.QOrchestratorStockReserved, durable: true, exclusive: false, autoDelete: false, arguments: qargs);
ch.QueueBind(queue: opt.QOrchestratorStockReserved, exchange: opt.ExchangeName, routingKey: opt.RkStockReserved);
// ORCHESTRATOR also listens to "stock.reservation_failed" events (from ProductService)
// On failure, Orchestrator will cancel the order and trigger compensation.
ch.QueueDeclare(queue: opt.QOrchestratorStockFailed, durable: true, exclusive: false, autoDelete: false, arguments: qargs);
ch.QueueBind(queue: opt.QOrchestratorStockFailed, exchange: opt.ExchangeName, routingKey: opt.RkStockFailed);
// -------------------------------------------------------------
// 5. Declare Notification Service Queues
// -------------------------------------------------------------
// NotificationService listens to "order.confirmed" events
// Used to send confirmation emails or SMS notifications to customers.
ch.QueueDeclare(queue: opt.QNotificationOrderConfirmed, durable: true, exclusive: false, autoDelete: false, arguments: qargs);
ch.QueueBind(queue: opt.QNotificationOrderConfirmed, exchange: opt.ExchangeName, routingKey: opt.RkOrderConfirmed);
// NotificationService also listens to "order.cancelled" events
// Used to send cancellation alerts to customers.
ch.QueueDeclare(queue: opt.QNotificationOrderCancelled, durable: true, exclusive: false, autoDelete: false, arguments: qargs);
ch.QueueBind(queue: opt.QNotificationOrderCancelled, exchange: opt.ExchangeName, routingKey: opt.RkOrderCancelled);
// -------------------------------------------------------------
// 6. Declare Compensation Queue for OrderService
// -------------------------------------------------------------
// OrderService listens to "order.cancelled" events
// This ensures that failed orders are compensated in the database.
ch.QueueDeclare(queue: opt.QOrderCompensationCancelled, durable: true, exclusive: false, autoDelete: false, arguments: qargs);
ch.QueueBind(queue: opt.QOrderCompensationCancelled, exchange: opt.ExchangeName, routingKey: opt.RkOrderCancelled);
// All topology components (Exchanges, Queues, Bindings) are now ensured.
// This setup guarantees that services can publish or consume messages
// safely and consistently across the distributed Saga workflow.
}
}
}
Key Points:
- Declares all Exchanges, Queues, and Bindings.
- Configures DLX/DLQ for Message Reliability.
- Ensures Idempotent Startup across microservices.
- Prevents manual queue/exchange setup errors.
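To see what these bindings mean for message delivery, the following sketch models them as a plain in-memory table and looks up which queues receive a given routing key. It is only an illustration: the BindingTable class is hypothetical, the queue and key names are copied from the RabbitMqOptions defaults, and it matches literal keys only (real topic exchanges also support the * and # wildcards, which are not modeled here).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Illustrative in-memory model of the bindings declared by EnsureAll.
// It does not talk to RabbitMQ and handles literal routing keys only.
public static class BindingTable
{
    // (queue, routing key) pairs copied from the RabbitMqOptions defaults.
    private static readonly (string Queue, string RoutingKey)[] Bindings =
    {
        ("orchestrator.order_placed", "order.placed"),
        ("product.stock_reservation_requested", "stock.reservation.requested"),
        ("orchestrator.stock_reserved", "stock.reserved"),
        ("orchestrator.stock_failed", "stock.reservation_failed"),
        ("notification.order_confirmed", "order.confirmed"),
        ("notification.order_cancelled", "order.cancelled"),
        ("order.compensation_cancelled", "order.cancelled"),
    };

    // Returns every queue whose binding key equals the message's routing key.
    public static IReadOnlyList<string> QueuesFor(string routingKey) =>
        Bindings.Where(b => b.RoutingKey == routingKey)
                .Select(b => b.Queue)
                .ToList();
}
```

For example, BindingTable.QueuesFor("order.cancelled") returns both notification.order_cancelled and order.compensation_cancelled, because the exchange copies a message to every queue bound with a matching key. With the defaults listed in this section, order.confirmed reaches only notification.order_confirmed; no OrderService queue is bound to that key in this listing.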
Understanding exclusive in QueueDeclare
The exclusive flag controls who can access a queue and when it gets deleted.
- exclusive: true: Queue can only be used by the connection that declared it. When that connection closes, the queue is automatically deleted.
- exclusive: false: Queue is shared. Multiple connections or consumers (even from different apps/machines) can access it. The queue persists even if the declaring connection closes.
Creating RabbitMQ Connection Manager
ConnectionManager is a Helper Class that manages the connection between your application and the RabbitMQ server. It keeps a single, reusable connection open for efficiency, recreating it if it breaks. This avoids the performance cost of creating multiple connections.
- Opens and maintains the RabbitMQ connection.
- Recreates the connection on the next request if it has closed.
- Provides a shared connection from which publishers and consumers create their channels.
In short, Connection Manager ensures stable and efficient communication with RabbitMQ.
Connection/ConnectionManager.cs
Manages RabbitMQ connection lifecycle. It maintains a single, long-lived connection per service and automatically recreates it if it is broken. This optimizes performance by avoiding the cost of repeatedly creating connections.
We have already created the following ConnectionManager class within the Connection folder of the Messaging.Common project. Please update the ConnectionManager.cs file as follows.
using RabbitMQ.Client;
namespace Messaging.Common.Connection
{
// The ConnectionManager class is responsible for managing a single, reusable connection to the RabbitMQ broker.
// Opening a RabbitMQ connection is an expensive operation, so this class ensures that:
// Only one connection is created per application instance.
// The same connection is reused for all publishers and consumers.
// If the connection drops, it is recreated the next time GetConnection() is called.
public class ConnectionManager
{
// ---------------------------------------------------------------------
// Private Fields
// ---------------------------------------------------------------------
// Holds a reference to the RabbitMQ connection factory.
// The ConnectionFactory is responsible for creating connections to RabbitMQ
// with the provided host, username, password, and vhost.
private readonly ConnectionFactory _factory;
// Keeps a reference to the currently active RabbitMQ connection.
// The "?" means it can be null initially (before first use).
private IConnection? _connection;
// ---------------------------------------------------------------------
// Constructor
// ---------------------------------------------------------------------
// Accepts the RabbitMQ configuration values and sets up a Connection Factory
// that can be used later to open a connection on demand.
// Parameters
// hostName: The hostname or IP address of the RabbitMQ broker.
// userName: The username used for authentication.
// password: The password for the given username.
// vhost: The RabbitMQ virtual host to connect to.
public ConnectionManager(string hostName, string userName, string password, string vhost)
{
// Create and configure the RabbitMQ connection factory
// The object that knows how to open connections to the RabbitMQ broker.
_factory = new ConnectionFactory
{
// The address (hostname or IP) of the RabbitMQ server.
HostName = hostName,
// Username for authenticating to RabbitMQ.
// This user must have permission to access the virtual host below.
UserName = userName,
// Password for the provided username.
Password = password,
// Virtual Host (vhost) acts like a namespace in RabbitMQ
// that keeps exchanges, queues, and permissions separate per environment or app.
VirtualHost = vhost,
// Enables support for asynchronous consumers instead of traditional synchronous consumers.
// Without this, consumers would process messages synchronously, blocking threads.
// This flag is essential for modern, high-performance .NET applications.
DispatchConsumersAsync = true
};
}
// ---------------------------------------------------------------------
// GetConnection Method
// ---------------------------------------------------------------------
// Returns an active RabbitMQ connection.
// If no connection exists or if the existing one is closed, a new one is created.
// This ensures that the application always has a valid connection
// without the overhead of creating new connections frequently.
public IConnection GetConnection()
{
// Check if there is no existing connection OR if it has been closed due to timeout or broker restart.
// This ensures that the app always has a valid, open connection to work with.
if (_connection == null || !_connection.IsOpen)
{
// Logically, this section only runs once or when a reconnection is needed.
// Create a new connection using the pre-configured factory.
// NOTE: Creating a connection is an expensive I/O operation — so we avoid doing it frequently.
_connection = _factory.CreateConnection();
}
// Return the current active connection (either existing or newly created).
// All publishers, consumers, and topology setup classes use this shared connection.
return _connection;
}
}
}
Key Points:
- Creates and manages Singleton RabbitMQ Connections.
- Reuses Connections for Publishers and Consumers.
- Recreates a closed connection on the next GetConnection() call.
- Reduces Network and Broker Load.
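One caveat: GetConnection() as written is not synchronized, so two threads calling it at the same moment (for example, several hosted consumers starting together) could each create a connection. The sketch below shows the same create-on-demand and recreate-if-closed idea guarded by a lock. SharedResource is a hypothetical generic helper, not part of Messaging.Common; with RabbitMQ it would be given () => factory.CreateConnection() and c => c.IsOpen.

```csharp
using System;

// Hypothetical generic holder (not part of Messaging.Common) that creates a
// resource on first use and recreates it when it is no longer usable.
// The lock ensures concurrent callers never trigger two creations at once.
public sealed class SharedResource<T> where T : class
{
    private readonly Func<T> _create;       // e.g. () => factory.CreateConnection()
    private readonly Func<T, bool> _isOpen; // e.g. c => c.IsOpen
    private readonly object _gate = new object();
    private T? _current;

    public SharedResource(Func<T> create, Func<T, bool> isOpen)
    {
        _create = create ?? throw new ArgumentNullException(nameof(create));
        _isOpen = isOpen ?? throw new ArgumentNullException(nameof(isOpen));
    }

    public T Get()
    {
        lock (_gate)
        {
            // Create on first use, or replace a resource that has closed.
            if (_current == null || !_isOpen(_current))
            {
                _current = _create();
            }
            return _current;
        }
    }
}
```

The lock is held only while checking or creating the resource, so the cost on the normal path (an open connection already exists) is small compared with opening a new connection.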
Creating Base Consumer
BaseConsumer is an abstract class that defines the standard way to consume messages from a queue. It handles message subscription, deserialization, logging, and acknowledgment automatically, so developers only need to implement the business logic.
- Subscribes to a queue and listens for messages.
- Converts message JSON into a typed object.
- Acknowledges (ACK) or requeues (NACK) messages based on success/failure.
In short, Base Consumer provides a ready-to-use framework for safely and consistently reading messages from RabbitMQ.
Consuming/BaseConsumer.cs
Provides a reusable base class for all message consumers. It handles the heavy lifting: subscribing to a queue, deserializing messages, executing the handler, and automatically managing ACK/NACK. By making it a BackgroundService, we integrate neatly with ASP.NET Core’s hosted service model. Services only need to extend this class and implement HandleMessage.
We have already created the following BaseConsumer class within the Consuming folder of the Messaging.Common project. Please update the BaseConsumer.cs file as follows.
using System.Text;
using System.Text.Json;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace Messaging.Common.Consuming
{
// BaseConsumer is an abstract, reusable class that handles all the
// repeated logic required for consuming messages from RabbitMQ queues.
// It integrates with ASP.NET Core's background service model, which allows
// consumers to start automatically when the host application runs.
// Derived classes only need to implement the HandleMessage() method
// to define their own business logic for each message.
// T: The message type expected from the queue.
public abstract class BaseConsumer<T> : BackgroundService
{
// --------------------------------------------------------------------
// Private Fields
// --------------------------------------------------------------------
// The RabbitMQ channel (IModel) used for consuming messages.
private readonly IModel _channel;
// The name of the queue that this consumer will subscribe to.
private readonly string _queue;
// Logger instance used to record informational or error messages.
// Helps track message consumption, errors, and debugging.
private readonly ILogger _logger;
// --------------------------------------------------------------------
// Constructor
// --------------------------------------------------------------------
// Initializes the BaseConsumer with a specific RabbitMQ channel, queue name, and logger instance.
// Parameters:
// channel: The RabbitMQ channel used for consuming messages.
// queueName: The name of the queue this consumer listens to.
// logger: The logging service for tracking events and errors.
protected BaseConsumer(IModel channel, string queueName, ILogger logger)
{
_channel = channel;
_queue = queueName;
_logger = logger;
}
// --------------------------------------------------------------------
// ExecuteAsync (Entry Point)
// --------------------------------------------------------------------
// The main entry point for the background service.
// Called automatically when the BackgroundService starts.
// This method sets up an asynchronous RabbitMQ consumer that listens to the specified queue.
// It automatically deserializes messages, calls the handler, and manages acknowledgments.
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
Console.WriteLine($"[CONSUMER:START] Queue={_queue}, AutoAck=false, Prefetch=1");
// Create an asynchronous event-based consumer to receive messages from the queue.
// AsyncEventingBasicConsumer allows async processing without blocking threads.
var consumer = new AsyncEventingBasicConsumer(_channel);
// ----------------------------------------------------------------
// Step 1: Configure Quality of Service (QoS)
// ----------------------------------------------------------------
// Configures how many messages the consumer can prefetch (receive) before ACKing previous ones.
// Parameters:
// - prefetchSize: 0
// → This disables size-based limiting (we don’t limit by message size in bytes).
// RabbitMQ will ignore message size and only use message count.
// - prefetchCount: 1
// → This tells RabbitMQ to deliver only one unacknowledged message at a time to this consumer.
// The consumer must ACK before the next message is sent.
// Ensures sequential and controlled message processing
// - global: false
// → This setting applies only to this channel/consumer instance.
// If set to true, it would apply globally to all consumers on the same channel.
_channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
// The Received event is triggered every time a new message arrives in the subscribed queue.
// It provides two parameters:
//
// 1. '_' → This represents the sender object (the source that raised the event).
// The underscore (_) means "we don’t need this variable" — it’s ignored intentionally.
//
// 2. 'ea' → This is an instance of BasicDeliverEventArgs.
// It contains all the information about the received message, including:
// - ea.Body → The raw message payload (byte array)
// - ea.DeliveryTag → Unique identifier for the message (used for ACK/NACK)
// - ea.BasicProperties → Metadata (like CorrelationId, Headers, ContentType, etc.)
// - ea.RoutingKey → The routing key used to deliver the message
consumer.Received += async (_, ea) =>
{
try
{
// ------------------------------------------------------------
// Step 2: Deserialize the Message
// ------------------------------------------------------------
// The message payload (body) arrives as a byte array.
// Convert the incoming byte array into a json string
var json = Encoding.UTF8.GetString(ea.Body.Span);
// Then, deserialize the JSON string into the target message type (T).
// The "!" indicates that we're confident the deserialization won't return null.
var msg = JsonSerializer.Deserialize<T>(json)!;
// ------------------------------------------------------------
// Step 3: Invoke Business Logic
// ------------------------------------------------------------
// Delegate the business logic to the derived class implementation.
// Each derived class defines what to do when a message is received.
// Only the deserialized message object is passed to the handler.
await HandleMessage(msg);
// ------------------------------------------------------------
// Step 4: Acknowledge Success
// ------------------------------------------------------------
// Send an acknowledgment (ACK) to RabbitMQ to confirm successful processing.
// This tells the broker that this message has been processed successfully
// and can be removed from the queue.
// Parameters:
// - ea.DeliveryTag : A unique number assigned by RabbitMQ to every delivered message.
// It helps the broker identify which specific message is being ACKed.
// - multiple: false: Means "acknowledge only this single message" (not multiple).
// If it is true, RabbitMQ would ACK all previous unacknowledged messages
// up to and including this tag. (useful for batch acking)
_channel.BasicAck(ea.DeliveryTag, multiple: false);
}
catch (Exception ex)
{
// ------------------------------------------------------------
// Step 5: Handle Exceptions
// ------------------------------------------------------------
// If anything goes wrong during message handling,
// log the error details (including the exception) for troubleshooting.
_logger.LogError(ex, "[Consumer Error] Failed to process message from queue {Queue}", _queue);
// Send a NACK (Negative Acknowledgment) to RabbitMQ.
// This tells RabbitMQ that processing failed.
// Setting requeue: true puts the message back on the queue for another attempt.
// RabbitMQ does not count retries here, so a message that always fails is
// redelivered repeatedly; it reaches the DLX only if it is rejected with
// requeue: false, its TTL expires, or the queue exceeds its max length.
_channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: true);
}
};
// ----------------------------------------------------------
// Step 6: Start Consuming Messages
// ----------------------------------------------------------
// Start consuming messages from the specified queue.
// This method registers a consumer to listen for incoming messages and handle them asynchronously.
// Parameters:
// - queue: _queue → The name of the queue that this consumer will subscribe to.
// This is the same queue name that was passed into the constructor
// when this BaseConsumer instance was created.
// - autoAck: false → "Automatic Acknowledgment" is disabled.
// This means the consumer must manually acknowledge (ACK) each message
// after successful processing using `_channel.BasicAck(...)`.
// If set to true, RabbitMQ would auto-acknowledge messages as soon as
// they are delivered — but that’s risky because if processing fails,
// the message would be lost forever.
// - consumer: consumer → The instance of AsyncEventingBasicConsumer that listens for messages.
// It handles message delivery events (`consumer.Received`) asynchronously.
// In short:
// - We’re subscribing to a specific queue (_queue),
// - We’re handling messages manually (autoAck = false),
// - We’re using our AsyncEventingBasicConsumer instance to process incoming messages.
_channel.BasicConsume(queue: _queue, autoAck: false, consumer: consumer);
// Log that the consumer has started successfully.
_logger.LogInformation("Consumer started and listening on queue: {Queue}", _queue);
// BackgroundService requires returning a Task — no continuous loop needed,
// because message handling is event-driven.
return Task.CompletedTask;
}
// --------------------------------------------------------------------
// Abstract Method: Message Handling Logic
// --------------------------------------------------------------------
// Each subclass must override this method to define its message-handling logic.
// This is where your actual business logic for processing messages goes.
// BaseConsumer handles all the RabbitMQ plumbing;
// Subclasses focus purely on business logic.
// Parameters:
// message: The deserialized message object of type T.
protected abstract Task HandleMessage(T message);
}
}
Key Points:
- Simplifies Consumer Creation via inheritance.
- Handles Message Acknowledgment (ACK/NACK).
- Provides Error Handling and Logging.
- Integrates with ASP.NET Core’s Background Service model.
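Because the catch block always requeues, a message that can never be processed keeps coming back. One common alternative, sketched here under assumptions, is to requeue a failed message only once and dead-letter it on the second failure, using the redelivered flag that RabbitMQ sets on deliveries that were returned to the queue. FailureAction and FailurePolicy are hypothetical names and are not part of Messaging.Common.

```csharp
using System;

// Hypothetical policy helper (not part of Messaging.Common).
public enum FailureAction
{
    Requeue,     // put the message back on the queue for one more attempt
    DeadLetter   // reject without requeue so the queue's DLX receives it
}

public static class FailurePolicy
{
    // 'redelivered' is the flag RabbitMQ sets on a delivery that was
    // previously delivered and then returned to the queue.
    public static FailureAction Decide(bool redelivered) =>
        redelivered ? FailureAction.DeadLetter : FailureAction.Requeue;
}
```

Inside the catch block, this could be applied as bool requeue = FailurePolicy.Decide(ea.Redelivered) == FailureAction.Requeue; followed by _channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: requeue);. Keep in mind that the redelivered flag is also set when a consumer disconnects before acknowledging, so this policy gives at most one retry rather than an exact attempt count.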
Does ExecuteAsync execute multiple times?
No. ExecuteAsync runs only once per consumer instance. The ASP.NET Core host starts every registered hosted service when the application starts. In other words:
- The host calls StartAsync on the BackgroundService, which invokes ExecuteAsync once on startup.
- In this implementation, ExecuteAsync returns as soon as the subscription is registered; the consumer itself stays active for as long as the channel remains open.
- You do not manually call it; the hosting framework does.
So, for example: builder.Services.AddHostedService<MyConsumer>(); → When the app starts, the DI container creates one instance of MyConsumer, and its ExecuteAsync runs once.
How many times will consumer.Received execute?
consumer.Received is an event handler that triggers once for every message that arrives on the subscribed queue.
- If 10 messages arrive → event executes 10 times (one per message).
- If no messages arrive → it doesn’t run at all.
- If messages keep coming, it keeps executing asynchronously.
The key: consumer.Received runs multiple times, once per message, but it is registered only once in ExecuteAsync.
How many times do _channel.BasicQos() and _channel.BasicConsume() execute?
They execute exactly once per BaseConsumer<T> instance:
- BasicQos() → once per channel (per consumer instance).
- BasicConsume() → once per queue subscription (per consumer instance).
Even if thousands of messages arrive, they are handled through the same consumer — the event handler runs many times, but these setup calls don’t repeat.
Can we move _channel.BasicConsume before attaching the event handler?
Technically yes. But you should not move _channel.BasicConsume before the event handler, because RabbitMQ could start delivering messages immediately, and your handler wouldn’t yet be attached. Deliveries that arrive in that window may never be processed or acknowledged, which can stall the consumer or cause unexpected ACK/NACK behavior.
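The ordering problem can be demonstrated without a broker. The sketch below uses a toy FakeSubscription class (an assumption for illustration, not RabbitMQ.Client) whose Start() method immediately raises Received for every waiting message, in the same way a broker may begin delivering as soon as the subscription is registered.

```csharp
using System;
using System.Collections.Generic;

// Toy stand-in for a broker subscription (not RabbitMQ.Client): Start()
// immediately raises Received for every message already waiting.
public sealed class FakeSubscription
{
    private readonly string[] _pending;
    public event Action<string>? Received;

    public FakeSubscription(params string[] pending) => _pending = pending;

    public void Start()
    {
        foreach (var message in _pending)
        {
            // With no handler attached, the delivery is silently skipped.
            Received?.Invoke(message);
        }
    }
}

public static class SubscriptionOrderDemo
{
    // Returns the messages the handler saw for the chosen ordering.
    public static List<string> Run(bool attachHandlerFirst)
    {
        var seen = new List<string>();
        var subscription = new FakeSubscription("m1", "m2", "m3");

        if (attachHandlerFirst) subscription.Received += seen.Add;
        subscription.Start();
        if (!attachHandlerFirst) subscription.Received += seen.Add;

        return seen;
    }
}
```

SubscriptionOrderDemo.Run(true) sees all three messages, while SubscriptionOrderDemo.Run(false) sees none, which is why the handler is attached before the consume call in BaseConsumer.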
Creating Publishers
Publishers are classes responsible for sending events to RabbitMQ exchanges. They convert .NET objects into JSON messages and publish them to the correct routing key and exchange.
- IPublisher → defines the publishing interface.
- Publisher → actual implementation that performs the JSON serialization and message sending.
In short, Publishers are the “message senders” in your event-driven system.
Publishing/IPublisher.cs
Defines the abstraction for publishing events. By depending on this interface, your application code remains decoupled from RabbitMQ’s specifics, enabling easy replacement with other message brokers or test mocks. So, create an interface named IPublisher.cs within the Publishing folder of the Messaging.Common project, and copy-paste the following code.
namespace Messaging.Common.Publishing
{
// Defines a generic contract for publishing messages to a message broker (RabbitMQ in this case).
// Using an interface ensures loose coupling — meaning your services depend on this abstraction,
// not directly on RabbitMQ. Later, you could easily replace RabbitMQ with another message broker
// (like Kafka, Azure Service Bus, etc.) without changing the rest of your code.
public interface IPublisher
{
// Publishes a message asynchronously to the given exchange with the specified routing key.
// exchange: The exchange name to which the message will be sent.
// routingKey: The routing key that determines which queue(s) receive the message.
// message: The actual message payload to publish (usually an event object serialized to JSON).
Task PublishAsync(string exchange, string routingKey, object message);
}
}
Key Points:
- Provides an Abstraction Layer over RabbitMQ.
- Enables Flexible Testing and Swapping of message systems.
- Defines the Contract for Event Publishing.
- Promotes Clean Architecture Principles.
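To make the "test mocks" point concrete, here is a minimal sketch of an in-memory test double. InMemoryPublisher, OrderPlacer, and the exchange and routing key names are hypothetical and exist only for this illustration; the interface is repeated so the snippet compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Repeated from IPublisher.cs so the sketch compiles on its own.
public interface IPublisher
{
    Task PublishAsync(string exchange, string routingKey, object message);
}

// Hypothetical test double: records every publish call in memory instead of talking to a broker.
public sealed class InMemoryPublisher : IPublisher
{
    public List<(string Exchange, string RoutingKey, object Message)> Sent { get; } = new();

    public Task PublishAsync(string exchange, string routingKey, object message)
    {
        Sent.Add((exchange, routingKey, message));
        return Task.CompletedTask;
    }
}

// Hypothetical application class that depends only on the abstraction.
public sealed class OrderPlacer
{
    private readonly IPublisher _publisher;

    public OrderPlacer(IPublisher publisher) => _publisher = publisher;

    // "demo.exchange" and "order.placed" are placeholder names, not the tutorial's topology.
    public Task PlaceAsync(Guid orderId) =>
        _publisher.PublishAsync("demo.exchange", "order.placed", new { OrderId = orderId });
}

public static class Program
{
    public static async Task Main()
    {
        var publisher = new InMemoryPublisher();
        var placer = new OrderPlacer(publisher);

        await placer.PlaceAsync(Guid.NewGuid());

        Console.WriteLine(publisher.Sent.Count);         // 1
        Console.WriteLine(publisher.Sent[0].RoutingKey); // order.placed
    }
}
```

Because OrderPlacer only knows about IPublisher, a unit test can assert on what was published without a running RabbitMQ instance.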
Publishing/Publisher.cs
Implements the actual publishing logic. It serializes messages into JSON, sets delivery properties (persistent delivery mode and content type), and publishes to the exchange with the given routing key. Note that this implementation does not set a correlation ID on the message properties; any correlation ID travels inside the serialized event itself. We have already created the Publisher.cs class file within the Publishing folder of the Messaging.Common project. So, please update the Publisher.cs class file as follows:
using RabbitMQ.Client;
using System.Text;
using System.Text.Json;
namespace Messaging.Common.Publishing
{
// The default implementation of IPublisher that sends messages to RabbitMQ.
// Responsibilities:
// - Serialize message objects to JSON.
// - Set message properties (persistence and content type).
// - Publish the message to the specified exchange and routing key.
public sealed class Publisher : IPublisher
{
// ---------------------------------------------------------------------
// Private Fields
// ---------------------------------------------------------------------
// Represents a RabbitMQ channel (IModel) used for communication with the broker.
// It’s used for performing operations (declare, publish, consume, etc.).
private readonly IModel _channel;
// ---------------------------------------------------------------------
// Constructor
// ---------------------------------------------------------------------
// Initializes the publisher with an active RabbitMQ channel.
// The channel is injected via dependency injection (registered in DI container).
// Parameter:
// channel: An open RabbitMQ channel used for publishing messages.
public Publisher(IModel channel)
{
_channel = channel;
}
// ---------------------------------------------------------------------
// PublishAsync Method
// ---------------------------------------------------------------------
// Publishes a message to RabbitMQ using the provided exchange and routing key.
// exchange: The target exchange to publish the message to.
// routingKey: The routing key that determines which queue(s) will receive the message.
// message: The actual object (event DTO) to send.
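// Note: there is no correlationId parameter and props.CorrelationId is not set here;
// a correlation ID, if any, is carried inside the serialized event (e.g., EventBase.CorrelationId).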
public Task PublishAsync(string exchange, string routingKey, object message)
{
try
{
// ----------------------------------------------------------
// Step 1: Serialize the message payload to JSON format
// ----------------------------------------------------------
// Converts the .NET object (e.g., OrderPlacedEvent) into a JSON string
// so it can be sent over RabbitMQ (which only transmits byte arrays).
var json = JsonSerializer.Serialize(message);
// Convert the JSON string into a UTF-8 byte array (RabbitMQ requires binary payloads).
var body = Encoding.UTF8.GetBytes(json);
// ----------------------------------------------------------
// Step 2: Create and configure basic message properties
// ----------------------------------------------------------
// Properties allow you to set metadata like content type, persistence, correlation ID, etc.
var props = _channel.CreateBasicProperties();
// Specify that the message content is in JSON format.
props.ContentType = "application/json";
// Make the message persistent (DeliveryMode = 2).
// Persistent messages are stored on disk by RabbitMQ,
// so they survive broker restarts (as long as the queue is durable too).
props.DeliveryMode = 2;
// ----------------------------------------------------------
// Step 3: Publish the message to RabbitMQ
// ----------------------------------------------------------
// Sends the serialized message to the target exchange using the routing key.
// The routing key determines which queue(s) the message will reach.
// Parameters:
// - exchange: Name of the exchange (topic, direct, etc.)
// - routingKey: Used by RabbitMQ to route to the correct queue(s)
// - basicProperties: Metadata attached to the message (props)
// - body: Actual message data in bytes
Console.WriteLine($"[PUBLISH:send] -> Exchange={exchange}, body={json}");
_channel.BasicPublish(
exchange: exchange,
routingKey: routingKey,
basicProperties: props,
body: body
);
// What happens when we call _channel.BasicPublish:
// This sends the message to RabbitMQ immediately using the open channel.
// The method BasicPublish() from the RabbitMQ .NET client does not return a Task
// or wait for confirmation that the broker received the message;
// it just pushes it to the TCP connection and returns instantly.
// So from .NET's point of view, the publishing operation is completed the moment that call returns,
// even though the broker might still be processing it internally.
// In short: there is no asynchronous work (no await, no background operation) happening after that line.
Console.WriteLine($"[PUBLISH:ok] Exchange={exchange}, RK={routingKey}, Type={message?.GetType().Name ?? "(null)"}");
}
catch (Exception ex)
{
Console.WriteLine($"[PUBLISH:error] {ex.GetType().Name}: {ex.Message}");
throw;
}
// ----------------------------------------------------------
// Step 4: Complete the Task
// ----------------------------------------------------------
// The method doesn't await any I/O, so all work is finished by this point.
// Return an already-completed Task to satisfy the Task-returning signature of IPublisher.PublishAsync.
return Task.CompletedTask;
}
}
}
Key Points:
- Implements Real RabbitMQ Publish Flow.
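- Marks messages as Persistent (DeliveryMode = 2), so they survive broker restarts when the queue is durable.
- Leaves Tracing to the event payload: the CorrelationId is serialized as part of the event, not set on the message properties.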
- Guarantees Standardized Message Format (JSON).
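Step 1 of PublishAsync can be tried in isolation, without a broker. The sketch below repeats the same two calls (JsonSerializer.Serialize and Encoding.UTF8.GetBytes) and then reverses them the way a consumer typically would. DemoOrderPlaced is a hypothetical type used only here; the real events derive from EventBase.

```csharp
using System;
using System.Text;
using System.Text.Json;

// Hypothetical event type for illustration only.
public sealed class DemoOrderPlaced
{
    public Guid OrderId { get; set; }
    public string CorrelationId { get; set; } = "";
}

public static class Program
{
    public static void Main()
    {
        // Declared as object, exactly like the 'message' parameter of PublishAsync.
        object message = new DemoOrderPlaced
        {
            OrderId = Guid.Parse("11111111-1111-1111-1111-111111111111"),
            CorrelationId = "corr-42"
        };

        // Publisher side: the serializer uses the runtime type of the root object,
        // so all DemoOrderPlaced properties end up in the JSON.
        string json = JsonSerializer.Serialize(message);
        byte[] body = Encoding.UTF8.GetBytes(json);

        // Consumer side: decode the bytes and deserialize into the expected event type.
        string received = Encoding.UTF8.GetString(body);
        DemoOrderPlaced? evt = JsonSerializer.Deserialize<DemoOrderPlaced>(received);

        Console.WriteLine(json);
        Console.WriteLine(evt!.CorrelationId); // corr-42
    }
}
```

The round trip also shows where tracing data lives in this design: the correlation ID is part of the JSON body, so a consumer reads it from the deserialized event rather than from the message properties.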
Creating Extensions
Extensions (like ServiceCollectionExtensions) are helper methods that simplify dependency injection setup in ASP.NET Core. They make it easy to register all RabbitMQ-related services (ConnectionManager, IConnection, and IModel) into the DI container with a single line of code.
Example: services.AddRabbitMq("localhost", "user", "password", "vhost");
In short: Extensions help you plug RabbitMQ setup into any microservice quickly and cleanly.
Extensions/ServiceCollectionExtensions.cs
Provides an easy way to register all RabbitMQ-related services (ConnectionManager, IConnection, IModel) into the ASP.NET Core DI container using a single extension method AddRabbitMq(…). This avoids repetitive setup in each microservice. We have already created the ServiceCollectionExtensions.cs class file within the Extensions folder of the Messaging.Common project. So, please update the ServiceCollectionExtensions.cs class file as follows:
using Microsoft.Extensions.DependencyInjection;
using Messaging.Common.Connection;
namespace Messaging.Common.Extensions
{
// Provides an extension method for IServiceCollection that simplifies the registration
// of RabbitMQ connection-related services (ConnectionManager, IConnection, and IModel)
// into the ASP.NET Core Dependency Injection (DI) container.
// Purpose:
// - Centralizes RabbitMQ setup logic in one reusable place.
// - Keeps Program.cs clean and consistent across microservices.
// - Allows easy injection of RabbitMQ dependencies into any class (publisher, consumer, topology initializer, etc.).
public static class ServiceCollectionExtensions
{
// Adds and configures RabbitMQ connection services in the DI container.
// Once registered, you can inject:
// - ConnectionManager : For managing the connection lifecycle
// - IConnection : The shared RabbitMQ connection
// - IModel : A lightweight channel used for publishing and consuming messages
// Parameters:
// services: The IServiceCollection instance being extended.
// hostName: RabbitMQ server host or IP (e.g., "localhost").
// userName: Username for RabbitMQ authentication.
// password: Password for the given username.
// vhost: The RabbitMQ Virtual Host (vhost) to connect to.
// Returns:
// The same IServiceCollection for fluent configuration chaining.
public static IServiceCollection AddRabbitMq(
this IServiceCollection services, // "this" makes it an extension method usable as services.AddRabbitMq(...)
string hostName, // Hostname or server address where RabbitMQ is running
string userName, // RabbitMQ login username
string password, // RabbitMQ login password
string vhost) // Virtual host name (used for logical isolation)
{
// --------------------------------------------------------------------
// Step 1: Create and Configure the ConnectionManager
// --------------------------------------------------------------------
// The ConnectionManager is our custom helper class that manages RabbitMQ connections.
// It handles creating and reusing a single long-lived connection to the RabbitMQ broker.
var connectionManager = new ConnectionManager(hostName, userName, password, vhost);
// --------------------------------------------------------------------
// Step 2: Establish a Connection to RabbitMQ
// --------------------------------------------------------------------
// Get an active RabbitMQ connection.
// If a connection doesn't exist or is closed, ConnectionManager will create a new one.
// Creating a new connection is expensive (opens a connection to the broker),
// so we keep it alive and reuse it as long as the application is running.
var connection = connectionManager.GetConnection();
// --------------------------------------------------------------------
// Step 3: Create a Channel (IModel)
// --------------------------------------------------------------------
// Channels are used for declaring exchanges/queues, publishing messages, and consuming messages.
// In this setup, we create one shared channel per application instance
// that can be injected into any class (like Publisher or BaseConsumer).
var channel = connection.CreateModel();
// --------------------------------------------------------------------
// Step 4: Register Components in the Dependency Injection (DI) Container
// --------------------------------------------------------------------
// Register each RabbitMQ-related service as a Singleton.
// Singleton means one instance will be created and reused for the entire application lifetime.
// Register ConnectionManager as a Singleton
// Ensures only one instance manages the connection lifecycle for the entire app.
services.AddSingleton(connectionManager);
// Register the RabbitMQ IConnection object.
// This represents the active TCP connection to RabbitMQ.
// We register it as a Singleton because it’s resource-intensive to create.
services.AddSingleton(connection);
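// Register the IModel (channel).
// The channel is used by publishers and consumers to send/receive messages.
// Also registered as a Singleton for reuse. Note: IModel instances are not safe for
// concurrent use by multiple threads, so callers sharing this channel must serialize access to it.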
services.AddSingleton(channel);
// --------------------------------------------------------------------
// Step 5: Return the Service Collection
// --------------------------------------------------------------------
// Returning IServiceCollection allows fluent method chaining, for example:
// services.AddRabbitMq(...).AddSingleton<IMyService, MyService>();
return services;
}
}
}
Key Points:
- Simplifies DI Registration of messaging components.
- Promotes Code Reuse across services.
- Ensures Consistent RabbitMQ Initialization.
- Makes setup Clean, Modular, and Testable.
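The extension-method pattern itself can be explored without a broker. The following sketch has the same shape as AddRabbitMq (a "this IServiceCollection" parameter, singleton registration, and the collection returned for chaining) but registers a hypothetical stand-in, DemoConnection, through a factory delegate. Unlike AddRabbitMq above, which opens the connection while services are being registered, the factory overload defers construction until the first resolve; this is shown as one possible variation, not as the tutorial's implementation. It assumes the Microsoft.Extensions.DependencyInjection package is referenced.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical stand-in for a broker connection, so the sketch runs without RabbitMQ.
public sealed class DemoConnection
{
    public static int Created; // counts how many instances were constructed

    public string Host { get; }

    public DemoConnection(string host)
    {
        Host = host;
        Created++;
    }
}

public static class DemoMessagingExtensions
{
    // Same shape as AddRabbitMq: extends IServiceCollection and returns it for chaining.
    public static IServiceCollection AddDemoMessaging(this IServiceCollection services, string hostName)
    {
        // Factory overload: the instance is built on first resolve, not at registration time.
        services.AddSingleton<DemoConnection>(_ => new DemoConnection(hostName));
        return services;
    }
}

public static class Program
{
    public static void Main()
    {
        var services = new ServiceCollection();
        services.AddDemoMessaging("localhost");
        Console.WriteLine(DemoConnection.Created); // 0: nothing constructed yet

        using var provider = services.BuildServiceProvider();
        var first = provider.GetRequiredService<DemoConnection>();
        var second = provider.GetRequiredService<DemoConnection>();

        Console.WriteLine(DemoConnection.Created);         // 1: built once, on first resolve
        Console.WriteLine(ReferenceEquals(first, second)); // True: singleton lifetime
    }
}
```

Eager creation, as in AddRabbitMq, surfaces a bad host name or credentials immediately at startup, while deferred creation lets the service start even if the broker is temporarily unreachable. Which trade-off fits depends on how each microservice is expected to behave during startup.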
In summary, the Messaging.Common infrastructure acts as the foundation for seamless, event-driven communication among microservices. Each class within it has a dedicated role, from defining consistent event models and managing RabbitMQ connections to publishing, consuming, and binding messages through a standardized topology. Together, they enable the system to remain modular, fault-tolerant, and easy to extend, ensuring that future services can plug into the messaging workflow effortlessly while maintaining high reliability and traceability.

