freyr / message-broker
Reliable Inbox & Outbox Patterns for Symfony Messenger with transactional guarantees and automatic deduplication
Installs: 17
Dependents: 0
Suggesters: 0
Security: 0
Stars: 0
Watchers: 1
Forks: 0
Open Issues: 0
Type:symfony-bundle
pkg:composer/freyr/message-broker
Requires
- php: >=8.4
- ext-amqp: *
- ext-curl: *
- doctrine/dbal: ^3.0|^4.0
- doctrine/orm: ^3.0
- freyr/identity: ^0.2 | ^0.3
- nesbot/carbon: ^2.0|^3.0
- php-amqplib/php-amqplib: ^3.7
- symfony/amqp-messenger: ^6.4|^7.0
- symfony/config: ^6.4|^7.0
- symfony/console: ^6.4|^7.0
- symfony/dependency-injection: ^6.4|^7.0
- symfony/doctrine-messenger: ^6.4|^7.0
- symfony/http-kernel: ^6.4|^7.0
- symfony/messenger: ^6.4|^7.0
- symfony/property-access: ^6.4|^7.0
- symfony/serializer: ^6.4|^7.0
Requires (Dev)
- doctrine/doctrine-bundle: ^2.11
- phpstan/phpstan: ^2.1
- phpunit/phpunit: ^10.0|^11.0
- symfony/framework-bundle: ^6.4|^7.0
- symfony/phpunit-bridge: ^6.4|^7.0
- symfony/yaml: ^6.4|^7.0
- symplify/easy-coding-standard: ^12.6
README
** Inbox & Outbox Patterns for Symfony Messenger**
A Symfony bundle providing reliable event publishing and consumption with transactional guarantees, automatic deduplication, and seamless AMQP integration.
Features
- ✅ Transactional Outbox - Publish events reliably within your business transactions
- ✅ Automatic Deduplication at the Inbox - Binary UUID v7 primary key prevents duplicate processing
- ✅ Typed Message Handlers - Type-safe event consumption with IDE autocomplete
- ✅ Automatic DLQ Routing - Unmatched events routed to dead-letter queue
- ✅ Horizontal Scaling - Multiple workers with database-level SKIP LOCKED
Restrictions
- Zero Configuration - (in progress) Symfony Flex recipe automates installation
- AMQP support only - There is no plan do add Kafka/SQS etc.
Quick Start
Installation
composer require freyr/message-broker
** Flex is not registered yet ** That's it! Symfony Flex automatically:
- ✅ Registers the bundle
- ✅ Creates configuration files
- ✅ Adds database migrations
- ✅ Sets up environment variables
Setup Database
php bin/console doctrine:migrations:migrate
How it works
-
You emit your event (e.g.,
inventory.stock.received) from Application A (Inventory management) -
Messenger routing directs it to the outbox transport (Doctrine/database), inserting it in the same transaction as your business logic
-
php bin/console messenger:consume outbox -vvfetches events from outbox and publishes them to AMQP- Default routing: exchange =
inventory.stock, routing key =inventory.stock.received - Transactional outbox provides at-least-once delivery (events may be sent multiple times)
- Deduplication must happen at the receiving end
- Default routing: exchange =
-
Application B (Procurement) sets up AMQP infrastructure:
- Queue:
inventory_stock - Binds to exchange:
inventory.stock - Binding key:
inventory.stock.*
- Queue:
-
php bin/console messenger:consume --queue=inventory_stockfetches events from AMQP and saves to inbox database- Consume the messages with deduplication middleware providing at-most-once delivery (events may be sent multiple times)
Start Workers
# Process outbox (publish events to AMQP) php bin/console messenger:consume outbox -vv # Consumes messages from AMQP php bin/console messenger:consume --queue=inventory_stock
Usage
Publishing Events via Outbox
Step 1: Implement OutboxEventInterface
All events published through the outbox pattern must implement OutboxEventInterface:
use Freyr\MessageBroker\Outbox\MessageName; use Freyr\MessageBroker\Outbox\EventBridge\OutboxMessage; #[MessageName('order.placed')] final class OrderPlaced implements OutboxMessage { public function __construct( public string $orderId, public float $amount, ) { } }
Requirements:
- Must have #[MessageName('semantic.name')] attribute
- Must implement OutboxMessage marker interface
Domain Layer Alternative
To avoid coupling your domain to infrastructure, extend the interface:
// Your domain layer namespace App\Shared\Integration; use Freyr\MessageBroker\Outbox\OutboxEventInterface; interface IntegrationEvent extends OutboxEventInterface { // Your domain-specific contracts } // Your events #[MessageName('order.placed')] final class OrderPlaced implements IntegrationEvent { // ... }
This keeps your events referencing your own interface, not the infrastructure one.
AMQP Routing:
By default, events are published using convention-based routing:
- Exchange: First 2 parts of message name (
order.placed→order.placed) - Routing Key: Full message name (
order.placed)
You can override this with attributes:
use Freyr\MessageBroker\Outbox\Routing\MessengerTransport; use Freyr\MessageBroker\Outbox\Routing\AmqpRoutingKey; #[MessageName('order.placed')] #[MessengerTransport('commerce')] // Custom exchange #[AmqpRoutingKey('commerce.order.placed')] // Custom routing key final readonly class OrderPlaced { // ... }
See AMQP Routing for complete documentation.
2. Configure Routing
Edit config/packages/messenger.yaml:
framework: messenger: routing: 'App\Domain\Event\OrderPlaced': outbox
3. Dispatch the Event
use Symfony\Component\Messenger\MessageBusInterface; use Freyr\Identity\Id; class OrderService { public function __construct( private MessageBusInterface $messageBus, ) {} public function placeOrder(Order $order): void { // Save order (transaction started) $this->entityManager->persist($order); // Dispatch event (saved to outbox in same transaction) // Note: messageId is auto-generated by OutboxToAmqpBridge $this->messageBus->dispatch(new OrderPlaced( orderId: $order->getId(), customerId: $order->getCustomerId(), totalAmount: $order->getTotalAmount(), placedAt: CarbonImmutable::now() )); // Commit (order + event saved atomically) $this->entityManager->flush(); } }
The event is now stored in the outbox. Workers will publish it to AMQP asynchronously.
Consuming Events (Inbox Pattern)
1. Define Consumer Message
Create a message class matching the event structure:
<?php namespace App\Message; use Freyr\Identity\Id; use Carbon\CarbonImmutable; final readonly class OrderPlaced { public function __construct( public Id $orderId, public Id $customerId, public float $totalAmount, public CarbonImmutable $placedAt, ) {} }
Important: Consumer messages contain only business data (no messageId - it's transport metadata).
2. Configure Message Type Mapping
Edit config/packages/message_broker.yaml:
message_broker: inbox: message_types: 'order.placed': 'App\Message\OrderPlaced'
3. Create Handler
use Symfony\Component\Messenger\Attribute\AsMessageHandler; #[AsMessageHandler] final readonly class OrderPlacedHandler { public function __invoke(OrderPlaced $message): void { // Type-safe with IDE autocomplete! $orderId = $message->orderId; $amount = $message->totalAmount; // Process the event... } }
4. Consume from AMQP
Prerequisites: RabbitMQ queue must already exist with proper bindings.
# Ingest from AMQP → Inbox transport php bin/console inbox:ingest --queue=your.queue # Process inbox messages → Handlers php bin/console messenger:consume inbox -vv
Message Format
AMQP messages use this structure:
Headers:
type: order.placed- Semantic message nameX-Message-Stamp-MessageIdStamp: [{"messageId":"..."}]- For deduplication
Body (messageId stripped):
{
"orderId": "550e8400-e29b-41d4-a716-446655440000",
"customerId": "01234567-89ab-cdef-0123-456789abcdef",
"totalAmount": 99.99,
"placedAt": "2024-01-01T12:00:00+00:00"
}
Note: messageId is transport metadata (for deduplication), not business data. It's sent via MessageIdStamp header, not in the payload.
Configuration
After Installation
Symfony Flex creates these configuration files:
config/packages/message_broker.yaml:
message_broker: inbox: table_name: messenger_inbox message_types: {} # Add your mappings here failed_transport: failed outbox: table_name: messenger_outbox dlq_transport: dlq
config/packages/messenger.yaml:
framework: messenger: transports: outbox: dsn: 'outbox://default?table_name=messenger_outbox&queue_name=outbox' inbox: dsn: 'inbox://default?table_name=messenger_inbox&queue_name=inbox' amqp: dsn: '%env(MESSENGER_AMQP_DSN)%' dlq: dsn: 'doctrine://default?queue_name=dlq' failed: dsn: 'doctrine://default?queue_name=failed' routing: # Add your domain events here: # 'App\Domain\Event\OrderPlaced': outbox # Add your typed consumer messages here: # 'App\Message\OrderPlaced': inbox
.env:
MESSENGER_AMQP_DSN=amqp://guest:guest@localhost:5672/%2f
Customization
You can customize:
- Table names in
message_broker.yaml - Transport DSNs in
messenger.yaml - AMQP connection in
.env - Failed/DLQ transport names
Production Deployment
Docker Compose
services: # Consume from AMQP and save to inbox database worker-inbox-ingest: image: your-app:latest command: php bin/console inbox:ingest --queue=your.queue restart: always deploy: replicas: 2 # Process outbox database and publish to AMQP worker-outbox: image: your-app:latest command: php bin/console messenger:consume outbox --time-limit=3600 restart: always deploy: replicas: 2 # Process inbox database and dispatch to handlers worker-inbox: image: your-app:latest command: php bin/console messenger:consume inbox --time-limit=3600 restart: always deploy: replicas: 3
Monitoring
# View queue statistics php bin/console messenger:stats # View failed messages php bin/console messenger:failed:show # Retry failed messages php bin/console messenger:failed:retry
Documentation
Core Principles:
- Outbox Pattern - Transactional Consistency - How events are published reliably within database transactions
- Inbox Deduplication - How duplicate messages are prevented at consumption
- Message Serialization - Semantic naming and cross-language compatibility
- AMQP Routing - Convention-based routing with attribute overrides
Manual Installation (Without Symfony Flex)
If Symfony Flex is not available in your project, follow these manual installation steps:
1. Install via Composer
composer require freyr/message-broker --no-scripts
2. Register the Bundle
Edit config/bundles.php:
return [ // ... other bundles Freyr\MessageBroker\FreyrMessageBrokerBundle::class => ['all' => true], ];
3. Create Configuration Files
config/packages/message_broker.yaml:
message_broker: inbox: # Message type mapping: message_name => PHP class # Used by InboxSerializer to translate semantic names to FQN during deserialization message_types: # Examples: # 'order.placed': 'App\Message\OrderPlaced' # 'user.registered': 'App\Message\UserRegistered'
config/packages/messenger.yaml:
framework: messenger: # Failure transport for handling failed messages failure_transport: failed # Middleware configuration # DeduplicationMiddleware runs AFTER doctrine_transaction (priority -10) # This ensures deduplication INSERT is within the transaction default_middleware: enabled: true allow_no_handlers: false buses: messenger.bus.default: middleware: - doctrine_transaction # Priority 0 (starts transaction) # DeduplicationMiddleware (priority -10) registered via service tag # Runs after transaction starts, before handlers transports: outbox: dsn: 'doctrine://default?table_name=messenger_outbox&queue_name=outbox' serializer: 'Freyr\MessageBroker\Serializer\OutboxSerializer' retry_strategy: max_retries: 3 delay: 1000 multiplier: 2 # AMQP transport - external message broker # For publishing from outbox: uses OutboxSerializer # For consuming to inbox: uses InboxSerializer amqp: dsn: '%env(MESSENGER_AMQP_DSN)%' serializer: 'Freyr\MessageBroker\Serializer\OutboxSerializer' options: auto_setup: false retry_strategy: max_retries: 3 delay: 1000 multiplier: 2 # AMQP consumption transport (example) - uses InboxSerializer amqp_orders: dsn: '%env(MESSENGER_AMQP_DSN)%' serializer: 'Freyr\MessageBroker\Serializer\InboxSerializer' options: auto_setup: false queue: name: 'orders_queue' retry_strategy: max_retries: 3 delay: 1000 multiplier: 2 # Failed transport - for all failed messages failed: dsn: 'doctrine://default?queue_name=failed' options: auto_setup: false routing: # Outbox messages - route domain events to outbox transport # Example: # 'App\Domain\Event\OrderPlaced': outbox # 'App\Domain\Event\UserRegistered': outbox # Inbox messages (consumed from AMQP transports) # Messages are deserialized by InboxSerializer into typed objects # DeduplicationMiddleware automatically prevents duplicate processing # Handlers execute synchronously (no routing needed - AMQP transport handles delivery) # Example handlers: # #[AsMessageHandler] # class OrderPlacedHandler { public function __invoke(OrderPlaced $message) {} }
config/packages/doctrine.yaml:
doctrine: dbal: types: id_binary: Freyr\MessageBroker\Doctrine\Type\IdType
config/services.yaml:
services: # Doctrine Integration Freyr\MessageBroker\Doctrine\Type\IdType: tags: - { name: 'doctrine.dbal.types', type: 'id_binary' } # Auto-register all Normalizers using Symfony's native tag # These will be automatically added to the @serializer service Freyr\MessageBroker\Serializer\Normalizer\: resource: '../vendor/freyr/message-broker/src/Serializer/Normalizer/' tags: ['serializer.normalizer'] # Custom ObjectNormalizer with property promotion support # This overrides Symfony's default ObjectNormalizer with propertyTypeExtractor # Lower priority (-1000) ensures it runs as fallback after specialized normalizers Freyr\MessageBroker\Serializer\Normalizer\PropertyPromotionObjectNormalizer: autowire: true class: Symfony\Component\Serializer\Normalizer\ObjectNormalizer arguments: $propertyTypeExtractor: '@property_info' tags: - { name: 'serializer.normalizer', priority: -1000 } # Inbox Serializer - for AMQP consumption # Injects native @serializer service with all registered normalizers Freyr\MessageBroker\Serializer\InboxSerializer: arguments: $messageTypes: '%message_broker.inbox.message_types%' $serializer: '@serializer' # Outbox Serializer - for AMQP publishing # Injects native @serializer service with all registered normalizers Freyr\MessageBroker\Serializer\OutboxSerializer: arguments: $serializer: '@serializer' # Deduplication Store (DBAL implementation) Freyr\MessageBroker\Inbox\DeduplicationStore: class: Freyr\MessageBroker\Inbox\DeduplicationDbalStore arguments: $connection: '@doctrine.dbal.default_connection' $logger: '@logger' # Deduplication Middleware (inbox pattern) # Runs AFTER doctrine_transaction middleware (priority -10) Freyr\MessageBroker\Inbox\DeduplicationMiddleware: arguments: $store: '@Freyr\MessageBroker\Inbox\DeduplicationStore' tags: - { name: 'messenger.middleware', priority: -10 } # AMQP Routing Strategy (default convention-based routing) Freyr\MessageBroker\Outbox\Routing\AmqpRoutingStrategyInterface: class: Freyr\MessageBroker\Outbox\Routing\DefaultAmqpRoutingStrategy # Outbox Bridge (publishes outbox events to AMQP) Freyr\MessageBroker\Outbox\EventBridge\OutboxToAmqpBridge: autoconfigure: true arguments: $eventBus: '@messenger.default_bus' $routingStrategy: '@Freyr\MessageBroker\Outbox\Routing\AmqpRoutingStrategyInterface' $logger: '@logger' # Deduplication Store Cleanup Command (optional maintenance) Freyr\MessageBroker\Command\DeduplicationStoreCleanup: arguments: $connection: '@doctrine.dbal.default_connection' tags: ['console.command']
5. Create Database Migration
Create migrations/VersionYYYYMMDDHHIISS.php:
<?php declare(strict_types=1); namespace DoctrineMigrations; use Doctrine\DBAL\Schema\Schema; use Doctrine\Migrations\AbstractMigration; /** * Auto-generated Migration: Message Broker Deduplication table * * Creates deduplication tracking table with binary UUID v7 for middleware-based deduplication. */ final class Version20250103000001 extends AbstractMigration { public function getDescription(): string { return 'Create message_broker_deduplication table for middleware-based deduplication'; } public function up(Schema $schema): void { // Create message_broker_deduplication table with binary UUID v7 $this->addSql(" CREATE TABLE message_broker_deduplication ( message_id BINARY(16) NOT NULL PRIMARY KEY COMMENT '(DC2Type:id_binary)', message_name VARCHAR(255) NOT NULL, processed_at DATETIME NOT NULL, INDEX idx_message_name (message_name), INDEX idx_processed_at (processed_at) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci "); } public function down(Schema $schema): void { $this->addSql('DROP TABLE message_broker_deduplication'); } }
6. Add Environment Variables
Edit .env:
###> freyr/message-broker ### MESSENGER_AMQP_DSN=amqp://guest:guest@localhost:5672/%2f ###< freyr/message-broker ###
7. Run Migrations
php bin/console doctrine:migrations:migrate
8. Verify Installation
# Check bundle is registered php bin/console debug:container | grep MessageBroker # Check transports are available php bin/console debug:messenger # Start workers php bin/console inbox:ingest --queue=your.queue # AMQP → inbox database php bin/console messenger:consume outbox -vv # Outbox database → AMQP