PHP / Magento Dev Blog

  • Publikacje
  • O autorze
  • Kontakt

Event Sourcing – Domain Events, Aggregate Root, Event Store, połączenie z CQRS

by Henryk Tews / wtorek, 06 sierpnia 2024 / Opublikowano w Wzorce projektowe

Event Sourcing to wzorzec architektoniczny który zmienia sposób myślenia o persystencji: zamiast zapisywać aktualny stan obiektu, zapisujesz sekwencję zdarzeń które do tego stanu doprowadziły. Stan jest wynikiem odtwarzania tych zdarzeń. Brzmi skomplikowanie, ale rozwiązuje realne problemy z audytem, debugowaniem i odtwarzaniem historii biznesowej.

Dlaczego Event Sourcing?

Klasyczny model: tabela orders z kolumną status = 'cancelled'. Wiesz gdzie jest zamówienie, ale nie wiesz dlaczego zostało anulowane, kto to zrobił i po jakiej sekwencji zdarzeń. Event Sourcing zapisuje odpowiedź na wszystkie te pytania przez przechowywanie sekwencji zdarzeń zamiast aktualnego stanu.

Domain Events – niezmienne fakty

<?php

declare(strict_types=1);

// Bazowy interfejs zdarzenia domenowego
interface DomainEventInterface
{
    public function getAggregateId(): string;
    public function getOccurredAt(): \DateTimeImmutable;
    public function toArray(): array;
}

// Bazowa klasa - immutable, opisuje fakt który już zaszedł
abstract class AbstractDomainEvent implements DomainEventInterface
{
    private readonly \DateTimeImmutable $occurredAt;

    public function __construct(
        private readonly string $aggregateId
    ) {
        $this->occurredAt = new \DateTimeImmutable();
    }

    public function getAggregateId(): string         { return $this->aggregateId; }
    public function getOccurredAt(): \DateTimeImmutable { return $this->occurredAt; }
}

// Konkretne zdarzenia - każde opisuje jeden fakt
final class OrderPlaced extends AbstractDomainEvent
{
    public function __construct(
        string $orderId,
        public readonly int $customerId,
        public readonly array $items,
        public readonly float $subtotal
    ) {
        parent::__construct($orderId);
    }

    public function toArray(): array
    {
        return [
            'customer_id' => $this->customerId,
            'items'       => $this->items,
            'subtotal'    => $this->subtotal,
        ];
    }
}

final class ItemAdded extends AbstractDomainEvent
{
    public function __construct(
        string $orderId,
        public readonly string $sku,
        public readonly float $price,
        public readonly int $qty
    ) {
        parent::__construct($orderId);
    }

    public function toArray(): array
    {
        return ['sku' => $this->sku, 'price' => $this->price, 'qty' => $this->qty];
    }
}

final class CouponApplied extends AbstractDomainEvent
{
    public function __construct(
        string $orderId,
        public readonly string $couponCode,
        public readonly float $discountAmount
    ) {
        parent::__construct($orderId);
    }

    public function toArray(): array
    {
        return ['code' => $this->couponCode, 'discount' => $this->discountAmount];
    }
}

final class OrderCancelled extends AbstractDomainEvent
{
    public function __construct(
        string $orderId,
        public readonly string $reason,
        public readonly int $cancelledByUserId
    ) {
        parent::__construct($orderId);
    }

    public function toArray(): array
    {
        return ['reason' => $this->reason, 'by_user_id' => $this->cancelledByUserId];
    }
}

Aggregate Root – odtwarza stan z zdarzeń

<?php

declare(strict_types=1);

class Order
{
    private string $id;
    private string $status = 'draft';
    private float $total = 0.0;
    private array $items = [];
    private ?string $couponCode = null;

    /** @var DomainEventInterface[] */
    private array $recordedEvents = [];

    private function __construct() {}

