Implementing Orchestrator Service for Saga Pattern
The Orchestrator Service plays a central role in managing distributed transactions across multiple microservices using the Saga Pattern. It coordinates the flow between services such as Order, Product, Payment, and Notification by listening to specific events, maintaining temporary state, and publishing follow-up events. This ensures that the entire transaction either completes successfully or compensates properly in case of failure.
Saga Flow for Order Placement (with Orchestrator)
The following flow explains how the OrderService, OrchestratorService, ProductService, and NotificationService coordinate using events to ensure data consistency across all microservices during an order’s lifecycle.
Step 1 – Customer Places an Order
- A customer submits an order through the OrderService API (e.g., POST /api/orders).
- The OrderService validates the request (user, product availability, address, payment method, etc.).
- Once validation passes, it saves the order in its database with the initial status = Pending or Confirmed.
- After successfully saving the order, the service publishes an OrderPlacedEvent to RabbitMQ. This event contains all necessary details:
  - OrderId, UserId, OrderNumber
  - List of ordered items (ProductId, Quantity, Price)
  - TotalAmount, CustomerName, Email, PhoneNumber
- The event is sent to the Shared Exchange (e.g., “ecommerce.topic”) with routing key “order.placed”, which notifies the Orchestrator Service that a new order has been created.
Purpose: At this point, the Saga is triggered, and the distributed transaction begins. The OrderService’s role is complete for now, and control moves to the OrchestratorService.
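To make the payload described in Step 1 concrete, here is a minimal sketch of what the event could look like. The real type lives in Messaging.Common and is not shown in this section, so the property types (Guid identifiers, decimal amounts) and the OrderLineItem record are assumptions made for illustration. The property names follow those used later by the orchestrator code (for example CustomerEmail and CorrelationId).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical shapes for illustration only; the actual types in
// Messaging.Common.Events may use different names or property types.
public sealed record OrderLineItem(Guid ProductId, int Quantity, decimal Price);

public sealed record OrderPlacedEvent
{
    // Shared by every message of one Saga instance so logs can be correlated.
    public Guid CorrelationId { get; init; } = Guid.NewGuid();
    public Guid OrderId { get; init; }
    public Guid UserId { get; init; }
    public string OrderNumber { get; init; } = string.Empty;
    public IReadOnlyList<OrderLineItem> Items { get; init; } = Array.Empty<OrderLineItem>();
    public decimal TotalAmount { get; init; }
    public string CustomerName { get; init; } = string.Empty;
    public string CustomerEmail { get; init; } = string.Empty;
    public string PhoneNumber { get; init; } = string.Empty;
}
```

Carrying the customer and item details inside the event is what lets the Orchestrator build the later confirmation or cancellation message without calling back into the OrderService.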
Step 2 – Orchestrator Receives the OrderPlacedEvent
- The OrchestratorService has a consumer (OrderPlacedConsumer) subscribed to the queue bound with the routing key “order.placed”.
- When the OrderPlacedEvent message arrives, the consumer forwards it to the OrchestrationService.OnOrderPlacedAsync() for processing.
- Inside this OnOrderPlacedAsync method, the Orchestrator performs two key tasks:
  - Caches the order details temporarily in memory or a distributed cache (so it can be accessed during later stages of the Saga).
  - Publishes a StockReservationRequestedEvent to the RabbitMQ exchange with routing key “stock.reservation.requested”, asking the ProductService to verify and reserve stock.
Purpose: The Orchestrator now takes charge of coordinating the transaction; it becomes the “director” of the Saga, ensuring that all services perform their parts in order.
Step 3 – ProductService Tries to Reserve Stock
- The ProductService has a consumer (StockReservationRequestedConsumer) subscribed to the queue bound with the routing key “stock.reservation.requested”.
- When it receives the event, it:
  - Looks up the products in its Product Database.
  - Checks whether each product has sufficient stock quantity.
- Based on the result, it performs one of the following actions:
  - If stock is available:
    - It temporarily reduces (reserves) the stock.
    - Publishes a StockReservedCompletedEvent to notify success.
  - If stock is insufficient:
    - It publishes a StockReservationFailedEvent with the reason (e.g., “Product out of stock”).
Purpose: ProductService decides whether the Saga can continue successfully (stock reserved) or must perform compensation (order cancellation).
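The check-then-reserve decision in Step 3 can be sketched as follows. This is not the tutorial's ProductService code. It is a self-contained illustration that uses an in-memory dictionary instead of a database, and every type name here (InMemoryStock, RequestedItem, FailedItem, ReservationResult) is hypothetical. It also assumes an all-or-nothing policy: stock is reduced only when every line of the order can be satisfied.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-ins for illustration; not the tutorial's actual types.
public sealed record RequestedItem(Guid ProductId, int Quantity);
public sealed record FailedItem(Guid ProductId, int Requested, int Available, string Reason);
public sealed record ReservationResult(bool Success, IReadOnlyList<FailedItem> FailedItems);

public sealed class InMemoryStock
{
    private readonly Dictionary<Guid, int> _available;
    private readonly object _gate = new();

    public InMemoryStock(IDictionary<Guid, int> initial) =>
        _available = new Dictionary<Guid, int>(initial);

    public int AvailableFor(Guid productId)
    {
        lock (_gate)
        {
            return _available.TryGetValue(productId, out var quantity) ? quantity : 0;
        }
    }

    // All-or-nothing: stock is reduced only if every requested line fits.
    public ReservationResult TryReserve(IReadOnlyList<RequestedItem> items)
    {
        lock (_gate)
        {
            // Sum quantities per product so duplicate lines are checked together.
            var requested = items
                .GroupBy(i => i.ProductId)
                .ToDictionary(g => g.Key, g => g.Sum(i => i.Quantity));

            var failed = new List<FailedItem>();
            foreach (var (productId, quantity) in requested)
            {
                // Unknown products are treated as having zero stock.
                _available.TryGetValue(productId, out var available);
                if (quantity > available)
                    failed.Add(new FailedItem(productId, quantity, available, "Insufficient stock"));
            }

            // Failure path: nothing is reserved (maps to StockReservationFailedEvent).
            if (failed.Count > 0)
                return new ReservationResult(false, failed);

            // Success path: reduce stock (maps to StockReservedCompletedEvent).
            foreach (var (productId, quantity) in requested)
                _available[productId] -= quantity;

            return new ReservationResult(true, Array.Empty<FailedItem>());
        }
    }
}
```

The lock keeps the check and the decrement atomic within one process. A real service would more likely rely on a database transaction or optimistic concurrency to get the same guarantee across instances.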
Step 4 – Orchestrator Reacts to ProductService Response
Now the Orchestrator waits for one of two possible responses: StockReservedCompletedEvent or StockReservationFailedEvent.
Case A: Stock Reserved Successfully
- The OrchestratorService receives the StockReservedCompletedEvent via its StockReservedConsumer.
- It calls the OnStockReservedAsync() method in OrchestrationService.
- Inside this method, the Orchestrator:
  - Logs the stock reservation as successful.
  - Publishes an OrderConfirmedEvent to RabbitMQ with routing key “order.confirmed”.
  - Removes the cached order data since the process is now complete.
Purpose: This path represents the successful completion of the Saga. The order is confirmed, and other services (like NotificationService) can now proceed.
Case B: Stock Reservation Failed
- The OrchestratorService receives the StockReservationFailedEvent via StockReservationFailedConsumer.
- It calls the OnStockReservationFailedAsync() method in OrchestrationService.
- Inside this method, the Orchestrator:
  - Logs the failure reason.
  - Publishes an OrderCancelledEvent with routing key “order.cancelled”, passing the OrderId and the cancellation reason (e.g., “Stock unavailable”).
  - Removes the cached order data since the process has ended.
Purpose: This is the Compensation Path of the Saga. It ensures that an incomplete transaction is rolled back cleanly.
Step 5 – OrderService Reacts to Final Outcome
- The OrderService has a consumer (OrderCancelledConsumer) listening for the OrderCancelledEvent.
- When it receives this message, it calls the application-level handler OrderCancelledHandler.
- The handler executes compensation logic:
  - Updates the order status in the Order database from Pending or Confirmed to Cancelled.
  - Logs the cancellation reason for tracking and debugging.
Purpose: This ensures that the OrderService’s database stays consistent with the overall Saga outcome.
Step 6 – NotificationService Sends Final Message to User
- The NotificationService subscribes to both OrderConfirmedEvent and OrderCancelledEvent.
- Depending on which event it receives:
  - If OrderConfirmedEvent: Sends a confirmation email/SMS to the customer.
  - If OrderCancelledEvent: Sends a cancellation or failure message to the customer.
Purpose: The NotificationService ensures the user is informed of the order’s final status, completing the user-facing part of the Saga.
Creating Orchestrator Service Projects:
First, create a solution folder named OrchestratorService. Then, within this solution folder, please add the following four projects.
- OrchestratorService.Contracts – Class Library (.NET 8)
- OrchestratorService.Application – Class Library (.NET 8)
- OrchestratorService.Infrastructure – Class Library (.NET 8)
- OrchestratorService.API – ASP.NET Core Web API (.NET 8)
OrchestratorService.Contracts (Class Library)
The Contracts layer defines the Interfaces and Messaging Contracts that act as communication boundaries between different parts of the Orchestrator and other microservices. It ensures all services speak the same language when exchanging messages or calling orchestration logic.
Uses:
- Declares interfaces like IOrchestrationService and IOrderEventsPublisher.
- Defines the Structure of Events and Messages used in the Saga (e.g., OrderPlacedEvent, StockReservedCompletedEvent).
- Enables loose coupling between layers; consumers or publishers depend on contracts, not implementations.
First, add a project reference to the Messaging.Common project. Then, add a folder named Messaging within the OrchestratorService.Contracts project.
Messaging/IOrchestrationService.cs
This interface defines the decision-making contract for your Saga Orchestrator. It lists the three key entry points: when an order is placed, when stock is reserved, and when stock reservation fails. It ensures that orchestration logic follows a predictable, testable structure. So, create an interface named IOrchestrationService.cs within the Messaging folder of the OrchestratorService.Contracts project, and copy-paste the following code.
using Messaging.Common.Events;
namespace OrchestratorService.Contracts.Messaging
{
// Purpose:
// Defines the core decision-making contract for the OrchestratorService
// in a Saga-based distributed transaction.
//
// The Orchestrator coordinates between multiple microservices
// (OrderService, ProductService, PaymentService, NotificationService, etc.)
// to ensure that all services reach a consistent outcome for a given order.
//
// Each method below represents a "decision point" or event reaction
// in the overall Saga workflow.
//
// Design Notes:
// - OrchestratorService.Application.Services.OrchestrationService implements this interface.
// - Event consumers (OrderPlacedConsumer, StockReservedConsumer, etc.)
// call these methods whenever their respective event is received.
public interface IOrchestrationService
{
// Method: OnOrderPlacedAsync
// Description:
// Handles the start of the Saga workflow when a new order is created.
// Triggered when the OrderService publishes an OrderPlacedEvent.
//
// Behavior:
// 1. Validate and cache the received OrderPlacedEvent.
// 2. Publish a StockReservationRequestedEvent to the ProductService.
// 3. Wait for a response (StockReserved or StockReservationFailed).
//
// Parameters:
// evt → The OrderPlacedEvent containing order and customer details.
//
// Outcome:
// Starts the distributed transaction by requesting inventory reservation.
Task OnOrderPlacedAsync(OrderPlacedEvent evt);
// Method: OnStockReservedAsync
// Description:
// Handles the happy (success) path of the Saga.
// Triggered when ProductService successfully reserves stock
// and publishes a StockReservedCompletedEvent.
//
// Behavior:
// 1. Retrieve cached order details (from OnOrderPlacedAsync).
// 2. Publish an OrderConfirmedEvent so:
// - OrderService updates order status to Confirmed.
// - NotificationService sends confirmation to the customer.
// 3. Remove cached order data (since the Saga completed successfully).
//
// Parameters:
// evt → The StockReservedCompletedEvent containing order ID and reserved items.
//
// Outcome:
// Saga completes successfully (order confirmed, notifications sent).
Task OnStockReservedAsync(StockReservedCompletedEvent evt);
// Method: OnStockReservationFailedAsync
// Description:
// Handles the failure (compensation) path of the Saga.
// Triggered when ProductService cannot reserve stock and
// publishes a StockReservationFailedEvent.
//
// Behavior:
// 1. Retrieve cached order details (from OnOrderPlacedAsync).
// 2. Publish an OrderCancelledEvent so:
// - OrderService compensates by marking order as Cancelled.
// - NotificationService informs the customer of the failure.
// 3. Remove cached order data (since the Saga is finalized).
//
// Parameters:
// evt → The StockReservationFailedEvent containing order ID,
// reason for failure, and failed item details.
//
// Outcome:
// Saga ends with compensation (order cancelled gracefully).
Task OnStockReservationFailedAsync(StockReservationFailedEvent evt);
}
}
Needs / Uses:
- Defines how to handle OrderPlacedEvent, StockReservedCompletedEvent, and StockReservationFailedEvent.
- Provides a clear contract for the application layer to implement Saga logic.
- Keeps orchestration logic independent of infrastructure or message brokers.
- Makes it easier to extend orchestration logic for future events (e.g., payment or shipping).
- Acts as the Primary Communication Point between event consumers and the orchestrator.
Messaging/IOrderEventsPublisher.cs
This interface abstracts the logic for publishing Saga events to RabbitMQ. It hides the messaging complexity and lets the application layer publish events such as StockReservationRequestedEvent, OrderConfirmedEvent, and OrderCancelledEvent without needing to know RabbitMQ details. Create an interface named IOrderEventsPublisher.cs within the Messaging folder, and copy-paste the following code.
using Messaging.Common.Events;
namespace OrchestratorService.Contracts.Messaging
{
// Purpose:
// Defines the abstraction for publishing Saga-related events
// from the OrchestratorService to RabbitMQ.
//
// Design Notes:
// - Concrete implementation resides in OrchestratorService.Infrastructure.Messaging.
public interface IOrderEventsPublisher
{
// Method: PublishStockReservationRequestedAsync
// Description:
// Publishes a "StockReservationRequestedEvent" message
// to ask ProductService to check and reserve inventory
// for the given order.
//
// When Called:
// - Immediately after the Orchestrator consumes an OrderPlacedEvent
// from the OrderService.
//
// Role in Saga Flow:
// This marks the transition from the "order created" step
// to the "inventory check" step in the Saga.
//
// Downstream Consumer:
// - ProductService → consumes this event and attempts to reserve stock.
//
// Parameters:
// evt → Contains details such as OrderId, UserId, and Items to reserve.
//
// Outcome:
// Begins the stock reservation phase of the distributed transaction.
Task PublishStockReservationRequestedAsync(StockReservationRequestedEvent evt);
// Method: PublishOrderConfirmedAsync
// Description:
// Publishes an "OrderConfirmedEvent" indicating that
// the order has been successfully processed — stock
// reserved, and no compensation required.
//
// When Called:
// - After the Orchestrator receives a StockReservedCompletedEvent
// from ProductService.
//
// Role in Saga Flow:
// This marks the "success path" of the Saga.
//
// Downstream Consumers:
// - OrderService → updates the order status to Confirmed.
// - NotificationService → sends confirmation email/SMS to the customer.
//
// Parameters:
// evt → Includes full order details such as OrderId, Customer info,
// and reserved item list.
//
// Outcome:
// Completes the Saga successfully — the order is confirmed.
Task PublishOrderConfirmedAsync(OrderConfirmedEvent evt);
// Method: PublishOrderCancelledAsync
// Description:
// Publishes an "OrderCancelledEvent" indicating that
// the order has failed due to stock unavailability or
// another orchestration rule (e.g., payment or validation failure).
//
// When Called:
// - After the Orchestrator receives a StockReservationFailedEvent
// from ProductService.
//
// Role in Saga Flow:
// This marks the "compensation path" of the Saga.
//
// Downstream Consumers:
// - OrderService → compensates by marking the order as Cancelled.
// - NotificationService → informs the customer about the failure reason.
//
// Parameters:
// evt → Includes OrderId, Customer info, list of failed items,
// and the failure Reason (e.g., "Insufficient Stock").
//
// Outcome:
// Saga completes with compensation — order cancelled.
Task PublishOrderCancelledAsync(OrderCancelledEvent evt);
}
}
Needs / Uses:
- Defines specific publish methods for each Saga stage.
- Publishes StockReservationRequestedEvent → triggers stock checking by ProductService.
- Publishes OrderConfirmedEvent → informs downstream services of successful completion.
- Publishes OrderCancelledEvent → initiates compensation after failure.
- Keeps RabbitMQ routing logic outside the application core.
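Because the application layer depends only on this interface, orchestration logic can be exercised without a running broker. The following is a minimal sketch of a recording test double. RecordingOrderEventsPublisher is a hypothetical name, and the three event records are simplified stand-ins declared here only so the example compiles on its own. In the tutorial's solution, the events come from Messaging.Common.Events and the interface from OrchestratorService.Contracts.Messaging.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Simplified stand-ins so the sketch is self-contained (assumed shapes).
public sealed record StockReservationRequestedEvent(Guid OrderId);
public sealed record OrderConfirmedEvent(Guid OrderId);
public sealed record OrderCancelledEvent(Guid OrderId, string Reason);

// Same three methods as the contract defined above.
public interface IOrderEventsPublisher
{
    Task PublishStockReservationRequestedAsync(StockReservationRequestedEvent evt);
    Task PublishOrderConfirmedAsync(OrderConfirmedEvent evt);
    Task PublishOrderCancelledAsync(OrderCancelledEvent evt);
}

// Hypothetical test double: records events instead of sending them to RabbitMQ.
public sealed class RecordingOrderEventsPublisher : IOrderEventsPublisher
{
    private readonly List<object> _published = new();

    // Events in the order they were "published".
    public IReadOnlyList<object> Published => _published;

    public Task PublishStockReservationRequestedAsync(StockReservationRequestedEvent evt) => Record(evt);
    public Task PublishOrderConfirmedAsync(OrderConfirmedEvent evt) => Record(evt);
    public Task PublishOrderCancelledAsync(OrderCancelledEvent evt) => Record(evt);

    private Task Record(object evt)
    {
        _published.Add(evt);
        return Task.CompletedTask;
    }
}
```

A unit test can pass this double to the orchestration class and then assert on Published, for example that a failed reservation produced exactly one OrderCancelledEvent with the expected Reason.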
OrchestratorService.Application (Class Library)
The Application layer contains the Core Orchestration Logic that decides what to do when specific events are received. It coordinates the Saga workflow, starting the process, handling success or failure, and publishing the next events accordingly.
Uses:
- Implements the Business Rules of the Orchestrator (e.g., confirm or cancel order).
- Uses IMemoryCache to store Saga state (like order details) temporarily.
- Calls publisher interfaces to raise new Saga events.
- Ensures Transactional Flow and proper cleanup after each operation.
- Keeps business logic independent from messaging and infrastructure concerns.
Add project references to the Messaging.Common, OrchestratorService.Contracts, and OrchestratorService.Infrastructure projects. Then, create a folder named Services within the OrchestratorService.Application project. Also, install the following package:
- Install-Package Microsoft.Extensions.Caching.Memory
Services/OrchestrationService.cs
This class is the heart of the Orchestrator. It drives the Saga workflow by reacting to incoming events and deciding which event to publish next. It temporarily caches order details to maintain Saga state and ensures smooth progression or compensation depending on success or failure. Create a class file named OrchestrationService.cs within the Services folder of OrchestratorService.Application project, and copy-paste the following code.
using Messaging.Common.Events;
using Messaging.Common.Models;
using Microsoft.Extensions.Caching.Memory;
using OrchestratorService.Contracts.Messaging;
namespace OrchestratorService.Application.Services
{
// Purpose:
// This is the core orchestrator that drives the Saga workflow
// for order processing in a distributed microservices system.
//
// It listens (indirectly, via consumers) to key events like:
// - OrderPlacedEvent
// - StockReservedCompletedEvent
// - StockReservationFailedEvent
//
// and then decides what to do next — whether to:
// Confirm the order, or
// Cancel the order (compensate)
//
// Responsibilities:
// 1. Maintain temporary state (cached order details).
// 2. Publish the next appropriate event in the Saga.
// 3. Clean up after the Saga flow finishes for an order.
//
// Note:
// - No database is used here — state is transient, stored in memory.
// - In real production environments, use a distributed cache (e.g., Redis)
// to ensure durability across orchestrator restarts.
public sealed class OrchestrationService : IOrchestrationService
{
// --------------------------------------------------------
// Dependencies:
// --------------------------------------------------------
// Abstraction for publishing events to RabbitMQ.
private readonly IOrderEventsPublisher _publisher;
// In-memory cache used to store intermediate order states.
// Stores OrderPlacedEvent data temporarily between Saga steps.
private readonly IMemoryCache _cache;
// Constructor: injects publisher and memory cache.
public OrchestrationService(IOrderEventsPublisher publisher, IMemoryCache cache)
{
_publisher = publisher;
_cache = cache;
}
// --------------------------------------------------------
// Method: OnOrderPlacedAsync
// Description:
// Triggered when OrderService publishes an OrderPlacedEvent.
// This method marks the beginning of the Saga workflow.
//
// Responsibilities:
// 1. Cache the full order details temporarily
// (so they can be reused when stock results arrive).
// 2. Publish a StockReservationRequestedEvent to ProductService
// asking it to verify and reserve inventory.
//
// Parameters:
// evt → OrderPlacedEvent (contains OrderId, UserId, Items, etc.)
//
// Next Step in Saga:
// ProductService receives the request and publishes either:
// - StockReservedCompletedEvent (success) OR
// - StockReservationFailedEvent (failure)
// --------------------------------------------------------
public Task OnOrderPlacedAsync(OrderPlacedEvent evt)
{
// Cache the order for 30 minutes (temporary Saga state).
// Later steps read the customer and item details from this entry
// when building the confirmation or cancellation event.
_cache.Set(CacheKey(evt.OrderId), evt, new MemoryCacheEntryOptions
{
AbsoluteExpirationRelativeToNow = TimeSpan.FromMinutes(30)
});
// Build and publish a stock reservation request event.
// ProductService will listen for this and try to reserve stock.
var request = new StockReservationRequestedEvent
{
CorrelationId = evt.CorrelationId, // links all Saga events together
OrderId = evt.OrderId,
UserId = evt.UserId,
Items = evt.Items.ToList() // list of order line items
};
// Publish to the message broker (RabbitMQ).
return _publisher.PublishStockReservationRequestedAsync(request);
}
// --------------------------------------------------------
// Method: OnStockReservedAsync
// Description:
// Triggered when ProductService successfully reserves stock.
//
// Responsibilities:
// 1. Retrieve cached order details from memory.
// 2. Build and publish an OrderConfirmedEvent.
// - OrderService → updates DB status to Confirmed.
// - NotificationService → sends confirmation message.
// 3. Clear cache since this Saga instance is complete.
//
// Parameters:
// evt → StockReservedCompletedEvent (contains OrderId and reserved item info)
//
// Next Step in Saga:
// OrderService & NotificationService react to OrderConfirmedEvent.
// --------------------------------------------------------
public async Task OnStockReservedAsync(StockReservedCompletedEvent evt)
{
// Try to retrieve the cached order details using OrderId as key.
if (_cache.TryGetValue(CacheKey(evt.OrderId), out OrderPlacedEvent? placed) && placed is not null)
{
// Build OrderConfirmedEvent using both cached and incoming event data.
var confirmed = new OrderConfirmedEvent
{
CorrelationId = evt.CorrelationId,
OrderId = placed.OrderId,
UserId = placed.UserId,
OrderNumber = placed.OrderNumber,
CustomerName = placed.CustomerName,
CustomerEmail = placed.CustomerEmail,
PhoneNumber = placed.PhoneNumber,
TotalAmount = placed.TotalAmount,
Items = placed.Items.ToList() // reuse order item details
};
// Publish success message → Saga completes successfully.
await _publisher.PublishOrderConfirmedAsync(confirmed);
// Remove cached order (Saga instance complete).
_cache.Remove(CacheKey(evt.OrderId));
}
}
// --------------------------------------------------------
// Method: OnStockReservationFailedAsync
// Description:
// Triggered when ProductService fails to reserve stock.
// This represents the "compensation" path in the Saga.
//
// Responsibilities:
// 1. Retrieve cached order details.
// 2. Build and publish an OrderCancelledEvent (with reason + failed items).
// - OrderService → marks order as Cancelled.
// - NotificationService → notifies user of failure.
// 3. Clear cache once compensation is complete.
//
// Parameters:
// evt → StockReservationFailedEvent (contains failure reason and failed items)
//
// Next Step in Saga:
// OrderService & NotificationService react to OrderCancelledEvent.
// --------------------------------------------------------
public async Task OnStockReservationFailedAsync(StockReservationFailedEvent evt)
{
// Try to retrieve cached order info.
if (_cache.TryGetValue(CacheKey(evt.OrderId), out OrderPlacedEvent? placed) && placed is not null)
{
// Build OrderCancelledEvent with detailed failure info.
var cancelled = new OrderCancelledEvent
{
CorrelationId = evt.CorrelationId,
OrderId = placed.OrderId,
UserId = placed.UserId,
OrderNumber = placed.OrderNumber,
CustomerName = placed.CustomerName,
CustomerEmail = placed.CustomerEmail,
PhoneNumber = placed.PhoneNumber,
TotalAmount = placed.TotalAmount,
// Map FailedItems (from ProductService) into shared model for cancellation event.
Items = evt.FailedItems?.Select(i => new FailedLineItem
{
ProductId = i.ProductId,
Requested = i.Requested,
Available = i.Available,
Reason = i.Reason
}).ToList() ?? new List<FailedLineItem>(),
Reason = evt.Reason // e.g., "Insufficient stock" or "Out of sync inventory"
};
// Publish compensation event → triggers order cancellation.
await _publisher.PublishOrderCancelledAsync(cancelled);
// Remove cached order (Saga instance complete).
_cache.Remove(CacheKey(evt.OrderId));
}
}
// --------------------------------------------------------
// Helper: CacheKey
// Description:
// Generates a unique key for each cached order.
// Prevents overlap when multiple orders are processed simultaneously.
// --------------------------------------------------------
private static string CacheKey(Guid orderId) => $"order-placed:{orderId}";
}
}
Needs / Uses:
- Caches OrderPlacedEvent data temporarily using IMemoryCache (for later steps).
- Publishes StockReservationRequestedEvent when an order is placed.
- Handles success path: publishes OrderConfirmedEvent after stock reservation.
- Handles the failure path: publishes an OrderCancelledEvent when the stock reservation fails.
- Cleans up cache after Saga completion.
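One detail worth considering: message brokers can deliver the same message more than once. In the class above, the cache entry is read first and removed only after publishing, so a duplicate StockReservedCompletedEvent arriving in between could publish a second confirmation. The sketch below shows one possible way to make completion idempotent by removing the state atomically before acting on it. SagaStateStore is a hypothetical helper that is not part of the tutorial code, and it uses a ConcurrentDictionary in place of IMemoryCache, so it has no expiration.

```csharp
#nullable enable
using System;
using System.Collections.Concurrent;

// Hypothetical helper for illustration; not part of the tutorial's solution.
public sealed class SagaStateStore<TState> where TState : class
{
    private readonly ConcurrentDictionary<Guid, TState> _states = new();

    // Returns false when state for this order already exists,
    // which indicates a duplicate "order placed" message.
    public bool TryBegin(Guid orderId, TState state) => _states.TryAdd(orderId, state);

    // Atomically removes and returns the state. If two handlers race on the
    // same order, only one of them gets true and may publish the final event.
    public bool TryComplete(Guid orderId, out TState? state) =>
        _states.TryRemove(orderId, out state);
}
```

This approach has a trade-off. If publishing fails after TryComplete has succeeded, the state is already gone, so the handler would need to put it back or rely on another recovery mechanism. The original read-publish-remove order has the opposite property: a failed publish can be retried, but a duplicate delivery may publish twice.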
OrchestratorService.Infrastructure (Class Library)
The Infrastructure layer manages all External Integrations and Background Operations, especially with RabbitMQ. It implements consumers that listen for incoming events and publishers that send messages, as well as extension methods for dependency registration.
Uses:
- Implements Event Consumers for listening to RabbitMQ queues.
- Implements Event Publishers to send new messages to the exchange.
- Registers all messaging components through Extension Methods.
- Handles Message Serialization, Routing Keys, and Bindings.
First, add project references to the Messaging.Common and OrchestratorService.Contracts projects. Then, create a folder named Messaging, and within the Messaging folder, create another folder named Producers.
Messaging/Producers/OrderEventsPublisher.cs
Implements the IOrderEventsPublisher interface and provides the Actual RabbitMQ Publishing Logic. It uses configuration options and the shared IPublisher abstraction to send messages to the correct exchange and routing keys. Create a class file named OrderEventsPublisher.cs within the Messaging/Producers folder of OrchestratorService.Infrastructure project, and copy-paste the following code.
using Messaging.Common.Events;
using Messaging.Common.Options;
using Messaging.Common.Publishing;
using Microsoft.Extensions.Options;
using OrchestratorService.Contracts.Messaging;
namespace OrchestratorService.Infrastructure.Messaging.Producers
{
// Purpose:
// Implements the IOrderEventsPublisher contract.
// This class acts as the Orchestrator’s communication channel
// to publish Saga-related events into RabbitMQ.
//
// The OrchestratorService uses this publisher to trigger the
// next steps in the distributed transaction flow:
// 1. OrderPlaced → StockReservationRequested
// 2. StockReserved → OrderConfirmed
// 3. StockReservationFailed → OrderCancelled
public sealed class OrderEventsPublisher : IOrderEventsPublisher
{
// --------------------------------------------------------
// Dependencies:
// --------------------------------------------------------
// IPublisher is a shared abstraction responsible for publishing the event
private readonly IPublisher _publisher;
// Holds RabbitMQ configuration values from appsettings.json,
// including exchange name and routing keys.
private readonly RabbitMqOptions _options;
// --------------------------------------------------------
// Constructor
// Parameters:
// - publisher: shared messaging abstraction.
// - options: typed RabbitMQ configuration.
// --------------------------------------------------------
public OrderEventsPublisher(IPublisher publisher, IOptions<RabbitMqOptions> options)
{
_publisher = publisher;
_options = options.Value; // Extracts actual RabbitMQ config
}
// --------------------------------------------------------
// Method: PublishStockReservationRequestedAsync
// Description:
// Publishes a StockReservationRequestedEvent to ProductService.
// This is the *next step* after Orchestrator receives an
// OrderPlacedEvent from OrderService.
//
// Purpose:
// Requests ProductService to check inventory and reserve stock
// for the given order items.
//
// RabbitMQ Routing:
// - Exchange: ecommerce.topic
// - Routing Key: stock.reservation.requested
//
// Downstream Consumer:
// - ProductService → consumes event → checks stock availability.
//
// Outcome:
// Starts the stock validation phase of the Saga.
// --------------------------------------------------------
public Task PublishStockReservationRequestedAsync(StockReservationRequestedEvent evt)
{
// Optional developer log for visibility (during local testing).
Console.WriteLine($"[Publish] → StockReservationRequestedEvent for OrderId={evt.OrderId}");
// Publish message to the configured exchange & routing key.
return _publisher.PublishAsync(
_options.ExchangeName,
_options.RkStockReservationRequested, // e.g. "stock.reservation.requested"
evt
);
}
// --------------------------------------------------------
// Method: PublishOrderConfirmedAsync
// Description:
// Publishes an OrderConfirmedEvent when ProductService
// successfully reserves all requested stock items.
//
// Purpose:
// Informs downstream services that the order passed all
// validations and can now be marked as "Confirmed."
//
// RabbitMQ Routing:
// - Exchange: ecommerce.topic
// - Routing Key: order.confirmed
//
// Downstream Consumers:
// - OrderService → updates order status in DB to Confirmed.
// - NotificationService → sends success email/SMS to customer.
//
// Outcome:
// Completes the Saga successfully (happy path).
// --------------------------------------------------------
public Task PublishOrderConfirmedAsync(OrderConfirmedEvent evt)
{
Console.WriteLine($"[Publish] → OrderConfirmedEvent for OrderId={evt.OrderId}");
return _publisher.PublishAsync(
_options.ExchangeName,
_options.RkOrderConfirmed, // e.g. "order.confirmed"
evt
);
}
// --------------------------------------------------------
// Method: PublishOrderCancelledAsync
// Description:
// Publishes an OrderCancelledEvent when ProductService fails
// to reserve stock or an orchestration rule triggers compensation.
//
// Purpose:
// Notifies downstream services that the order cannot be completed
// and must be marked as Cancelled.
//
// RabbitMQ Routing:
// - Exchange: ecommerce.topic
// - Routing Key: order.cancelled
//
// Downstream Consumers:
// - OrderService → compensates (updates DB status to Cancelled).
// - NotificationService → alerts the customer with the reason.
//
// Outcome:
// Ends the Saga in compensation mode (failure path).
// --------------------------------------------------------
public Task PublishOrderCancelledAsync(OrderCancelledEvent evt)
{
Console.WriteLine($"[Publish] → OrderCancelledEvent for OrderId={evt.OrderId}");
return _publisher.PublishAsync(
_options.ExchangeName,
_options.RkOrderCancelled, // e.g. "order.cancelled"
evt
);
}
}
}
Needs / Uses:
- Implements Real Message Publishing for Saga events using RabbitMQ configuration.
- Selects the exchange and routing key for each event and delegates serialization and delivery to the shared IPublisher.
- Decouples message publishing from business logic.
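The publisher reads four values from RabbitMqOptions: ExchangeName, RkStockReservationRequested, RkOrderConfirmed, and RkOrderCancelled. The options class itself lives in Messaging.Common and is not shown in this section, so the sketch below is only an assumed subset of it. The class name RabbitMqOptionsSketch and the EnsureValid method are hypothetical, and the default values are taken from the "e.g." comments in the code above rather than from a confirmed configuration file.

```csharp
using System;

// Assumed subset of the real options type; names of the four properties
// match those used by OrderEventsPublisher, everything else is illustrative.
public sealed class RabbitMqOptionsSketch
{
    public string ExchangeName { get; set; } = "ecommerce.topic";
    public string RkStockReservationRequested { get; set; } = "stock.reservation.requested";
    public string RkOrderConfirmed { get; set; } = "order.confirmed";
    public string RkOrderCancelled { get; set; } = "order.cancelled";

    // Fails fast at startup instead of publishing with a blank routing key.
    public void EnsureValid()
    {
        Require(ExchangeName, nameof(ExchangeName));
        Require(RkStockReservationRequested, nameof(RkStockReservationRequested));
        Require(RkOrderConfirmed, nameof(RkOrderConfirmed));
        Require(RkOrderCancelled, nameof(RkOrderCancelled));
    }

    private static void Require(string value, string name)
    {
        if (string.IsNullOrWhiteSpace(value))
            throw new InvalidOperationException($"RabbitMQ option '{name}' must not be empty.");
    }
}
```

Keeping the routing keys in configuration means a publisher and its consumers must agree on the same values. Validating them once at startup makes a mismatch or a missing key easier to spot than a message that is silently never routed.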
Messaging/Consumers/OrderPlacedConsumer.cs
Listens to the OrderPlacedEvent from the OrderService. When triggered, it starts the Saga by calling OnOrderPlacedAsync() in the OrchestrationService, which in turn requests a stock reservation from the ProductService.
Create a folder named Consumers within the Messaging folder of the OrchestratorService.Infrastructure project. Then, create a class file named OrderPlacedConsumer.cs within the Messaging/Consumers folder, and copy-paste the following code.
using Messaging.Common.Consuming;
using Messaging.Common.Events;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using OrchestratorService.Contracts.Messaging;
using RabbitMQ.Client;
namespace OrchestratorService.Infrastructure.Messaging.Consumers
{
// Purpose:
// This consumer listens to the RabbitMQ queue bound to the
// "order.placed" routing key.
//
// When OrderService publishes an OrderPlacedEvent (after saving
// a new order in its DB), this consumer receives it and delegates
// processing to the OrchestrationService.
//
// The OrchestrationService then starts the Saga workflow by
// publishing a StockReservationRequestedEvent to ProductService.
//
// Key Responsibilities:
// - Receive messages from RabbitMQ.
// - Deserialize JSON → OrderPlacedEvent (handled by BaseConsumer).
// - Create DI scope and resolve OrchestrationService.
// - Start the Saga orchestration (next step in workflow).
public sealed class OrderPlacedConsumer : BaseConsumer<OrderPlacedEvent>
{
// Used to create new dependency injection scopes for each message.
// Required because this consumer runs as a hosted background service,
// and cannot directly use scoped services (like DbContext or business services).
private readonly IServiceScopeFactory _scopeFactory;
// --------------------------------------------------------
// Constructor
// Parameters:
// - channel: The RabbitMQ IModel channel used for consuming messages.
// - queueName: The queue name this consumer should listen to.
// - scopeFactory: Allows creation of scoped service providers per message.
// - logger: Structured logging support for visibility & diagnostics.
//
// Notes:
// - BaseConsumer<T> handles all the low-level message consumption logic:
// → queue subscription
// → JSON deserialization
// → automatic ACK/NACK handling
// --------------------------------------------------------
public OrderPlacedConsumer(
IModel channel,
string queueName,
IServiceScopeFactory scopeFactory,
ILogger<OrderPlacedConsumer> logger)
: base(channel, queueName, logger)
{
_scopeFactory = scopeFactory;
}
// --------------------------------------------------------
// Method: HandleMessage
// Description:
// This method is called automatically by the BaseConsumer
// whenever a new OrderPlacedEvent message is received from
// RabbitMQ and successfully deserialized.
//
// Workflow:
// 1. Create a new dependency injection scope.
// 2. Resolve IOrchestrationService from the scoped provider.
// 3. Call OnOrderPlacedAsync() to start the Saga orchestration flow.
//
// Message Flow Summary:
// → OrderService publishes "order.placed".
// → Orchestrator’s OrderPlacedConsumer receives it.
// → Calls OrchestrationService.OnOrderPlacedAsync().
// → Publishes "stock.reservation.requested" to ProductService.
//
// Error Handling:
// - If this method completes successfully → message is ACKed.
// - If an exception occurs → BaseConsumer will automatically NACK
// the message so it can be re-queued or routed through a DLX (Dead Letter Exchange) to a dead-letter queue.
// --------------------------------------------------------
protected override async Task HandleMessage(OrderPlacedEvent message)
{
Console.WriteLine($"OrchestratorService [Consumer] → OrderPlacedConsumer for OrderId={message.OrderId}");
// Create a DI scope for this message.
// Each message gets a fresh scope to safely resolve scoped services
// (e.g., DbContext, OrchestrationService) that should be disposed afterward.
using var scope = _scopeFactory.CreateScope();
// Resolve OrchestrationService from DI container.
// This service contains the actual Saga orchestration logic
// (publishing StockReservationRequestedEvent, caching order, etc.).
var orchestrator = scope.ServiceProvider.GetRequiredService<IOrchestrationService>();
// Pass the event to the orchestration logic.
// This begins the distributed workflow by asking ProductService to reserve stock.
await orchestrator.OnOrderPlacedAsync(message);
// If no exceptions are thrown, BaseConsumer automatically ACKs the message,
// marking it as successfully processed.
}
}
}
Needs / Uses:
- Subscribed to RabbitMQ queue bound to routing key order.placed.
- Acts as the Entry Point of the Saga workflow.
- Creates a DI Scope Per Message for safe use of Scoped services.
- Delegates processing to OrchestrationService to start orchestration.
- Begins the Saga: the OrchestrationService publishes StockReservationRequestedEvent to ProductService.
- Ensures messages are ACKed or NACKed automatically (handled by BaseConsumer).
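BaseConsumer<T> comes from Messaging.Common and is not reproduced in this section, so the ACK/NACK behaviour described above can be hard to picture. The following is a minimal, dependency-free sketch of that contract only: deserialize, call HandleMessage, ACK on success, NACK on any exception. FakeChannel, SketchConsumer<T>, OrderPlacedSketch, and OrderPlacedSketchConsumer are hypothetical names invented for illustration, and the Guid type of OrderId is an assumption. The real base class works against a RabbitMQ IModel and may differ in details such as requeue behaviour.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical stand-in for the RabbitMQ channel; it only records acknowledgements.
public sealed class FakeChannel
{
    public List<string> Log { get; } = new();
    public void Ack(ulong deliveryTag) => Log.Add($"ACK {deliveryTag}");
    public void Nack(ulong deliveryTag, bool requeue) => Log.Add($"NACK {deliveryTag} requeue={requeue}");
}

// Hypothetical, simplified shape of a base consumer (not the tutorial's BaseConsumer<T>).
public abstract class SketchConsumer<T>
{
    private readonly FakeChannel _channel;
    protected SketchConsumer(FakeChannel channel) => _channel = channel;

    // Derived consumers put their message-specific logic here.
    protected abstract Task HandleMessage(T message);

    public async Task OnDeliveryAsync(ulong deliveryTag, string json)
    {
        try
        {
            var message = JsonSerializer.Deserialize<T>(json)
                ?? throw new InvalidOperationException("Empty payload.");
            await HandleMessage(message);
            _channel.Ack(deliveryTag); // handler finished without throwing
        }
        catch (Exception)
        {
            // Assumption: no requeue, so a configured DLX can take over.
            _channel.Nack(deliveryTag, requeue: false);
        }
    }
}

// Hypothetical event shape; the real OrderPlacedEvent carries more fields.
public sealed record OrderPlacedSketch(Guid OrderId);

public sealed class OrderPlacedSketchConsumer : SketchConsumer<OrderPlacedSketch>
{
    public OrderPlacedSketchConsumer(FakeChannel channel) : base(channel) { }

    protected override Task HandleMessage(OrderPlacedSketch message)
    {
        if (message.OrderId == Guid.Empty)
            throw new ArgumentException("OrderId is required.");
        return Task.CompletedTask;
    }
}
```

Feeding this sketch a well-formed payload records an ACK, while a malformed payload or a throwing handler records a NACK. This is why the consumers in this section contain no acknowledgement code of their own.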
Messaging/Consumers/StockReservedConsumer.cs
Listens to StockReservedCompletedEvent from ProductService, indicating successful stock reservation. It calls OnStockReservedAsync() to publish an OrderConfirmedEvent, completing the Saga’s success path. Create a class file named StockReservedConsumer.cs within the Messaging/Consumers folder, and copy-paste the following code.
using Messaging.Common.Consuming;
using Messaging.Common.Events;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using OrchestratorService.Contracts.Messaging;
using RabbitMQ.Client;
namespace OrchestratorService.Infrastructure.Messaging.Consumers
{
// Purpose:
// This consumer listens to RabbitMQ messages published by ProductService
// after stock has been successfully reserved.
//
// In other words, it represents the "success path" in the Saga flow.
// Once ProductService publishes a StockReservedCompletedEvent,
// this consumer triggers the Orchestrator to finalize the order
// by publishing an OrderConfirmedEvent.
//
// Message Flow:
// [ProductService] → publishes StockReservedCompletedEvent
// ↓
// [OrchestratorService] → consumes it via StockReservedConsumer
// ↓
// Calls OrchestrationService.OnStockReservedAsync()
// ↓
// Publishes OrderConfirmedEvent → (OrderService and NotificationService react)
public sealed class StockReservedConsumer : BaseConsumer<StockReservedCompletedEvent>
{
// IServiceScopeFactory is required because this consumer runs as a hosted background service.
// We cannot inject scoped dependencies (like DbContext or OrchestrationService) directly.
// Instead, we create a new dependency injection scope for each incoming message.
private readonly IServiceScopeFactory _scopeFactory;
// --------------------------------------------------------
// Constructor
// Parameters:
// - channel: RabbitMQ channel (IModel) used for consuming messages.
// - queueName: Name of the queue bound to the routing key "stock.reserved".
// - scopeFactory: Creates DI scopes for safely resolving scoped services per message.
// - logger: Used for structured logging (provided to BaseConsumer).
// --------------------------------------------------------
public StockReservedConsumer(
IModel channel,
string queueName,
IServiceScopeFactory scopeFactory,
ILogger<StockReservedConsumer> logger)
: base(channel, queueName, logger)
{
_scopeFactory = scopeFactory;
}
// --------------------------------------------------------
// Method: HandleMessage
// Description:
// Invoked automatically by BaseConsumer whenever a
// StockReservedCompletedEvent is received and deserialized.
//
// Responsibilities:
// 1. Create a new scoped lifetime for this message (fresh DI scope).
// 2. Resolve IOrchestrationService from the scoped provider.
// 3. Delegate processing to OnStockReservedAsync(), which:
// → Retrieves cached order info (OrderPlacedEvent).
// → Publishes OrderConfirmedEvent.
// → Removes order from cache (Saga complete).
//
// Outcome:
// A successful stock reservation triggers an OrderConfirmedEvent,
// allowing downstream services (OrderService, NotificationService)
// to finalize the workflow.
//
// Error Handling:
// - If no exception: message is ACKed (successfully processed).
// - If exception: message is NACKed (goes to DLQ or requeue, based on config).
// --------------------------------------------------------
protected override async Task HandleMessage(StockReservedCompletedEvent message)
{
Console.WriteLine($"OrchestratorService [Consumer] → StockReservedConsumer for OrderId={message.OrderId}");
// Create a DI scope for this specific message.
// Ensures any scoped services (e.g., DbContext or OrchestrationService)
// are instantiated fresh and disposed after use.
using var scope = _scopeFactory.CreateScope();
// Resolve the OrchestrationService (application layer).
// This service contains the main Saga logic for handling success cases.
var orchestrator = scope.ServiceProvider.GetRequiredService<IOrchestrationService>();
// Delegate processing to the orchestrator logic.
// It will:
// - Retrieve the cached OrderPlacedEvent (from memory/Redis).
// - Publish an OrderConfirmedEvent.
// - Clean up the Saga state for this order.
await orchestrator.OnStockReservedAsync(message);
// If successful:
// - BaseConsumer automatically sends an ACK to RabbitMQ.
// - This marks the message as processed and removes it from the queue.
}
}
}
Needs / Uses:
- Subscribed to routing key stock.reserved.
- Represents the Success Path of the Saga.
- Creates a scoped DI context per message for safe use of Scoped services.
- Invokes OnStockReservedAsync() in OrchestrationService, which retrieves the cached order and publishes OrderConfirmedEvent.
- Cleans up the cached order once the Saga completes.
- Ensures message acknowledgment (handled by BaseConsumer).
Messaging/Consumers/StockReservationFailedConsumer.cs
StockReservationFailedConsumer handles the failure path in the Saga. When the ProductService cannot reserve stock, this consumer triggers the compensation process, cancelling the order and notifying downstream services. Create a class file named StockReservationFailedConsumer.cs within the Messaging/Consumers folder, and copy-paste the following code.
using Messaging.Common.Consuming;
using Messaging.Common.Events;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using OrchestratorService.Contracts.Messaging;
using RabbitMQ.Client;
namespace OrchestratorService.Infrastructure.Messaging.Consumers
{
// Purpose:
// This consumer listens for the "StockReservationFailedEvent" message
// published by ProductService when stock reservation cannot be completed.
//
// Example scenarios:
// - Product is out of stock
// - Insufficient quantity available
// - Invalid or missing product entries
//
// Once this consumer receives the event, it triggers the
// OrchestrationService to perform *compensating actions*:
// → Publish an OrderCancelledEvent
// → Notify OrderService to mark order as Cancelled
// → Notify NotificationService to inform the user
//
// This represents the failure path of the Saga workflow.
public sealed class StockReservationFailedConsumer : BaseConsumer<StockReservationFailedEvent>
{
// IServiceScopeFactory is required because this consumer
// runs as a background hosted service and cannot directly
// inject scoped services (like DbContext or OrchestrationService).
// Each message processed gets its own DI scope for safety.
private readonly IServiceScopeFactory _scopeFactory;
// --------------------------------------------------------
// Constructor
// Parameters:
// - channel: RabbitMQ channel (IModel) used to receive messages.
// - queueName: Queue name bound to routing key "stock.failed"
// (configured via RabbitMqOptions).
// - scopeFactory: Enables creation of a new DI scope per message.
// - logger: Used for structured logging and tracing.
// --------------------------------------------------------
public StockReservationFailedConsumer(
IModel channel,
string queueName,
IServiceScopeFactory scopeFactory,
ILogger<StockReservationFailedConsumer> logger)
: base(channel, queueName, logger)
{
_scopeFactory = scopeFactory;
}
// --------------------------------------------------------
// Method: HandleMessage
//
// Description:
// Called automatically by BaseConsumer when a new
// StockReservationFailedEvent is received and deserialized.
//
// This method handles the compensation part of the Saga flow.
//
// Workflow Steps:
// 1. Create a new dependency injection scope.
// 2. Resolve IOrchestrationService from DI container.
// 3. Call OnStockReservationFailedAsync() to perform compensation.
//
// Responsibilities of OrchestrationService:
// → Retrieve cached OrderPlacedEvent (from memory or Redis).
// → Publish OrderCancelledEvent (with detailed failure reason).
// → Remove the order from cache (saga cleanup).
//
// Outcome:
// OrderService and NotificationService are informed of cancellation.
//
// Error Handling:
// - Success → message is ACKed (removed from queue).
// - Exception → BaseConsumer automatically NACKs
// the message (goes to DLX or requeue, depending on setup).
// --------------------------------------------------------
protected override async Task HandleMessage(StockReservationFailedEvent message)
{
Console.WriteLine($"OrchestratorService [Consumer] → StockReservationFailedConsumer for OrderId={message.OrderId}");
// Create a scoped lifetime for this specific message.
// This ensures that any scoped services (like DbContext or
// OrchestrationService) are fresh instances, and disposed
// properly after the message is processed.
using var scope = _scopeFactory.CreateScope();
// Resolve OrchestrationService from DI container.
// This application-layer service contains the business logic
// for handling failed stock reservations in the Saga flow.
var orchestrator = scope.ServiceProvider.GetRequiredService<IOrchestrationService>();
// Delegate failure handling to OrchestrationService.
// This triggers:
// - OrderCancelledEvent publication
// - Order status compensation in OrderService
// - Customer notification via NotificationService
await orchestrator.OnStockReservationFailedAsync(message);
// BaseConsumer will automatically ACK this message if
// processing completes successfully.
// If any exception occurs, BaseConsumer will NACK the message,
// allowing RabbitMQ to handle retries or DLQ routing.
}
}
}
Needs / Uses:
- Listens to the orchestrator.stock_failed queue, which is bound to the stock reservation failure routing key configured in RabbitMqOptions.
- Represents the Failure/Compensation Path in the Saga.
- Creates a scoped DI lifetime per message.
- Calls OnStockReservationFailedAsync() in OrchestrationService to publish OrderCancelledEvent.
- Cleans up cached order data after failure.
- Keeps the system consistent even in the face of distributed transaction failures.
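Taken together, the three consumers drive a small state machine: cache the order when it is placed, then finish the Saga on either the success or the failure event and drop the cached entry. The sketch below illustrates only that flow, under stated assumptions. SagaSketch is a hypothetical class, a plain Dictionary stands in for the memory cache, a string list stands in for the publisher, and the Guid OrderId is assumed. It is not the tutorial's OrchestrationService.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, in-memory illustration of the orchestration flow.
public sealed class SagaSketch
{
    // OrderId -> customer email (stand-in for the cached OrderPlacedEvent).
    private readonly Dictionary<Guid, string> _pending = new();

    // Stand-in for the publisher: records which event would be published.
    public List<string> Published { get; } = new();

    public int PendingCount => _pending.Count;

    // Entry point: remember the order, then ask for a stock reservation.
    public void OnOrderPlaced(Guid orderId, string email)
    {
        _pending[orderId] = email;
        Published.Add($"StockReservationRequestedEvent:{orderId}");
    }

    // Success path: confirm the order.
    public void OnStockReserved(Guid orderId) =>
        Complete(orderId, "OrderConfirmedEvent");

    // Failure path: compensate by cancelling the order.
    public void OnStockReservationFailed(Guid orderId, string reason) =>
        Complete(orderId, $"OrderCancelledEvent({reason})");

    private void Complete(Guid orderId, string outcome)
    {
        // Assumption: an unknown or already-finished order is ignored,
        // so a redelivered message does not publish a second outcome.
        if (!_pending.Remove(orderId, out _)) return;
        Published.Add($"{outcome}:{orderId}");
    }
}
```

Ignoring events for orders that are no longer cached is one possible design choice. It keeps redelivered messages harmless, but it also means that state held only in process memory is lost on restart, which is why the in-memory cache is described as replaceable by Redis.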
Messaging/Extensions/RabbitMqConsumerExtensions
RabbitMqConsumerExtensions simplifies registering all consumers in the DI container. It keeps the Program.cs file clean and holds the consumer setup logic in a single reusable place. Create a class file named RabbitMqConsumerExtensions.cs within the Messaging/Extensions folder of the OrchestratorService.Infrastructure project and copy-paste the following code.
using Messaging.Common.Options;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using OrchestratorService.Infrastructure.Messaging.Consumers;
using RabbitMQ.Client;
namespace OrchestratorService.Infrastructure.Messaging.Extensions
{
// Extension methods for registering RabbitMQ consumers
// (background hosted services) for the OrchestratorService.
public static class RabbitMqConsumerExtensions
{
// Registers all RabbitMQ consumers required by the Orchestrator.
// Each consumer runs as a hosted background service and listens
// to a specific queue for incoming events in the Saga workflow.
public static IServiceCollection AddOrchestratorConsumers(this IServiceCollection services)
{
// -------------------------------
// ORDER PLACED CONSUMER
// -------------------------------
// - Listens to queue: orchestrator.order_placed
// - Triggered when OrderService publishes OrderPlacedEvent
// - Starts the Saga flow by asking ProductService to reserve stock
services.AddHostedService(sp =>
{
var channel = sp.GetRequiredService<IModel>();
var options = sp.GetRequiredService<IOptions<RabbitMqOptions>>().Value;
var scopeFactory = sp.GetRequiredService<IServiceScopeFactory>();
var logger = sp.GetRequiredService<ILogger<OrderPlacedConsumer>>();
return new OrderPlacedConsumer(
channel,
options.QOrchestratorOrderPlaced,
scopeFactory,
logger);
});
// -------------------------------
// STOCK RESERVED CONSUMER
// -------------------------------
// - Listens to queue: orchestrator.stock_reserved
// - Triggered when ProductService successfully reserves stock
// - Publishes OrderConfirmedEvent (success path of the Saga)
services.AddHostedService(sp =>
{
var channel = sp.GetRequiredService<IModel>();
var options = sp.GetRequiredService<IOptions<RabbitMqOptions>>().Value;
var scopeFactory = sp.GetRequiredService<IServiceScopeFactory>();
var logger = sp.GetRequiredService<ILogger<StockReservedConsumer>>();
return new StockReservedConsumer(
channel,
options.QOrchestratorStockReserved,
scopeFactory,
logger);
});
// -------------------------------
// STOCK RESERVATION FAILED CONSUMER
// -------------------------------
// - Listens to queue: orchestrator.stock_failed
// - Triggered when ProductService fails to reserve stock
// - Publishes OrderCancelledEvent (compensation path)
services.AddHostedService(sp =>
{
var channel = sp.GetRequiredService<IModel>();
var options = sp.GetRequiredService<IOptions<RabbitMqOptions>>().Value;
var scopeFactory = sp.GetRequiredService<IServiceScopeFactory>();
var logger = sp.GetRequiredService<ILogger<StockReservationFailedConsumer>>();
return new StockReservationFailedConsumer(
channel,
options.QOrchestratorStockFailed,
scopeFactory,
logger);
});
return services;
}
}
}
Needs / Uses:
- Provides AddOrchestratorConsumers() extension method.
- Registers OrderPlacedConsumer, StockReservedConsumer, and StockReservationFailedConsumer as hosted services.
- Injects dependencies (channel, scope factory, options, logger) for each consumer.
- Centralizes consumer registration for maintainability.
OrchestratorService.API (ASP.NET Core Web API)
The API layer serves as the Entry Point and Startup Configuration for the Orchestrator microservice. It registers all dependencies, sets up RabbitMQ, initializes the topology, and starts background consumers to continuously listen for events.
Uses:
- Configures Dependency Injection (DI) for all layers.
- Sets up RabbitMQ Connections, Exchanges, and Queues at startup.
- Registers Publishers (Singleton) and Orchestration Service (Scoped).
- Starts Background Consumers to handle incoming events.
First, add project references to Messaging.Common, OrchestratorService.Contracts, OrchestratorService.Application, and OrchestratorService.Infrastructure.
Program.cs
Acts as the entry point and bootstrapper for the OrchestratorService. It configures DI, sets up RabbitMQ, initializes the topology, and starts all consumers as hosted background services. Please modify the Program class as follows.
using Messaging.Common.Extensions;
using Messaging.Common.Options;
using Messaging.Common.Publishing;
using Messaging.Common.Topology;
using Microsoft.Extensions.Options;
using OrchestratorService.Application.Services;
using OrchestratorService.Contracts.Messaging;
using OrchestratorService.Infrastructure.Messaging.Extensions;
using OrchestratorService.Infrastructure.Messaging.Producers;
using RabbitMQ.Client;
namespace OrchestratorService.API
{
public class Program
{
public static void Main(string[] args)
{
var builder = WebApplication.CreateBuilder(args);
// Add services to the container.
builder.Services.AddControllers()
.AddJsonOptions(options =>
{
options.JsonSerializerOptions.PropertyNamingPolicy = null;
});
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
// In-memory cache
// Used by OrchestrationService to store OrderPlaced events temporarily.
// This is a simple local cache; can later be replaced by Redis for durability.
builder.Services.AddMemoryCache();
// RabbitMQ configuration (Strongly typed options)
builder.Services.Configure<RabbitMqOptions>(builder.Configuration.GetSection("RabbitMq"));
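// Fail fast with a clear message if the "RabbitMq" section is missing.
var mq = builder.Configuration.GetSection("RabbitMq").Get<RabbitMqOptions>()
?? throw new InvalidOperationException("Missing 'RabbitMq' configuration section.");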
// Register RabbitMQ connection + channel
// Singleton: connection and channel should remain open and reused
// throughout the service lifetime for efficiency.
builder.Services.AddRabbitMq(mq.HostName, mq.UserName, mq.Password, mq.VirtualHost);
// Publisher setup
// IPublisher → Base abstraction that handles:
// - Persistent RabbitMQ connection and channel
// - JSON serialization of messages
// Singleton Reason:
// - Uses a long-lived RabbitMQ channel (expensive to create per call)
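// - Safe to share only if Publisher serializes access to its channel
// (a RabbitMQ IModel must not be used concurrently from multiple threads)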
// - No per-request or per-message state
builder.Services.AddSingleton<IPublisher, Publisher>();
// IOrderEventsPublisher → Orchestrator-specific wrapper around IPublisher
// Adds routing logic and event-specific publishing methods
// Singleton Reason:
// - Stateless wrapper around IPublisher
// - Shares same underlying RabbitMQ channel
// - No scoped dependencies or state tracking
builder.Services.AddSingleton<IOrderEventsPublisher, OrderEventsPublisher>();
// IOrchestrationService → The Saga Orchestration "brain"
// Coordinates the workflow between Order, Product, and Notification services.
// Scoped Reason:
// - Runs once per message handling cycle (via consumers)
// - May depend on scoped services in future (like DbContext or transactional logic)
builder.Services.AddScoped<IOrchestrationService, OrchestrationService>();
// Register background consumers for RabbitMQ queues
// (Each consumer is a hosted background service)
// Each consumer:
// - Is registered as a Singleton HostedService
// - Runs continuously, listening to one queue
// - Uses IServiceScopeFactory internally to create
// scoped lifetimes per message
builder.Services.AddOrchestratorConsumers();
var app = builder.Build();
// Ensure RabbitMQ topology
// Creates all required exchanges, queues, and bindings at startup.
// This step is idempotent (safe to call even if everything exists).
using (var scope = app.Services.CreateScope())
{
var ch = scope.ServiceProvider.GetRequiredService<IModel>();
var opt = scope.ServiceProvider.GetRequiredService<IOptions<RabbitMqOptions>>().Value;
RabbitTopology.EnsureAll(ch, opt);
}
// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
app.UseSwagger();
app.UseSwaggerUI();
}
app.UseHttpsRedirection();
app.UseAuthorization();
app.MapControllers();
app.Run();
}
}
}
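appsettings.json file:
Defines configuration for logging, allowed hosts, and RabbitMQ connection details. It lets the OrchestratorService read its RabbitMQ connection settings from configuration instead of hardcoding them in code. The values below are suitable for local development; in other environments, the password can be supplied through user secrets or environment variables, which override appsettings.json. Please update the appsettings.json file as follows: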
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
},
"AllowedHosts": "*",
"RabbitMq": {
// Connection
"HostName": "localhost",
"Port": 5672,
"UserName": "ecommerce_user",
"Password": "Test@1234",
"VirtualHost": "ecommerce_vhost"
}
}
In summary, the Orchestrator Service acts as the Brain of the Saga Pattern, ensuring reliable coordination between independent microservices. Each class file contributes to a specific responsibility, defining contracts, executing orchestration logic, managing message flow, or configuring infrastructure. Together, they enable seamless event-driven communication, fault tolerance, and consistency across the distributed system.

