Event Sourcing to wzorzec architektoniczny który zmienia sposób myślenia o persystencji: zamiast zapisywać aktualny stan obiektu, zapisujesz sekwencję zdarzeń które do tego stanu doprowadziły. Stan jest wynikiem odtwarzania tych zdarzeń. Brzmi skomplikowanie, ale rozwiązuje realne problemy z audytem, debugowaniem i odtwarzaniem historii biznesowej.
Dlaczego Event Sourcing?
Klasyczny model: tabela orders z kolumną status = 'cancelled'. Wiesz gdzie jest zamówienie, ale nie wiesz dlaczego zostało anulowane, kto to zrobił i po jakiej sekwencji zdarzeń. Event Sourcing zapisuje odpowiedź na wszystkie te pytania przez przechowywanie sekwencji zdarzeń zamiast aktualnego stanu.
Domain Events – niezmienne fakty
<?php
declare(strict_types=1);
// Bazowy interfejs zdarzenia domenowego
interface DomainEventInterface
{
public function getAggregateId(): string;
public function getOccurredAt(): \DateTimeImmutable;
public function toArray(): array;
}
// Bazowa klasa - immutable, opisuje fakt który już zaszedł
abstract class AbstractDomainEvent implements DomainEventInterface
{
private readonly \DateTimeImmutable $occurredAt;
public function __construct(
private readonly string $aggregateId
) {
$this->occurredAt = new \DateTimeImmutable();
}
public function getAggregateId(): string { return $this->aggregateId; }
public function getOccurredAt(): \DateTimeImmutable { return $this->occurredAt; }
}
// Konkretne zdarzenia - każde opisuje jeden fakt
final class OrderPlaced extends AbstractDomainEvent
{
public function __construct(
string $orderId,
public readonly int $customerId,
public readonly array $items,
public readonly float $subtotal
) {
parent::__construct($orderId);
}
public function toArray(): array
{
return [
'customer_id' => $this->customerId,
'items' => $this->items,
'subtotal' => $this->subtotal,
];
}
}
final class ItemAdded extends AbstractDomainEvent
{
public function __construct(
string $orderId,
public readonly string $sku,
public readonly float $price,
public readonly int $qty
) {
parent::__construct($orderId);
}
public function toArray(): array
{
return ['sku' => $this->sku, 'price' => $this->price, 'qty' => $this->qty];
}
}
final class CouponApplied extends AbstractDomainEvent
{
public function __construct(
string $orderId,
public readonly string $couponCode,
public readonly float $discountAmount
) {
parent::__construct($orderId);
}
public function toArray(): array
{
return ['code' => $this->couponCode, 'discount' => $this->discountAmount];
}
}
final class OrderCancelled extends AbstractDomainEvent
{
public function __construct(
string $orderId,
public readonly string $reason,
public readonly int $cancelledByUserId
) {
parent::__construct($orderId);
}
public function toArray(): array
{
return ['reason' => $this->reason, 'by_user_id' => $this->cancelledByUserId];
}
}
Aggregate Root – odtwarza stan z zdarzeń
<?php
declare(strict_types=1);
class Order
{
private string $id;
private string $status = 'draft';
private float $total = 0.0;
private array $items = [];
private ?string $couponCode = null;
/** @var DomainEventInterface[] */
private array $recordedEvents = [];
private function __construct() {}
// Rekonstrukcja ze zdarzeń (replay)
public static function reconstruct(array $events): static
{
$order = new static();
foreach ($events as $event) {
$order->apply($event, false); // false = nie rejestruj ponownie
}
return $order;
}
// Factory method - tworzy nowe zamówienie i rejestruje zdarzenie
public static function place(string $orderId, int $customerId, array $items): static
{
$order = new static();
$order->id = $orderId;
$subtotal = array_sum(array_map(fn($i) => $i['price'] * $i['qty'], $items));
$event = new OrderPlaced($orderId, $customerId, $items, $subtotal);
$order->apply($event, true);
return $order;
}
public function addItem(string $sku, float $price, int $qty): void
{
if ($this->status !== 'pending') {
throw new \LogicException("Cannot add items to order in status: {$this->status}");
}
$this->apply(new ItemAdded($this->id, $sku, $price, $qty), true);
}
public function applyCoupon(string $code, float $discount): void
{
if ($this->couponCode !== null) {
throw new \LogicException('Coupon already applied');
}
$this->apply(new CouponApplied($this->id, $code, $discount), true);
}
public function cancel(string $reason, int $userId): void
{
if (in_array($this->status, ['shipped', 'delivered'], true)) {
throw new \LogicException("Cannot cancel order in status: {$this->status}");
}
$this->apply(new OrderCancelled($this->id, $reason, $userId), true);
}
// Stosuje zdarzenie do stanu - wywoływane zarówno przy new jak i przy replay
private function apply(DomainEventInterface $event, bool $record): void
{
match(true) {
$event instanceof OrderPlaced => $this->applyOrderPlaced($event),
$event instanceof ItemAdded => $this->applyItemAdded($event),
$event instanceof CouponApplied => $this->applyCouponApplied($event),
$event instanceof OrderCancelled => $this->applyOrderCancelled($event),
default => throw new \RuntimeException('Unknown event: ' . get_class($event)),
};
if ($record) {
$this->recordedEvents[] = $event;
}
}
private function applyOrderPlaced(OrderPlaced $event): void
{
$this->id = $event->getAggregateId();
$this->items = $event->items;
$this->total = $event->subtotal;
$this->status = 'pending';
}
private function applyItemAdded(ItemAdded $event): void
{
$this->items[] = ['sku' => $event->sku, 'price' => $event->price, 'qty' => $event->qty];
$this->total += $event->price * $event->qty;
}
private function applyCouponApplied(CouponApplied $event): void
{
$this->couponCode = $event->couponCode;
$this->total -= $event->discountAmount;
}
private function applyOrderCancelled(OrderCancelled $event): void
{
$this->status = 'cancelled';
}
public function getRecordedEvents(): array { return $this->recordedEvents; }
public function clearRecordedEvents(): void { $this->recordedEvents = []; }
public function getId(): string { return $this->id; }
public function getStatus(): string { return $this->status; }
public function getTotal(): float { return $this->total; }
}
Event Store – persystencja zdarzeń
<?php
declare(strict_types=1);
interface EventStoreInterface
{
public function append(string $aggregateId, array $events, int $expectedVersion): void;
public function getEvents(string $aggregateId): array;
}
class SqlEventStore implements EventStoreInterface
{
public function __construct(private \PDO $pdo) {}
public function append(string $aggregateId, array $events, int $expectedVersion): void
{
$this->pdo->beginTransaction();
try {
// Optimistic locking - sprawdź wersję
$currentVersion = $this->getCurrentVersion($aggregateId);
if ($currentVersion !== $expectedVersion) {
throw new \RuntimeException(
"Concurrency conflict: expected version {$expectedVersion}, got {$currentVersion}"
);
}
$stmt = $this->pdo->prepare('
INSERT INTO event_store
(aggregate_id, aggregate_type, event_type, payload, version, occurred_at)
VALUES
(:agg_id, :agg_type, :event_type, :payload, :version, :occurred_at)
');
foreach ($events as $index => $event) {
$stmt->execute([
':agg_id' => $aggregateId,
':agg_type' => 'Order',
':event_type' => get_class($event),
':payload' => json_encode($event->toArray(), JSON_THROW_ON_ERROR),
':version' => $expectedVersion + $index + 1,
':occurred_at' => $event->getOccurredAt()->format('Y-m-d H:i:s.u'),
]);
}
$this->pdo->commit();
} catch (\Exception $e) {
$this->pdo->rollBack();
throw $e;
}
}
public function getEvents(string $aggregateId): array
{
$stmt = $this->pdo->prepare('
SELECT event_type, payload, occurred_at, version
FROM event_store
WHERE aggregate_id = :id
ORDER BY version ASC
');
$stmt->execute([':id' => $aggregateId]);
$rows = $stmt->fetchAll(\PDO::FETCH_ASSOC);
return array_map(function(array $row) {
$payload = json_decode($row['payload'], true, 512, JSON_THROW_ON_ERROR);
return $this->deserializeEvent($row['event_type'], $payload, $row['aggregate_id'] ?? '');
}, $rows);
}
private function deserializeEvent(string $type, array $payload, string $aggregateId): DomainEventInterface
{
return match($type) {
OrderPlaced::class => new OrderPlaced($aggregateId, $payload['customer_id'], $payload['items'], $payload['subtotal']),
ItemAdded::class => new ItemAdded($aggregateId, $payload['sku'], $payload['price'], $payload['qty']),
CouponApplied::class => new CouponApplied($aggregateId, $payload['code'], $payload['discount']),
OrderCancelled::class => new OrderCancelled($aggregateId, $payload['reason'], $payload['by_user_id']),
default => throw new \RuntimeException("Unknown event type: {$type}"),
};
}
private function getCurrentVersion(string $aggregateId): int
{
$stmt = $this->pdo->prepare('
SELECT MAX(version) FROM event_store WHERE aggregate_id = :id
');
$stmt->execute([':id' => $aggregateId]);
return (int) ($stmt->fetchColumn() ?? 0);
}
}
Repository – łączy Aggregate z Event Store
<?php
declare(strict_types=1);
class OrderRepository
{
public function __construct(private EventStoreInterface $eventStore) {}
public function save(Order $order): void
{
$events = $order->getRecordedEvents();
$version = $this->eventStore->getCurrentVersion($order->getId()) ?? 0; // uproszczone
if (!empty($events)) {
$this->eventStore->append($order->getId(), $events, $version);
$order->clearRecordedEvents();
}
}
public function getById(string $orderId): Order
{
$events = $this->eventStore->getEvents($orderId);
if (empty($events)) {
throw new \RuntimeException("Order not found: {$orderId}");
}
return Order::reconstruct($events);
}
}
// Użycie
$repo = new OrderRepository(new SqlEventStore($pdo));
// Nowe zamówienie
$order = Order::place('ORD-001', 42, [['sku' => 'SKU-A', 'price' => 29.99, 'qty' => 2]]);
$order->applyCoupon('SAVE10', 5.99);
$repo->save($order);
// Odczyt - replay z zdarzeń
$loaded = $repo->getById('ORD-001');
echo $loaded->getStatus(); // pending
echo $loaded->getTotal(); // 53.99
// Historia jest niemodyfikowalna - możesz zawsze zobaczyć każdy krok
$events = $eventStore->getEvents('ORD-001');
foreach ($events as $event) {
echo get_class($event) . ' @ ' . $event->getOccurredAt()->format('H:i:s') . "\n";
}
Kiedy Event Sourcing ma sens?
Event Sourcing dodaje złożoność – jest uzasadniony gdy:
- Historia zmian jest wartością biznesową (audyt, compliance, analityka)
- Chcesz odtwarzać stan systemu w dowolnym momencie w przeszłości
- Masz wiele czytelników (projections) z różnymi potrzebami odczytu
- Potrzebujesz event-driven integracji z innymi systemami
Dla prostego CRUD-a – to over-engineering. Dla systemu zamówień gdzie compliance wymaga pełnej historii – może być właściwym wyborem.
Podsumowanie
Event Sourcing to zmiana paradygmatu: persystujesz fakty, nie stan. Stan jest zawsze wynikiem odtworzenia sekwencji faktów. Zyski: pełna historia zmian, możliwość odtwarzania stanu historycznego, naturalne wsparcie dla auditlogu. Koszty: złożoność implementacji, konieczność zarządzania schematem zdarzeń przy ewolucji modelu, wydajność przy długich strumieniach zdarzeń (snapshoting). Dobrze łączy się z CQRS z poprzedniego wpisu – strumień zdarzeń jako source of truth, projekcje jako read models.