    // Rekonstrukcja ze zdarzeń (replay)
    public static function reconstruct(array $events): static
    {
        $order = new static();

        foreach ($events as $event) {
            $order->apply($event, false); // false = nie rejestruj ponownie
        }

        return $order;
    }

    // Factory method - tworzy nowe zamówienie i rejestruje zdarzenie
    public static function place(string $orderId, int $customerId, array $items): static
    {
        $order = new static();
        $order->id = $orderId;

        $subtotal = array_sum(array_map(fn($i) => $i['price'] * $i['qty'], $items));

        $event = new OrderPlaced($orderId, $customerId, $items, $subtotal);
        $order->apply($event, true);

        return $order;
    }

    public function addItem(string $sku, float $price, int $qty): void
    {
        if ($this->status !== 'pending') {
            throw new \LogicException("Cannot add items to order in status: {$this->status}");
        }

        $this->apply(new ItemAdded($this->id, $sku, $price, $qty), true);
    }

    public function applyCoupon(string $code, float $discount): void
    {
        if ($this->couponCode !== null) {
            throw new \LogicException('Coupon already applied');
        }

        $this->apply(new CouponApplied($this->id, $code, $discount), true);
    }

    public function cancel(string $reason, int $userId): void
    {
        if (in_array($this->status, ['shipped', 'delivered'], true)) {
            throw new \LogicException("Cannot cancel order in status: {$this->status}");
        }

        $this->apply(new OrderCancelled($this->id, $reason, $userId), true);
    }

    // Stosuje zdarzenie do stanu - wywoływane zarówno przy new jak i przy replay
    private function apply(DomainEventInterface $event, bool $record): void
    {
        match(true) {
            $event instanceof OrderPlaced   => $this->applyOrderPlaced($event),
            $event instanceof ItemAdded     => $this->applyItemAdded($event),
            $event instanceof CouponApplied => $this->applyCouponApplied($event),
            $event instanceof OrderCancelled => $this->applyOrderCancelled($event),
            default => throw new \RuntimeException('Unknown event: ' . get_class($event)),
        };

        if ($record) {
            $this->recordedEvents[] = $event;
        }
    }

    private function applyOrderPlaced(OrderPlaced $event): void
    {
        $this->id     = $event->getAggregateId();
        $this->items  = $event->items;
        $this->total  = $event->subtotal;
        $this->status = 'pending';
    }

    private function applyItemAdded(ItemAdded $event): void
    {
        $this->items[] = ['sku' => $event->sku, 'price' => $event->price, 'qty' => $event->qty];
        $this->total  += $event->price * $event->qty;
    }

    private function applyCouponApplied(CouponApplied $event): void
    {
        $this->couponCode = $event->couponCode;
        $this->total     -= $event->discountAmount;
    }

    private function applyOrderCancelled(OrderCancelled $event): void
    {
        $this->status = 'cancelled';
    }

    public function getRecordedEvents(): array { return $this->recordedEvents; }
    public function clearRecordedEvents(): void { $this->recordedEvents = []; }
    public function getId(): string             { return $this->id; }
    public function getStatus(): string         { return $this->status; }
    public function getTotal(): float           { return $this->total; }
}

Event Store – persystencja zdarzeń

<?php

declare(strict_types=1);

interface EventStoreInterface
{
    public function append(string $aggregateId, array $events, int $expectedVersion): void;
    public function getEvents(string $aggregateId): array;
}

class SqlEventStore implements EventStoreInterface
{
    public function __construct(private \PDO $pdo) {}

