PHP / Magento Dev Blog

  • Publikacje
  • O autorze
  • Kontakt

Redis Streams – Consumer Groups, pending messages, dead letter, integracja z Magento queue

by Henryk Tews / wtorek, 03 grudnia 2024 / Opublikowano w Magento 2

Redis Streams to struktura danych wprowadzona w Redis 5.0 która łączy cechy kolejki wiadomości i niezmiennego logu zdarzeń. W porównaniu do klasycznych list Redis (LPUSH/RPOP) daje persystencję, grupy konsumentów, potwierdzenia odbioru i możliwość odczytu historii. Dla projektów gdzie RabbitMQ to za dużo infrastruktury a Redis już masz – Streams mogą zastąpić brokera wiadomości.

Redis Streams vs inne mechanizmy kolejkowania

Aspekt Redis List (LPUSH/RPOP) Redis Streams RabbitMQ
Persystencja Brak po konsumpcji Tak – historia zachowana Tak
Wielu konsumentów Jeden pobiera, reszta traci Consumer Groups Tak (exchange/routing)
Potwierdzenie odbioru Nie Tak (XACK) Tak (ACK)
Odczyt historyczny Nie Tak (od dowolnego ID) Nie (po konsumpcji)
Złożoność infrastruktury Brak (już masz Redis) Brak (już masz Redis) Osobny serwer

Podstawowe operacje – XADD, XREAD, XACK

<?php

declare(strict_types=1);

$redis = new \Redis();
$redis->connect('localhost', 6379);

// XADD - dodaj wiadomość do streamu
// ID '*' = automatyczny timestamp-based ID
$messageId = $redis->xAdd('order-events', '*', [
    'event_type' => 'order.placed',
    'order_id'   => '1042',
    'customer'   => 'jan@example.com',
    'total'      => '129.99',
    'currency'   => 'PLN',
]);

echo "Dodano wiadomość: {$messageId}\n";
// Dodano wiadomość: 1701234567890-0

// XREAD - odczytaj wiadomości od danego ID
// '$' = tylko nowe (od tej chwili), '0' = od początku
$messages = $redis->xRead(['order-events' => '0'], 10);
// Zwraca: ['order-events' => [[id, data], [id, data], ...]]

foreach ($messages['order-events'] ?? [] as [$id, $data]) {
    echo "ID: {$id}\n";
    echo "Event: {$data['event_type']}\n";
    echo "Order: {$data['order_id']}\n\n";
}

// XLEN - ile wiadomości w streamie
echo "Wiadomości: " . $redis->xLen('order-events') . "\n";

// XRANGE - pobierz zakres wiadomości (po ID lub timestamp)
$range = $redis->xRange('order-events', '-', '+', 100);
// '-' = od początku, '+' = do końca

// XTRIM - ogranicz rozmiar streamu (przydatne żeby nie urósł do nieskończoności)
$redis->xTrim('order-events', 10000); // max 10000 wiadomości
// Lub z MAXLEN ~: przybliżone trim (szybsze)
$redis->rawCommand('XTRIM', 'order-events', 'MAXLEN', '~', '10000');

Consumer Groups – wielu konsumentów, każdy dostaje swoje

<?php

declare(strict_types=1);

$redis = new \Redis();
$redis->connect('localhost', 6379);

// Utwórz grupę konsumentów
// '$' = tylko nowe wiadomości od teraz, '0' = od początku historii
try {
    $redis->xGroup('CREATE', 'order-events', 'email-notifications', '$', true);
    // true = utwórz stream jeśli nie istnieje
} catch (\RedisException $e) {
    // Grupa już istnieje - ignoruj
    if (!str_contains($e->getMessage(), 'BUSYGROUP')) {
        throw $e;
    }
}

// XREADGROUP - odczytaj wiadomości jako member grupy
// '>' = pobierz wiadomości które jeszcze nie były dostarczone
$messages = $redis->xReadGroup(
    'email-notifications',  // nazwa grupy
    'worker-1',             // nazwa tego konsumenta
    ['order-events' => '>'],
    10,                     // max 10 wiadomości
    2000                    // timeout 2s (blokujące czekanie na nowe)
);

foreach ($messages['order-events'] ?? [] as [$id, $data]) {
    try {
        // Przetwórz wiadomość
        $this->sendOrderConfirmationEmail(
            $data['customer'],
            $data['order_id'],
            (float) $data['total']
        );

        // Potwierdź odbiór - XACK
        $redis->xAck('order-events', 'email-notifications', [$id]);
        echo "Processed and ACK'd: {$id}\n";

    } catch (\Exception $e) {
        // Nie ACK - wiadomość pozostanie w pending list
        // i może zostać odebrana przez innego konsumenta po timeout
        echo "Failed to process {$id}: " . $e->getMessage() . "\n";
    }
}

Pending Messages – obsługa niepotwierdzonych

<?php

declare(strict_types=1);

// XPENDING - sprawdź wiadomości które nie zostały ACK'd
$pending = $redis->xPending('order-events', 'email-notifications', '-', '+', 10);

foreach ($pending as $entry) {
    [$id, $consumer, $idleMs, $deliveryCount] = $entry;

    // Jeśli wiadomość czeka > 30 sekund i była dostarczana > 3 razy
    if ($idleMs > 30000 && $deliveryCount > 3) {
        echo "Dead letter: {$id} (consumer: {$consumer}, retries: {$deliveryCount})\n";

        // Przenieś do dead letter stream
        $messages = $redis->xRange('order-events', $id, $id, 1);
        foreach ($messages as [$msgId, $data]) {
            $redis->xAdd('order-events-dead', '*', array_merge($data, [
                'original_id'     => $msgId,
                'failed_consumer' => $consumer,
                'retry_count'     => $deliveryCount,
            ]));
        }

        // ACK oryginał żeby usunąć z pending
        $redis->xAck('order-events', 'email-notifications', [$id]);
    } elseif ($idleMs > 5000) {
        // XCLAIM - przejm wiadomość od zawieszego konsumenta
        $redis->xClaim('order-events', 'email-notifications', 'worker-2', 5000, [$id]);
    }
}

Integracja z Magento 2 – Streams jako Message Queue backend

<?php

declare(strict_types=1);

namespace Vendor\Module\Model\Queue;

use Magento\Framework\MessageQueue\PublisherInterface;

// Publisher który wysyła do Redis Streams zamiast domyślnego MySQL
class RedisStreamPublisher
{
    public function __construct(
        private \Redis $redis,
        private string $streamPrefix = 'magento:queue:'
    ) {}

    public function publish(string $topicName, mixed $data): void
    {
        $streamKey = $this->streamPrefix . str_replace('.', ':', $topicName);

        $payload = [
            'topic'      => $topicName,
            'data'       => json_encode($data, JSON_THROW_ON_ERROR),
            'created_at' => (string) microtime(true),
        ];

        $id = $this->redis->xAdd($streamKey, '*', $payload);

        // Trim do 100k wiadomości żeby stream nie rósł bez końca
        $this->redis->rawCommand('XTRIM', $streamKey, 'MAXLEN', '~', '100000');
    }
}

// Consumer który odbiera z Redis Streams
class RedisStreamConsumer
{
    public function __construct(
        private \Redis $redis,
        private string $streamPrefix = 'magento:queue:'
    ) {}

    public function consume(string $topicName, string $groupName, string $consumerName): void
    {
        $streamKey = $this->streamPrefix . str_replace('.', ':', $topicName);

        // Utwórz grupę jeśli nie istnieje
        try {
            $this->redis->xGroup('CREATE', $streamKey, $groupName, '$', true);
        } catch (\RedisException $e) {
            if (!str_contains($e->getMessage(), 'BUSYGROUP')) throw $e;
        }

        echo "Consumer {$consumerName} nasłuchuje na {$streamKey}...\n";

        while (true) {
            $messages = $this->redis->xReadGroup(
                $groupName, $consumerName,
                [$streamKey => '>'],
                10, 2000 // 10 wiadomości, 2s timeout
            );

            foreach ($messages[$streamKey] ?? [] as [$id, $payload]) {
                try {
                    $data = json_decode($payload['data'], true, 512, JSON_THROW_ON_ERROR);
                    $this->processMessage($payload['topic'], $data);
                    $this->redis->xAck($streamKey, $groupName, [$id]);
                } catch (\Exception $e) {
                    // Loguj, nie ACK - wiadomość wróci do pending
                    error_log("Error processing {$id}: " . $e->getMessage());
                }
            }
        }
    }

    private function processMessage(string $topic, array $data): void
    {
        // Dispatch do odpowiedniego handlera
        echo "Processing: {$topic}\n";
    }
}

Redis Streams w DDEV

# Wejdź do Redis CLI w DDEV i eksperymentuj ze Streams
ddev exec redis-cli

# Dodaj kilka eventów
XADD order-events '*' event order.placed order_id 1001 customer jan@example.com
XADD order-events '*' event order.shipped order_id 1001 tracking PL123456789
XADD order-events '*' event order.placed order_id 1002 customer anna@example.com

# Sprawdź zawartość
XLEN order-events
XRANGE order-events - + COUNT 10

# Utwórz grupę i odczytaj
XGROUP CREATE order-events analytics $ MKSTREAM
XREADGROUP GROUP analytics worker-1 COUNT 10 STREAMS order-events >

# Sprawdź pending
XPENDING order-events analytics - + 10

# Potwierdź odbiór
XACK order-events analytics 1701234567890-0

Podsumowanie

Redis Streams to solidna alternatywa dla RabbitMQ gdy masz już Redis w stosie i nie potrzebujesz zaawansowanego routingu wiadomości. Consumer Groups dają wiele instancji konsumentów z gwarancją że każda wiadomość trafi do dokładnie jednej. Persystencja i historia zdarzeń są bonusem który ułatwia debugging i replay eventów. W Magento 2 Streams mogą zastąpić MySQL queue backend dla modułów które nie wymagają skomplikowanego routingu typowego dla RabbitMQ.

About Henryk Tews

Co możesz przeczytać następne

Migracja danych prod → dev – mydumper, anonimizacja RODO, hooks automatyzujące setup
Jubileusz 100 wpisów – retrospektywa 6,5 roku, PHP 7.2->8.4, Magento 2.2->2.4.8
Hyvä vs Luma benchmark – twarde liczby, LCP 3.5x szybszy, k6 load test, konwersja
  • Publikacje
  • O autorze
  • Kontakt

© 2026 Created by

GÓRA
Zarządzaj zgodą
Aby zapewnić jak najlepsze wrażenia, korzystamy z technologii, takich jak pliki cookie, do przechowywania i/lub uzyskiwania dostępu do informacji o urządzeniu. Zgoda na te technologie pozwoli nam przetwarzać dane, takie jak zachowanie podczas przeglądania lub unikalne identyfikatory na tej stronie. Brak wyrażenia zgody lub wycofanie zgody może niekorzystnie wpłynąć na niektóre cechy i funkcje.
Funkcjonalne Zawsze aktywne
Przechowywanie lub dostęp do danych technicznych jest ściśle konieczny do uzasadnionego celu umożliwienia korzystania z konkretnej usługi wyraźnie żądanej przez subskrybenta lub użytkownika, lub wyłącznie w celu przeprowadzenia transmisji komunikatu przez sieć łączności elektronicznej.
Preferencje
Przechowywanie lub dostęp techniczny jest niezbędny do uzasadnionego celu przechowywania preferencji, o które nie prosi subskrybent lub użytkownik.
Statystyka
Przechowywanie techniczne lub dostęp, który jest używany wyłącznie do celów statystycznych. Przechowywanie techniczne lub dostęp, który jest używany wyłącznie do anonimowych celów statystycznych. Bez wezwania do sądu, dobrowolnego podporządkowania się dostawcy usług internetowych lub dodatkowych zapisów od strony trzeciej, informacje przechowywane lub pobierane wyłącznie w tym celu zwykle nie mogą być wykorzystywane do identyfikacji użytkownika.
Marketing
Przechowywanie lub dostęp techniczny jest wymagany do tworzenia profili użytkowników w celu wysyłania reklam lub śledzenia użytkownika na stronie internetowej lub na kilku stronach internetowych w podobnych celach marketingowych.
  • Zarządzaj opcjami
  • Zarządzaj serwisami
  • Zarządzaj {vendor_count} dostawcami
  • Przeczytaj więcej o tych celach
Zobacz preferencje
  • {title}
  • {title}
  • {title}