Event Sourcing is an architectural pattern where instead of storing the current state of an entity, you store the sequence of events that led to that state. The current state is always derived by replaying the events. Combined with CQRS it gives you a complete audit trail, time-travel debugging, and the ability to create new read models from historical data. I show the core concepts with PHP implementation and connect them to CQRS.
State storage vs Event Sourcing
Traditional storage:
orders table:
| id | status | total | updated_at |
| 42 | processing | 149.99 | 2024-08-15 14:22:00 |
"What was the status yesterday?" -> Lost. "Who changed it?" -> Lost.
Event Sourcing storage:
order_events table:
| id | aggregate_id | event_type | payload | occurred_at |
| 1 | 42 | OrderPlaced | {total: 149.99, items: []} | 2024-08-15 10:00:00 |
| 2 | 42 | PaymentReceived | {method: card, ref: abc} | 2024-08-15 10:01:00 |
| 3 | 42 | OrderShipped | {tracking: PKG-001} | 2024-08-15 14:22:00 |
"What was the status yesterday at noon?" -> Replay events up to that point.
"Who changed it?" -> Event payload contains who did what and when.
Domain Events
<?php
declare(strict_types=1);
// Base event interface
interface DomainEventInterface
{
public function getAggregateId(): int;
public function getOccurredAt(): \DateTimeImmutable;
public function getEventType(): string;
public function toPayload(): array;
public static function fromPayload(int $aggregateId, array $payload, \DateTimeImmutable $occurredAt): static;
}
// Concrete events - each is an immutable record of something that happened
final readonly class OrderPlaced implements DomainEventInterface
{
public function __construct(
private int $orderId,
private float $total,
private int $customerId,
private array $items,
private \DateTimeImmutable $occurredAt,
) {}
public function getAggregateId(): int { return $this->orderId; }
public function getOccurredAt(): \DateTimeImmutable { return $this->occurredAt; }
public function getEventType(): string { return 'OrderPlaced'; }
public function toPayload(): array
{
return [
'total' => $this->total,
'customer_id' => $this->customerId,
'items' => $this->items,
];
}
public static function fromPayload(int $aggregateId, array $payload, \DateTimeImmutable $occurredAt): static
{
return new static(
orderId: $aggregateId,
total: $payload['total'],
customerId: $payload['customer_id'],
items: $payload['items'],
occurredAt: $occurredAt,
);
}
}
final readonly class PaymentReceived implements DomainEventInterface
{
public function __construct(
private int $orderId,
private string $paymentMethod,
private string $transactionRef,
private \DateTimeImmutable $occurredAt,
) {}
public function getAggregateId(): int { return $this->orderId; }
public function getOccurredAt(): \DateTimeImmutable { return $this->occurredAt; }
public function getEventType(): string { return 'PaymentReceived'; }
public function toPayload(): array
{
return ['method' => $this->paymentMethod, 'ref' => $this->transactionRef];
}
public static function fromPayload(int $aggregateId, array $payload, \DateTimeImmutable $occurredAt): static
{
return new static($aggregateId, $payload['method'], $payload['ref'], $occurredAt);
}
}
Aggregate Root – applies events
<?php
declare(strict_types=1);
abstract class AggregateRoot
{
private array $uncommittedEvents = [];
protected function recordEvent(DomainEventInterface $event): void
{
$this->apply($event);
$this->uncommittedEvents[] = $event;
}
// Apply events in sequence to rebuild state
abstract protected function apply(DomainEventInterface $event): void;
public function getUncommittedEvents(): array
{
$events = $this->uncommittedEvents;
$this->uncommittedEvents = [];
return $events;
}
// Reconstitute from event history
public static function reconstitute(array $events): static
{
$instance = new static();
foreach ($events as $event) {
$instance->apply($event);
}
return $instance;
}
}
class Order extends AggregateRoot
{
private int $id;
private string $status = 'new';
private float $total = 0.0;
private array $items = [];
private ?string $trackingNumber = null;
public static function place(int $id, int $customerId, array $items, float $total): static
{
$order = new static();
$order->recordEvent(new OrderPlaced($id, $total, $customerId, $items, new \DateTimeImmutable()));
return $order;
}
public function receivePayment(string $method, string $ref): void
{
if ($this->status !== 'pending_payment') {
throw new \RuntimeException("Cannot receive payment in status: {$this->status}");
}
$this->recordEvent(new PaymentReceived($this->id, $method, $ref, new \DateTimeImmutable()));
}
protected function apply(DomainEventInterface $event): void
{
match($event->getEventType()) {
'OrderPlaced' => $this->applyOrderPlaced($event),
'PaymentReceived' => $this->applyPaymentReceived($event),
default => null,
};
}
private function applyOrderPlaced(OrderPlaced $event): void
{
$this->status = 'pending_payment';
$this->total = $event->toPayload()['total'];
$this->items = $event->toPayload()['items'];
}
private function applyPaymentReceived(PaymentReceived $event): void
{
$this->status = 'processing';
}
public function getStatus(): string { return $this->status; }
public function getTotal(): float { return $this->total; }
}
Event Store
<?php
declare(strict_types=1);
class EventStore
{
private array $eventRegistry;
public function __construct(
private \Magento\Framework\App\ResourceConnection $resourceConnection,
array $eventClasses // ['OrderPlaced' => OrderPlaced::class, ...]
) {
$this->eventRegistry = $eventClasses;
}
public function append(AggregateRoot $aggregate): void
{
$events = $aggregate->getUncommittedEvents();
$connection = $this->resourceConnection->getConnection();
$table = $this->resourceConnection->getTableName('vendor_event_store');
foreach ($events as $event) {
$connection->insert($table, [
'aggregate_id' => $event->getAggregateId(),
'event_type' => $event->getEventType(),
'payload' => json_encode($event->toPayload(), JSON_THROW_ON_ERROR),
'occurred_at' => $event->getOccurredAt()->format('Y-m-d H:i:s.u'),
]);
}
}
public function load(int $aggregateId): array
{
$connection = $this->resourceConnection->getConnection();
$rows = $connection->fetchAll(
$connection->select()
->from($this->resourceConnection->getTableName('vendor_event_store'))
->where('aggregate_id = ?', $aggregateId)
->order('id ASC')
);
return array_map(function($row) {
$class = $this->eventRegistry[$row['event_type']]
?? throw new \RuntimeException("Unknown event type: {$row['event_type']}");
return $class::fromPayload(
(int) $row['aggregate_id'],
json_decode($row['payload'], true, 512, JSON_THROW_ON_ERROR),
new \DateTimeImmutable($row['occurred_at'])
);
}, $rows);
}
}
// Usage
$eventStore = new EventStore($resourceConnection, [
'OrderPlaced' => OrderPlaced::class,
'PaymentReceived' => PaymentReceived::class,
]);
// Place an order
$order = Order::place(42, 123, [['sku' => 'A', 'qty' => 2]], 149.99);
$eventStore->append($order);
// Later: load and reconstitute from events
$events = $eventStore->load(42);
$order = Order::reconstitute($events);
echo $order->getStatus(); // 'pending_payment'
$order->receivePayment('card', 'TXN-001');
$eventStore->append($order);
$events = $eventStore->load(42);
$order = Order::reconstitute($events);
echo $order->getStatus(); // 'processing'
Summary
Event Sourcing trades storage simplicity for a complete, immutable history. Every state change is captured as an event with who, what, and when. The Aggregate Root pattern keeps business rules in domain objects; the Event Store persists everything; reconstitution replays events to rebuild current state. Combined with CQRS, the read side can build optimised projections from the event stream, completely independent of the write side. The complexity is real – justify it with audit requirements, time-travel debugging needs, or complex domain logic.