    public function append(string $aggregateId, array $events, int $expectedVersion): void
    {
        $this->pdo->beginTransaction();

        try {
            // Optimistic locking - sprawdź wersję
            $currentVersion = $this->getCurrentVersion($aggregateId);

            if ($currentVersion !== $expectedVersion) {
                throw new \RuntimeException(
                    "Concurrency conflict: expected version {$expectedVersion}, got {$currentVersion}"
                );
            }

            $stmt = $this->pdo->prepare('
                INSERT INTO event_store
                    (aggregate_id, aggregate_type, event_type, payload, version, occurred_at)
                VALUES
                    (:agg_id, :agg_type, :event_type, :payload, :version, :occurred_at)
            ');

            foreach ($events as $index => $event) {
                $stmt->execute([
                    ':agg_id'     => $aggregateId,
                    ':agg_type'   => 'Order',
                    ':event_type' => get_class($event),
                    ':payload'    => json_encode($event->toArray(), JSON_THROW_ON_ERROR),
                    ':version'    => $expectedVersion + $index + 1,
                    ':occurred_at' => $event->getOccurredAt()->format('Y-m-d H:i:s.u'),
                ]);
            }

            $this->pdo->commit();
        } catch (\Exception $e) {
            $this->pdo->rollBack();
            throw $e;
        }
    }

    public function getEvents(string $aggregateId): array
    {
        $stmt = $this->pdo->prepare('
            SELECT event_type, payload, occurred_at, version
            FROM event_store
            WHERE aggregate_id = :id
            ORDER BY version ASC
        ');

        $stmt->execute([':id' => $aggregateId]);
        $rows = $stmt->fetchAll(\PDO::FETCH_ASSOC);

        return array_map(function(array $row) {
            $payload = json_decode($row['payload'], true, 512, JSON_THROW_ON_ERROR);
            return $this->deserializeEvent($row['event_type'], $payload, $row['aggregate_id'] ?? '');
        }, $rows);
    }

    private function deserializeEvent(string $type, array $payload, string $aggregateId): DomainEventInterface
    {
        return match($type) {
            OrderPlaced::class   => new OrderPlaced($aggregateId, $payload['customer_id'], $payload['items'], $payload['subtotal']),
            ItemAdded::class     => new ItemAdded($aggregateId, $payload['sku'], $payload['price'], $payload['qty']),
            CouponApplied::class => new CouponApplied($aggregateId, $payload['code'], $payload['discount']),
            OrderCancelled::class => new OrderCancelled($aggregateId, $payload['reason'], $payload['by_user_id']),
            default => throw new \RuntimeException("Unknown event type: {$type}"),
        };
    }

    private function getCurrentVersion(string $aggregateId): int
    {
        $stmt = $this->pdo->prepare('
            SELECT MAX(version) FROM event_store WHERE aggregate_id = :id
        ');
        $stmt->execute([':id' => $aggregateId]);
        return (int) ($stmt->fetchColumn() ?? 0);
    }
}

Repository – łączy Aggregate z Event Store

<?php

declare(strict_types=1);

class OrderRepository
{
    public function __construct(private EventStoreInterface $eventStore) {}

    public function save(Order $order): void
    {
        $events  = $order->getRecordedEvents();
        $version = $this->eventStore->getCurrentVersion($order->getId()) ?? 0; // uproszczone

        if (!empty($events)) {
            $this->eventStore->append($order->getId(), $events, $version);
            $order->clearRecordedEvents();
        }
    }

    public function getById(string $orderId): Order
    {
        $events = $this->eventStore->getEvents($orderId);

        if (empty($events)) {
            throw new \RuntimeException("Order not found: {$orderId}");
        }

        return Order::reconstruct($events);
    }
}

// Użycie
$repo = new OrderRepository(new SqlEventStore($pdo));

// Nowe zamówienie
$order = Order::place('ORD-001', 42, [['sku' => 'SKU-A', 'price' => 29.99, 'qty' => 2]]);
$order->applyCoupon('SAVE10', 5.99);
$repo->save($order);

// Odczyt - replay z zdarzeń
$loaded = $repo->getById('ORD-001');
echo $loaded->getStatus(); // pending
echo $loaded->getTotal();  // 53.99

// Historia jest niemodyfikowalna - możesz zawsze zobaczyć każdy krok
$events = $eventStore->getEvents('ORD-001');
foreach ($events as $event) {
    echo get_class($event) . ' @ ' . $event->getOccurredAt()->format('H:i:s') . "\n";
}

Kiedy Event Sourcing ma sens?

Event Sourcing dodaje złożoność – jest uzasadniony gdy:

  • Historia zmian jest wartością biznesową (audyt, compliance, analityka)
  • Chcesz odtwarzać stan systemu w dowolnym momencie w przeszłości
  • Masz wiele czytelników (projections) z różnymi potrzebami odczytu
  • Potrzebujesz event-driven integracji z innymi systemami

Dla prostego CRUD-a – to over-engineering. Dla systemu zamówień gdzie compliance wymaga pełnej historii – może być właściwym wyborem.

Podsumowanie

Event Sourcing to zmiana paradygmatu: persystujesz fakty, nie stan. Stan jest zawsze wynikiem odtworzenia sekwencji faktów. Zyski: pełna historia zmian, możliwość odtwarzania stanu historycznego, naturalne wsparcie dla auditlogu. Koszty: złożoność implementacji, konieczność zarządzania schematem zdarzeń przy ewolucji modelu, wydajność przy długich strumieniach zdarzeń (snapshoting). Dobrze łączy się z CQRS z poprzedniego wpisu – strumień zdarzeń jako source of truth, projekcje jako read models.

About Henryk Tews

Co możesz przeczytać następne

Factory Method + Abstract Factory – implementacje od zera, tabela różnic, Simple Factory jako alternatywa
Wzorzec Observer w PHP i system zdarzeń Magento 2
Wzorzec Proxy – lazy loading, kontrola dostępu, caching, Proxy w Magento 2
  • Publikacje
  • O autorze
  • Kontakt

© 2026 Created by

GÓRA
Zarządzaj zgodą
Aby zapewnić jak najlepsze wrażenia, korzystamy z technologii, takich jak pliki cookie, do przechowywania i/lub uzyskiwania dostępu do informacji o urządzeniu. Zgoda na te technologie pozwoli nam przetwarzać dane, takie jak zachowanie podczas przeglądania lub unikalne identyfikatory na tej stronie. Brak wyrażenia zgody lub wycofanie zgody może niekorzystnie wpłynąć na niektóre cechy i funkcje.
Funkcjonalne Zawsze aktywne
Przechowywanie lub dostęp do danych technicznych jest ściśle konieczny do uzasadnionego celu umożliwienia korzystania z konkretnej usługi wyraźnie żądanej przez subskrybenta lub użytkownika, lub wyłącznie w celu przeprowadzenia transmisji komunikatu przez sieć łączności elektronicznej.
Preferencje
Przechowywanie lub dostęp techniczny jest niezbędny do uzasadnionego celu przechowywania preferencji, o które nie prosi subskrybent lub użytkownik.
Statystyka
Przechowywanie techniczne lub dostęp, który jest używany wyłącznie do celów statystycznych. Przechowywanie techniczne lub dostęp, który jest używany wyłącznie do anonimowych celów statystycznych. Bez wezwania do sądu, dobrowolnego podporządkowania się dostawcy usług internetowych lub dodatkowych zapisów od strony trzeciej, informacje przechowywane lub pobierane wyłącznie w tym celu zwykle nie mogą być wykorzystywane do identyfikacji użytkownika.
Marketing
Przechowywanie lub dostęp techniczny jest wymagany do tworzenia profili użytkowników w celu wysyłania reklam lub śledzenia użytkownika na stronie internetowej lub na kilku stronach internetowych w podobnych celach marketingowych.
  • Zarządzaj opcjami
  • Zarządzaj serwisami
  • Zarządzaj {vendor_count} dostawcami
  • Przeczytaj więcej o tych celach
Zobacz preferencje
  • {title}
  • {title}
  • {title}