PHP / Magento Dev Blog

Event Sourcing – Domain Events, Aggregate Root, Event Store, connecting with CQRS

by Henryk Tews / Tuesday, 06 August 2024 / Published in Design Patterns

Event Sourcing is an architectural pattern in which, instead of storing the current state of an entity, you store the sequence of events that led to that state. The current state is always derived by replaying those events. Combined with CQRS, it gives you a complete audit trail, time-travel debugging, and the ability to build new read models from historical data. This post walks through the core concepts with a PHP implementation and shows where CQRS fits in.

State storage vs Event Sourcing

Traditional storage:
  orders table:
    | id | status      | total  | updated_at          |
    | 42 | processing  | 149.99 | 2024-08-15 14:22:00 |

  "What was the status yesterday?" -> Lost. "Who changed it?" -> Lost.

Event Sourcing storage:
  order_events table:
    | id | aggregate_id | event_type           | payload                    | occurred_at         |
    | 1  | 42           | OrderPlaced          | {total: 149.99, items: []} | 2024-08-15 10:00:00 |
    | 2  | 42           | PaymentReceived      | {method: card, ref: abc}   | 2024-08-15 10:01:00 |
    | 3  | 42           | OrderShipped         | {tracking: PKG-001}        | 2024-08-15 14:22:00 |

  "What was the status yesterday at noon?" -> Replay events up to that point.
  "Who changed it?" -> Each event records what happened and when; store the actor (e.g. a user id) in the payload to answer who. The sample payloads above omit it.
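
The "replay events up to that point" idea can be sketched in a few lines. This is a minimal illustration, not the implementation used later in the post: the `$events` array stands in for the `order_events` rows, `statusAt()` and `STATUS_AFTER` are hypothetical names, and the `shipped` status is an assumption, since the post only defines `pending_payment` and `processing`.

```php
<?php

declare(strict_types=1);

// Hypothetical in-memory copy of the order_events rows shown above.
$events = [
    ['event_type' => 'OrderPlaced',     'occurred_at' => '2024-08-15 10:00:00'],
    ['event_type' => 'PaymentReceived', 'occurred_at' => '2024-08-15 10:01:00'],
    ['event_type' => 'OrderShipped',    'occurred_at' => '2024-08-15 14:22:00'],
];

// Assumed mapping from event type to the status it produces ('shipped' is a guess).
const STATUS_AFTER = [
    'OrderPlaced'     => 'pending_payment',
    'PaymentReceived' => 'processing',
    'OrderShipped'    => 'shipped',
];

/**
 * Replays events in order and stops at the first one after $pointInTime.
 * Returns null when the order did not exist yet at that moment.
 */
function statusAt(array $events, \DateTimeImmutable $pointInTime): ?string
{
    $status = null;
    foreach ($events as $event) {
        if (new \DateTimeImmutable($event['occurred_at']) > $pointInTime) {
            break;
        }
        $status = STATUS_AFTER[$event['event_type']] ?? $status;
    }

    return $status;
}

echo statusAt($events, new \DateTimeImmutable('2024-08-15 12:00:00')), PHP_EOL; // processing
```

The same fold gives the current state when the cut-off is "now", which is why no separate status column is needed.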

Domain Events

<?php

declare(strict_types=1);

// Base event interface
interface DomainEventInterface
{
    public function getAggregateId(): int;
    public function getOccurredAt(): \DateTimeImmutable;
    public function getEventType(): string;
    public function toPayload(): array;
    public static function fromPayload(int $aggregateId, array $payload, \DateTimeImmutable $occurredAt): static;
}

// Concrete events - each is an immutable record of something that happened
final readonly class OrderPlaced implements DomainEventInterface
{
    public function __construct(
        private int $orderId,
        private float $total,
        private int $customerId,
        private array $items,
        private \DateTimeImmutable $occurredAt,
    ) {}

    public function getAggregateId(): int              { return $this->orderId; }
    public function getOccurredAt(): \DateTimeImmutable { return $this->occurredAt; }
    public function getEventType(): string             { return 'OrderPlaced'; }

    public function toPayload(): array
    {
        return [
            'total'       => $this->total,
            'customer_id' => $this->customerId,
            'items'       => $this->items,
        ];
    }

    public static function fromPayload(int $aggregateId, array $payload, \DateTimeImmutable $occurredAt): static
    {
        return new static(
            orderId:    $aggregateId,
            total:      $payload['total'],
            customerId: $payload['customer_id'],
            items:      $payload['items'],
            occurredAt: $occurredAt,
        );
    }
}

final readonly class PaymentReceived implements DomainEventInterface
{
    public function __construct(
        private int $orderId,
        private string $paymentMethod,
        private string $transactionRef,
        private \DateTimeImmutable $occurredAt,
    ) {}

    public function getAggregateId(): int              { return $this->orderId; }
    public function getOccurredAt(): \DateTimeImmutable { return $this->occurredAt; }
    public function getEventType(): string             { return 'PaymentReceived'; }

    public function toPayload(): array
    {
        return ['method' => $this->paymentMethod, 'ref' => $this->transactionRef];
    }

    public static function fromPayload(int $aggregateId, array $payload, \DateTimeImmutable $occurredAt): static
    {
        return new static($aggregateId, $payload['method'], $payload['ref'], $occurredAt);
    }
}
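
To see why `toPayload()` and `fromPayload()` come as a pair, here is a small round trip through JSON. `OrderShipped` is a hypothetical event matching the third row of the table above; it is not one of the classes in this post. `DomainEventInterface` is deliberately left out so the snippet runs on its own, and per-property `readonly` is used so it also works on PHP 8.1.

```php
<?php

declare(strict_types=1);

// Hypothetical event; in the real code it would implement DomainEventInterface.
final class OrderShipped
{
    public function __construct(
        private readonly int $orderId,
        private readonly string $trackingNumber,
        private readonly \DateTimeImmutable $occurredAt,
    ) {}

    public function getAggregateId(): int               { return $this->orderId; }
    public function getOccurredAt(): \DateTimeImmutable { return $this->occurredAt; }
    public function getEventType(): string              { return 'OrderShipped'; }

    public function toPayload(): array
    {
        return ['tracking' => $this->trackingNumber];
    }

    public static function fromPayload(int $aggregateId, array $payload, \DateTimeImmutable $occurredAt): static
    {
        return new static($aggregateId, $payload['tracking'], $occurredAt);
    }
}

$original = new OrderShipped(42, 'PKG-001', new \DateTimeImmutable('2024-08-15 14:22:00'));

// Serialize the way an event store would: JSON payload plus a timestamp string.
$json      = json_encode($original->toPayload(), JSON_THROW_ON_ERROR);
$timestamp = $original->getOccurredAt()->format('Y-m-d H:i:s.u');

// Rebuild the event from the stored representation.
$restored = OrderShipped::fromPayload(
    $original->getAggregateId(),
    json_decode($json, true, 512, JSON_THROW_ON_ERROR),
    new \DateTimeImmutable($timestamp),
);

echo $json, PHP_EOL;                                          // {"tracking":"PKG-001"}
var_dump($restored->toPayload() === $original->toPayload()); // bool(true)
```

The aggregate id and timestamp travel outside the payload because they map to their own columns in the event table.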

Aggregate Root – applies events

<?php

declare(strict_types=1);

abstract class AggregateRoot
{
    private array $uncommittedEvents = [];

    protected function recordEvent(DomainEventInterface $event): void
    {
        $this->apply($event);
        $this->uncommittedEvents[] = $event;
    }

    // Apply a single event to the in-memory state; called for new and replayed events alike
    abstract protected function apply(DomainEventInterface $event): void;

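    // Returns the pending events and clears the list, so each event is handed over for persistence only once
    public function getUncommittedEvents(): array
    {
        $events = $this->uncommittedEvents;
        $this->uncommittedEvents = [];
        return $events;
    }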

    // Reconstitute from event history
    public static function reconstitute(array $events): static
    {
        $instance = new static();
        foreach ($events as $event) {
            $instance->apply($event);
        }
        return $instance;
    }
}

class Order extends AggregateRoot
{
    private int $id;
    private string $status = 'new';
    private float $total   = 0.0;
    private array $items   = [];
    private ?string $trackingNumber = null;

    public static function place(int $id, int $customerId, array $items, float $total): static
    {
        $order = new static();
        $order->recordEvent(new OrderPlaced($id, $total, $customerId, $items, new \DateTimeImmutable()));
        return $order;
    }

    public function receivePayment(string $method, string $ref): void
    {
        if ($this->status !== 'pending_payment') {
            throw new \RuntimeException("Cannot receive payment in status: {$this->status}");
        }
        $this->recordEvent(new PaymentReceived($this->id, $method, $ref, new \DateTimeImmutable()));
    }

    protected function apply(DomainEventInterface $event): void
    {
        match($event->getEventType()) {
            'OrderPlaced'     => $this->applyOrderPlaced($event),
            'PaymentReceived' => $this->applyPaymentReceived($event),
            default           => null,
        };
    }

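    private function applyOrderPlaced(OrderPlaced $event): void
    {
        $payload = $event->toPayload();

        // Set the id here so that new and reconstituted orders both know their identity;
        // receivePayment() reads $this->id and would fail on an uninitialized typed property.
        $this->id     = $event->getAggregateId();
        $this->status = 'pending_payment';
        $this->total  = $payload['total'];
        $this->items  = $payload['items'];
    }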

    private function applyPaymentReceived(PaymentReceived $event): void
    {
        $this->status = 'processing';
    }

    public function getStatus(): string { return $this->status; }
    public function getTotal(): float   { return $this->total; }
}
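
Two properties of this design are easy to miss: replaying history must not record anything new, and the guards in command methods run against state rebuilt from events. The sketch below shows both with a trimmed-down, hypothetical `MiniOrder` whose events are plain strings; it stands in for the `Order` class above only to keep the example short and self-contained.

```php
<?php

declare(strict_types=1);

// Hypothetical, trimmed-down aggregate: event types are plain strings.
final class MiniOrder
{
    private string $status = 'new';
    private array $uncommitted = [];

    public static function fromHistory(array $eventTypes): self
    {
        $order = new self();
        foreach ($eventTypes as $type) {
            $order->apply($type); // replay only: nothing is recorded as new
        }

        return $order;
    }

    public function receivePayment(): void
    {
        // The guard runs against state rebuilt from history.
        if ($this->status !== 'pending_payment') {
            throw new \RuntimeException("Cannot receive payment in status: {$this->status}");
        }
        $this->apply('PaymentReceived');
        $this->uncommitted[] = 'PaymentReceived';
    }

    private function apply(string $type): void
    {
        $this->status = match ($type) {
            'OrderPlaced'     => 'pending_payment',
            'PaymentReceived' => 'processing',
            default           => $this->status,
        };
    }

    public function uncommitted(): array { return $this->uncommitted; }
}

$order = MiniOrder::fromHistory(['OrderPlaced']);
$order->receivePayment();
print_r($order->uncommitted()); // contains only PaymentReceived; the replayed OrderPlaced is not repeated

try {
    MiniOrder::fromHistory(['OrderPlaced', 'PaymentReceived'])->receivePayment();
} catch (\RuntimeException $e) {
    echo $e->getMessage(), PHP_EOL; // Cannot receive payment in status: processing
}
```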

Event Store

<?php

declare(strict_types=1);

class EventStore
{
    private array $eventRegistry;

    public function __construct(
        private \Magento\Framework\App\ResourceConnection $resourceConnection,
        array $eventClasses // ['OrderPlaced' => OrderPlaced::class, ...]
    ) {
        $this->eventRegistry = $eventClasses;
    }

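    public function append(AggregateRoot $aggregate): void
    {
        $events     = $aggregate->getUncommittedEvents();
        $connection = $this->resourceConnection->getConnection();
        $table      = $this->resourceConnection->getTableName('vendor_event_store');

        // Persist all events of one change atomically: either every event is stored or none
        $connection->beginTransaction();
        try {
            foreach ($events as $event) {
                $connection->insert($table, [
                    'aggregate_id' => $event->getAggregateId(),
                    'event_type'   => $event->getEventType(),
                    'payload'      => json_encode($event->toPayload(), JSON_THROW_ON_ERROR),
                    // Microseconds are only kept if the column is DATETIME(6) or similar
                    'occurred_at'  => $event->getOccurredAt()->format('Y-m-d H:i:s.u'),
                ]);
            }
            $connection->commit();
        } catch (\Throwable $e) {
            $connection->rollBack();
            throw $e;
        }
    }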

    public function load(int $aggregateId): array
    {
        $connection = $this->resourceConnection->getConnection();
        $rows = $connection->fetchAll(
            $connection->select()
                ->from($this->resourceConnection->getTableName('vendor_event_store'))
                ->where('aggregate_id = ?', $aggregateId)
                ->order('id ASC')
        );

        return array_map(function($row) {
            $class = $this->eventRegistry[$row['event_type']]
                ?? throw new \RuntimeException("Unknown event type: {$row['event_type']}");

            return $class::fromPayload(
                (int) $row['aggregate_id'],
                json_decode($row['payload'], true, 512, JSON_THROW_ON_ERROR),
                new \DateTimeImmutable($row['occurred_at'])
            );
        }, $rows);
    }
}

// Usage
$eventStore = new EventStore($resourceConnection, [
    'OrderPlaced'     => OrderPlaced::class,
    'PaymentReceived' => PaymentReceived::class,
]);

// Place an order
$order = Order::place(42, 123, [['sku' => 'A', 'qty' => 2]], 149.99);
$eventStore->append($order);

// Later: load and reconstitute from events
$events = $eventStore->load(42);
$order  = Order::reconstitute($events);
echo $order->getStatus(); // 'pending_payment'

$order->receivePayment('card', 'TXN-001');
$eventStore->append($order);

$events = $eventStore->load(42);
$order  = Order::reconstitute($events);
echo $order->getStatus(); // 'processing'
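
The usage above needs a Magento database connection. For unit tests, an in-memory stand-in can be enough. The following is a rough sketch under my own assumptions: `InMemoryEventStore` is a hypothetical class, and its `append()` takes raw values instead of an `AggregateRoot` so the snippet stays self-contained. It keeps the JSON round trip so that serialization problems still surface in tests.

```php
<?php

declare(strict_types=1);

// Hypothetical test double: keeps rows in an array instead of the vendor_event_store table.
final class InMemoryEventStore
{
    /** @var list<array{id: int, aggregate_id: int, event_type: string, payload: string}> */
    private array $rows = [];

    public function append(int $aggregateId, string $eventType, array $payload): void
    {
        $this->rows[] = [
            'id'           => count($this->rows) + 1, // mimics an auto-increment column
            'aggregate_id' => $aggregateId,
            'event_type'   => $eventType,
            'payload'      => json_encode($payload, JSON_THROW_ON_ERROR),
        ];
    }

    /** Returns the decoded rows for one aggregate, oldest first. */
    public function load(int $aggregateId): array
    {
        $rows = array_filter(
            $this->rows,
            fn (array $row): bool => $row['aggregate_id'] === $aggregateId
        );

        return array_map(
            fn (array $row): array => [
                'event_type' => $row['event_type'],
                'payload'    => json_decode($row['payload'], true, 512, JSON_THROW_ON_ERROR),
            ],
            array_values($rows)
        );
    }
}

$store = new InMemoryEventStore();
$store->append(42, 'OrderPlaced', ['total' => 149.99]);
$store->append(7, 'OrderPlaced', ['total' => 20.0]);
$store->append(42, 'PaymentReceived', ['method' => 'card', 'ref' => 'TXN-001']);

echo implode(', ', array_column($store->load(42), 'event_type')), PHP_EOL; // OrderPlaced, PaymentReceived
```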

Summary

Event Sourcing trades storage simplicity for a complete, immutable history. Every state change is captured as an event that records what happened and when, and who did it if you store the actor in the payload. The Aggregate Root keeps business rules in domain objects, the Event Store persists the events, and reconstitution replays them to rebuild the current state. Combined with CQRS, the read side can build optimised projections from the event stream, independently of the write side. The complexity is real, so justify it with audit requirements, time-travel debugging needs, or complex domain logic.
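
As an illustration of the CQRS read side, here is one way a projection could fold the event stream into a flat, query-friendly row. It is a sketch, not a prescribed design: `OrderSummaryProjection`, `project()` and `find()` are hypothetical names, and the summaries live in an array where a real read model would more likely write to its own table.

```php
<?php

declare(strict_types=1);

// Hypothetical read-side projection: folds events into a flat "order summary" row.
final class OrderSummaryProjection
{
    /** @var array<int, array{status: string, total: float}> keyed by order id */
    private array $summaries = [];

    public function project(int $orderId, string $eventType, array $payload): void
    {
        switch ($eventType) {
            case 'OrderPlaced':
                $this->summaries[$orderId] = [
                    'status' => 'pending_payment',
                    'total'  => (float) $payload['total'],
                ];
                break;
            case 'PaymentReceived':
                if (isset($this->summaries[$orderId])) {
                    $this->summaries[$orderId]['status'] = 'processing';
                }
                break;
            // Other event types are ignored, so new events do not break this projection.
        }
    }

    public function find(int $orderId): ?array
    {
        return $this->summaries[$orderId] ?? null;
    }
}

$projection = new OrderSummaryProjection();
$stream = [
    [42, 'OrderPlaced', ['total' => 149.99, 'customer_id' => 123, 'items' => []]],
    [42, 'PaymentReceived', ['method' => 'card', 'ref' => 'TXN-001']],
];
foreach ($stream as [$orderId, $type, $payload]) {
    $projection->project($orderId, $type, $payload);
}

print_r($projection->find(42));
```

Because the projection is derived purely from events, it can be dropped and rebuilt from the full stream whenever the read model changes shape.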
