Introduction
Message brokers are the nervous system of distributed systems. But simply adding a broker to your architecture isn’t enough—you need patterns that ensure reliability, consistency, and scalability. This article explores five critical message broker patterns that every solution architect must understand: transactional outbox, CQRS, CQRS with event sourcing, saga pattern, and competing consumers.
These patterns solve real problems: how do you prevent message loss during failures? How do you maintain eventual consistency across services? How do you coordinate distributed transactions? The answers lie in understanding and applying these patterns correctly.
1. The Transactional Outbox Pattern
Problem It Solves
The dual-write problem: When you update a database and publish a message, either operation might fail independently. This creates inconsistency:
- Database committed, message never sent → order lost
- Message sent, database rollback → ghost message
The Pattern
Instead of two independent operations, treat the database insert and message publishing as a single atomic transaction:
- Write event to outbox table (same database transaction)
- Commit the transaction (atomically)
- Poll/tail the outbox and publish to broker
- Mark as published once broker acknowledges
Architecture Diagram
┌─────────────────────────────────────────────────────┐
│ Service A │
├─────────────────────────────────────────────────────┤
│ │
│ 1. BEGIN TRANSACTION │
│ ├─ UPDATE orders SET status='paid' │
│ └─ INSERT INTO outbox (event_id, payload) │
│ 2. COMMIT (atomic) │
│ │
│ 3. Outbox Poller (separate process) │
│ └─ SELECT * FROM outbox WHERE published=false │
│ └─ Publish to Message Broker │
│ └─ UPDATE outbox SET published=true │
│ │
└─────────────────────────────────────────────────────┘
↓
[Message Broker]
↓
┌─────────────────────────────────────────────────────┐
│ Service B (Consumer) │
└─────────────────────────────────────────────────────┘
Implementation Example
@Service
public class OrderService {
private final OrderRepository orderRepo;
private final OutboxRepository outboxRepo;
private final MessageBroker broker;
@Transactional
public void createOrder(Order order) {
// Both operations in same transaction
Order savedOrder = orderRepo.save(order);
OutboxEvent event = new OutboxEvent(
"OrderCreated",
serialize(savedOrder),
false
);
outboxRepo.save(event); // Same transaction!
}
}
// Separate scheduled poller
@Component
public class OutboxPoller {
@Scheduled(fixedDelay = 1000)
public void pollAndPublish() {
List<OutboxEvent> unpublished = outboxRepo.findByPublishedFalse();
for (OutboxEvent event : unpublished) {
try {
broker.publish(event.getTopic(), event.getPayload());
outboxRepo.markPublished(event.getId());
} catch (Exception e) {
// Retry on next poll
log.error("Failed to publish event", e);
}
}
}
}
When to Use
- Always need atomicity between state change and messaging
- Database is your source of truth
- Can tolerate slight message publishing delay (seconds)
Strengths & Weaknesses
| Aspect | Benefit/Drawback |
|---|---|
| Atomicity | ✅ Database writes and event publishing are atomic |
| Simplicity | ✅ Easy to understand and implement |
| Exactly-once delivery | ✅ Can achieve via idempotent consumers |
| Latency | ⚠️ Outbox polling adds delay (typically 1-5s) |
| Coupling | ✅ Decouples services via broker |
2. Command Query Responsibility Segregation (CQRS)
Problem It Solves
Real-world systems have asymmetric read/write patterns:
- Writes: Complex business logic, transactions, consistency requirements
- Reads: High volume, need for speed, denormalization acceptable
Forcing a single database model to serve both creates tension.
The Pattern
Separate read and write models:
- Write side (Command): Normalized, transactional, ACID
- Read side (Query): Denormalized, optimized for queries, eventually consistent
The write side publishes events to sync the read side asynchronously.
Architecture Diagram
┌────────────────────────────────────┐
│ Client │
├────────────────────────────────────┤
│ (1) Issue command (write) │
│ (2) Query read model (read) │
└────────────────────────────────────┘
↙ ↖
Write Query
Model Model
↓ ↑
┌────────────┐ ┌─────────┐
│ Write DB │ │ Read DB │
│ (ACID) │ │ (Latest │
│ - Orders │ │ Views) │
│ - Payments │ │ - Order │
└────────────┘ │ List │
↓ │ - Stats │
Publishes └─────────┘
Events ↑
↓ ↑
[Message Broker]────┘
Implementation Example
// Command side (Write model)
@Service
public class OrderCommandService {
private final OrderRepository orderRepo;
private final EventPublisher eventPublisher;
@Transactional
public void placeOrder(PlaceOrderCommand cmd) {
Order order = new Order(cmd.getCustomerId(), cmd.getItems());
orderRepo.save(order);
// Publish event for read side
eventPublisher.publish(new OrderPlacedEvent(order));
}
}
// Query side (Read model)
@Service
public class OrderQueryService {
private final OrderViewRepository viewRepo;
public List<OrderView> getCustomerOrders(String customerId) {
// Direct query, no joins, super fast
return viewRepo.findByCustomerId(customerId);
}
}
// Event handler updates read model
@Component
public class OrderEventHandler {
private final OrderViewRepository viewRepo;
@EventListener
public void onOrderPlaced(OrderPlacedEvent event) {
OrderView view = new OrderView(
event.getOrder().getId(),
event.getOrder().getStatus(),
event.getOrder().getTotal()
);
viewRepo.save(view);
}
}
When to Use
- High read/write ratio disparity (e.g., 100 reads per write)
- Complex queries need denormalization
- Read latency is critical
- Can tolerate eventual consistency on reads
Strengths & Weaknesses
| Aspect | Benefit/Drawback |
|---|---|
| Read Performance | ✅ Optimized queries, no joins, fast |
| Write Clarity | ✅ Commands express intent clearly |
| Scalability | ✅ Scale reads and writes independently |
| Complexity | ⚠️ Two models to maintain |
| Consistency | ⚠️ Read side eventually consistent |
| Debugging | ⚠️ Harder to trace state across models |
3. CQRS with Event Sourcing
Problem It Solves
Beyond CQRS: What if you need a complete audit trail of all changes? What if you want to rebuild state at any point in time? What if you need to replay events for debugging?
The Pattern
Combine CQRS with event sourcing:
- Write side stores events (immutable append-only log)
- State is derived from the event stream
- Read side projects the event stream into denormalized views
Architecture Diagram
┌─────────────────────────────────────┐
│ Client Issues Command │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ Write Side (Event Store) │
├─────────────────────────────────────┤
│ Append-Only Event Log: │
│ ├─ OrderCreated(id=123, total=$100)│
│ ├─ PaymentProcessed(id=123) │
│ ├─ OrderShipped(id=123) │
│ └─ ... (immutable history) │
└─────────────────────────────────────┘
↓ (publishes all events)
[Message Broker]
↓
┌─────────────────────────────────────┐
│ Projections (Read Side) │
├─────────────────────────────────────┤
│ View 1: Active Orders │
│ View 2: Customer Order History │
│ View 3: Revenue Analytics │
│ View 4: ... any view you need │
└─────────────────────────────────────┘
Implementation Example
// Event Store (write model)
@Service
public class OrderEventStore {
private final EventStoreRepository eventStore;
@Transactional
public void publishEvent(DomainEvent event) {
// Append immutably to event store
eventStore.save(new StoredEvent(
event.getAggregateId(),
event.getEventType(),
serialize(event),
Instant.now()
));
}
public List<StoredEvent> getEventsForOrder(String orderId) {
return eventStore.findByAggregateId(orderId)
.sorted(Comparator.comparing(StoredEvent::getTimestamp))
.collect(Collectors.toList());
}
}
// Rebuild state from events
@Service
public class OrderAggregateService {
private final EventStoreRepository eventStore;
public Order reconstructOrder(String orderId) {
List<StoredEvent> events = eventStore.findByAggregateId(orderId);
Order order = new Order(); // Empty
for (StoredEvent event : events) {
switch(event.getEventType()) {
case "OrderCreated":
order = Order.from((OrderCreatedEvent) deserialize(event.getPayload()));
break;
case "PaymentProcessed":
order.markPaid();
break;
case "OrderShipped":
order.markShipped();
break;
}
}
return order;
}
}
// Projections (read models)
@Component
public class OrderProjection {
private final OrderViewRepository viewRepo;
@EventListener
public void onOrderCreated(OrderCreatedEvent event) {
OrderView view = new OrderView(event.getOrderId(), "Created");
viewRepo.save(view);
}
@EventListener
public void onPaymentProcessed(PaymentProcessedEvent event) {
OrderView view = viewRepo.findById(event.getOrderId()).orElseThrow();
view.setStatus("Paid");
viewRepo.save(view);
}
}
When to Use
- Need complete audit trail for compliance
- Time-travel / point-in-time reconstruction needed
- Complex domain with many state changes
- Replay events for debugging or corrections
- Multiple read models with different shapes
Strengths & Weaknesses
| Aspect | Benefit/Drawback |
|---|---|
| Auditability | ✅ Complete immutable history |
| Time Travel | ✅ Rebuild any version of state |
| Replay | ✅ Debug and fix issues via replay |
| Scalability | ✅ Read side scales independently |
| Complexity | ⚠️ Highest complexity of patterns |
| Storage | ⚠️ Event store grows continuously |
| Eventual Consistency | ⚠️ Projections lag behind writes |
4. Saga Pattern (Distributed Transactions)
Problem It Solves
Transactions across multiple services fail atomically in monoliths but are complex in distributed systems. How do you maintain consistency across a multi-step workflow that spans multiple services?
The Pattern
A saga is a sequence of local transactions:
- Orchestrator (centralized): Coordinates steps, issues commands
- Choreography (decentralized): Services listen to events, take action
Orchestration-Based Saga Diagram
┌──────────────────────────────────────┐
│ Saga Orchestrator │
│ (Order Fulfillment Saga) │
├──────────────────────────────────────┤
│ 1. CreateOrder → Order Service │
│ ├─ Wait for OrderCreated event │
│ └─ On success, next step │
│ 2. ReserveInventory → Inventory Svc │
│ ├─ Wait for InventoryReserved │
│ └─ On failure, compensate step 1 │
│ 3. ProcessPayment → Payment Service │
│ ├─ Wait for PaymentProcessed │
│ └─ On failure, compensate steps 1,2
│ 4. CreateShipment → Shipping Service │
│ │
│ Compensations (rollback): │
│ - CancelOrder │
│ - ReleaseInventory │
│ - RefundPayment │
└──────────────────────────────────────┘
Choreography-Based Saga Diagram
┌──────────────┐
│Order Service │
├──────────────┤
│ Receive order│
│ Publish │
│ OrderCreated │
└──────────────┘
↓ [Message Broker]
├─→ Inventory Service (listens)
├─→ Payment Service (listens)
└─→ Shipping Service (listens)
Inventory Service:
├─ Receives OrderCreated
├─ Reserves stock
└─ Publishes InventoryReserved
↓
Payment Service:
├─ Receives InventoryReserved
├─ Processes payment
└─ Publishes PaymentProcessed
↓
Shipping Service:
├─ Receives PaymentProcessed
├─ Creates shipment
└─ Publishes ShipmentCreated
Implementation: Orchestrator Pattern
@Service
public class OrderFulfillmentSaga {
private final OrderService orderService;
private final InventoryService inventoryService;
private final PaymentService paymentService;
private final ShippingService shippingService;
@Transactional
public void executeOrderSaga(OrderRequest request) {
try {
// Step 1: Create Order
Order order = orderService.createOrder(request);
// Step 2: Reserve Inventory
inventoryService.reserveItems(order.getItems());
// Step 3: Process Payment
paymentService.charge(order.getCustomerId(), order.getTotal());
// Step 4: Create Shipment
shippingService.createShipment(order.getId());
// All steps succeeded
orderService.markCompleted(order.getId());
} catch (InventoryUnavailableException e) {
// Compensating transaction
orderService.cancelOrder(request.getOrderId());
} catch (PaymentFailedException e) {
// Compensating transactions
inventoryService.releaseItems(request.getOrderId());
orderService.cancelOrder(request.getOrderId());
}
}
}
Implementation: Choreography Pattern
// Order Service publishes event
@Service
public class OrderService {
@Transactional
public void createOrder(OrderRequest request) {
Order order = orderRepo.save(new Order(request));
eventBus.publish(new OrderCreatedEvent(order));
}
}
// Inventory Service listens and reacts
@Component
public class InventoryEventHandler {
@EventListener
public void onOrderCreated(OrderCreatedEvent event) {
try {
inventoryService.reserveItems(event.getOrder().getItems());
eventBus.publish(new InventoryReservedEvent(event.getOrder().getId()));
} catch (InsufficientInventoryException e) {
eventBus.publish(new InventoryReservationFailedEvent(event.getOrder().getId()));
}
}
}
// Payment Service listens for InventoryReserved
@Component
public class PaymentEventHandler {
@EventListener
public void onInventoryReserved(InventoryReservedEvent event) {
try {
Order order = orderService.getOrder(event.getOrderId());
paymentService.charge(order.getTotal());
eventBus.publish(new PaymentProcessedEvent(event.getOrderId()));
} catch (PaymentFailedException e) {
// Trigger compensations
eventBus.publish(new PaymentFailedEvent(event.getOrderId()));
}
}
}
When to Use
- Multi-step workflows spanning services
- Need to maintain consistency without distributed locks
- Long-running transactions (minutes, hours, days)
- Can tolerate eventual consistency
Strengths & Weaknesses
| Aspect | Benefit/Drawback |
|---|---|
| Coordination | ✅ Handles multi-service transactions |
| Resilience | ✅ Compensations enable rollback |
| Loose coupling | ✅ Services don’t call each other |
| Complexity | ⚠️ Compensating transactions add logic |
| Testing | ⚠️ Hard to test failure scenarios |
| Observability | ⚠️ State scattered across services |
5. Competing Consumers Pattern
Problem It Solves
How do you scale message processing when a single consumer can’t keep up? How do you ensure only one instance processes a message?
The Pattern
Multiple consumer instances listen to the same queue/topic, each processing messages concurrently. The broker ensures each message goes to exactly one consumer.
Architecture Diagram
┌────────────────────────┐
│ Message Broker │
│ Queue: orders │
│ ├─ Message 1 │
│ ├─ Message 2 │
│ ├─ Message 3 │
│ ├─ Message 4 │
│ └─ ... │
└────────────────────────┘
↙ ↓ ↖
┌──────┐ │ ┌──────┐
│Cons-1│ │ │Cons-2│
│Process│ │ │Process│
│Msg 1 │ │ │Msg 2 │
└──────┘ │ └──────┘
│
┌──────┐
│Cons-3│
│Process│
│Msg 3 │
└──────┘
Scale horizontally by adding more consumers.
Each message processed exactly once.
Implementation with Kafka
@Service
public class OrderConsumer {
private final OrderProcessor processor;
@KafkaListener(topics = "orders", groupId = "order-processing")
public void consumeOrder(String message) {
try {
OrderEvent event = deserialize(message);
processor.processOrder(event);
// Kafka auto-commits offset if no exception
} catch (Exception e) {
log.error("Failed to process order", e);
// Message returned to queue for retry
}
}
}
// Run multiple instances
// Instance 1: ConsumerGroupId=order-processing
// Instance 2: ConsumerGroupId=order-processing
// Instance 3: ConsumerGroupId=order-processing
// Each processes different messages in parallel
Implementation with RabbitMQ
@Service
public class OrderConsumer {
private final OrderProcessor processor;
@RabbitListener(queues = "orders", concurrency = "5")
public void consumeOrder(OrderEvent event) {
try {
processor.processOrder(event);
} catch (Exception e) {
log.error("Failed to process order", e);
// Nack message; return to queue
throw e;
}
}
}
// RabbitMQ distributes messages among competing consumers
// Concurrency = 5 threads per consumer instance
Competing Consumers with Priority
@Configuration
public class CompetingConsumersConfig {
// Standard priority queue
@Bean
public Queue standardQueue() {
return QueueBuilder.durable("orders-standard")
.build();
}
// High-priority queue (faster consumers)
@Bean
public Queue priorityQueue() {
return QueueBuilder.durable("orders-urgent")
.maxPriority(10)
.build();
}
}
@Service
public class PriorityOrderConsumer {
@RabbitListener(queues = "orders-urgent", concurrency = "10")
public void consumeUrgent(OrderEvent event) {
processor.processUrgent(event);
}
@RabbitListener(queues = "orders-standard", concurrency = "5")
public void consumeStandard(OrderEvent event) {
processor.processStandard(event);
}
}
When to Use
- High-volume message processing
- Need horizontal scaling of consumers
- Can distribute load across multiple instances
- Graceful degradation acceptable
Strengths & Weaknesses
| Aspect | Benefit/Drawback |
|---|---|
| Scalability | ✅ Linear scale with instances |
| Throughput | ✅ High message processing rate |
| Simplicity | ✅ Broker handles distribution |
| Ordering | ⚠️ No guaranteed order (by default) |
| Exactly-once | ⚠️ At-least-once delivery (idempotence needed) |
Pattern Selection Matrix
| Scenario | Best Pattern(s) |
|---|---|
| Ensure events published with DB writes | Transactional Outbox |
| Heavy reads, few writes | CQRS |
| Need audit trail + compliance | Event Sourcing |
| Multi-service workflow | Saga (Orchestrator or Choreography) |
| Scale message processing | Competing Consumers |
| Complex domain + high volume | CQRS + Event Sourcing + Saga |
| Financial transactions | Saga (Orchestrator) + Transactional Outbox |
Combining Patterns: A Real-World Example
E-commerce Order System:
Order Creation:
├─ Transactional Outbox
│ └─ Order written + OrderCreated event to outbox
│
├─ Saga (Orchestrator)
│ ├─ Reserve Inventory (compensation: release)
│ ├─ Process Payment (compensation: refund)
│ └─ Create Shipment
│
├─ CQRS
│ ├─ Write: Order Service (transactional)
│ └─ Read: Order View, Customer Orders (cached)
│
└─ Event Sourcing (optional)
└─ All state changes immutable in event store
Message Processing:
└─ Competing Consumers
├─ Payment service instances (3 replicas)
├─ Inventory service instances (5 replicas)
└─ Shipping service instances (2 replicas)
Conclusion
Message broker patterns are not optional complexity—they’re architectural necessities in distributed systems.
Key takeaways:
- Transactional Outbox solves the dual-write problem
- CQRS optimizes read/write asymmetry
- Event Sourcing enables auditability and time-travel
- Saga Pattern coordinates distributed transactions
- Competing Consumers scales message processing
Combine these patterns thoughtfully based on your domain’s needs, not dogmatically. The best architecture uses the right patterns for the right problems.
Further Reading: