Redis Streams to struktura danych wprowadzona w Redis 5.0 która łączy cechy kolejki wiadomości i niezmiennego logu zdarzeń. W porównaniu do klasycznych list Redis (LPUSH/RPOP) daje persystencję, grupy konsumentów, potwierdzenia odbioru i możliwość odczytu historii. Dla projektów gdzie RabbitMQ to za dużo infrastruktury a Redis już masz – Streams mogą zastąpić brokera wiadomości.
Redis Streams vs inne mechanizmy kolejkowania
| Aspekt | Redis List (LPUSH/RPOP) | Redis Streams | RabbitMQ |
|---|---|---|---|
| Persystencja | Brak po konsumpcji | Tak – historia zachowana | Tak |
| Wielu konsumentów | Jeden pobiera, reszta traci | Consumer Groups | Tak (exchange/routing) |
| Potwierdzenie odbioru | Nie | Tak (XACK) | Tak (ACK) |
| Odczyt historyczny | Nie | Tak (od dowolnego ID) | Nie (po konsumpcji) |
| Złożoność infrastruktury | Brak (już masz Redis) | Brak (już masz Redis) | Osobny serwer |
Podstawowe operacje – XADD, XREAD, XACK
<?php
declare(strict_types=1);
$redis = new \Redis();
$redis->connect('localhost', 6379);
// XADD - dodaj wiadomość do streamu
// ID '*' = automatyczny timestamp-based ID
$messageId = $redis->xAdd('order-events', '*', [
'event_type' => 'order.placed',
'order_id' => '1042',
'customer' => 'jan@example.com',
'total' => '129.99',
'currency' => 'PLN',
]);
echo "Dodano wiadomość: {$messageId}\n";
// Dodano wiadomość: 1701234567890-0
// XREAD - odczytaj wiadomości od danego ID
// '$' = tylko nowe (od tej chwili), '0' = od początku
$messages = $redis->xRead(['order-events' => '0'], 10);
// Zwraca: ['order-events' => [[id, data], [id, data], ...]]
foreach ($messages['order-events'] ?? [] as [$id, $data]) {
echo "ID: {$id}\n";
echo "Event: {$data['event_type']}\n";
echo "Order: {$data['order_id']}\n\n";
}
// XLEN - ile wiadomości w streamie
echo "Wiadomości: " . $redis->xLen('order-events') . "\n";
// XRANGE - pobierz zakres wiadomości (po ID lub timestamp)
$range = $redis->xRange('order-events', '-', '+', 100);
// '-' = od początku, '+' = do końca
// XTRIM - ogranicz rozmiar streamu (przydatne żeby nie urósł do nieskończoności)
$redis->xTrim('order-events', 10000); // max 10000 wiadomości
// Lub z MAXLEN ~: przybliżone trim (szybsze)
$redis->rawCommand('XTRIM', 'order-events', 'MAXLEN', '~', '10000');
Consumer Groups – wielu konsumentów, każdy dostaje swoje
<?php
declare(strict_types=1);
$redis = new \Redis();
$redis->connect('localhost', 6379);
// Utwórz grupę konsumentów
// '$' = tylko nowe wiadomości od teraz, '0' = od początku historii
try {
$redis->xGroup('CREATE', 'order-events', 'email-notifications', '$', true);
// true = utwórz stream jeśli nie istnieje
} catch (\RedisException $e) {
// Grupa już istnieje - ignoruj
if (!str_contains($e->getMessage(), 'BUSYGROUP')) {
throw $e;
}
}
// XREADGROUP - odczytaj wiadomości jako member grupy
// '>' = pobierz wiadomości które jeszcze nie były dostarczone
$messages = $redis->xReadGroup(
'email-notifications', // nazwa grupy
'worker-1', // nazwa tego konsumenta
['order-events' => '>'],
10, // max 10 wiadomości
2000 // timeout 2s (blokujące czekanie na nowe)
);
foreach ($messages['order-events'] ?? [] as [$id, $data]) {
try {
// Przetwórz wiadomość
$this->sendOrderConfirmationEmail(
$data['customer'],
$data['order_id'],
(float) $data['total']
);
// Potwierdź odbiór - XACK
$redis->xAck('order-events', 'email-notifications', [$id]);
echo "Processed and ACK'd: {$id}\n";
} catch (\Exception $e) {
// Nie ACK - wiadomość pozostanie w pending list
// i może zostać odebrana przez innego konsumenta po timeout
echo "Failed to process {$id}: " . $e->getMessage() . "\n";
}
}
Pending Messages – obsługa niepotwierdzonych
<?php
declare(strict_types=1);
// XPENDING - sprawdź wiadomości które nie zostały ACK'd
$pending = $redis->xPending('order-events', 'email-notifications', '-', '+', 10);
foreach ($pending as $entry) {
[$id, $consumer, $idleMs, $deliveryCount] = $entry;
// Jeśli wiadomość czeka > 30 sekund i była dostarczana > 3 razy
if ($idleMs > 30000 && $deliveryCount > 3) {
echo "Dead letter: {$id} (consumer: {$consumer}, retries: {$deliveryCount})\n";
// Przenieś do dead letter stream
$messages = $redis->xRange('order-events', $id, $id, 1);
foreach ($messages as [$msgId, $data]) {
$redis->xAdd('order-events-dead', '*', array_merge($data, [
'original_id' => $msgId,
'failed_consumer' => $consumer,
'retry_count' => $deliveryCount,
]));
}
// ACK oryginał żeby usunąć z pending
$redis->xAck('order-events', 'email-notifications', [$id]);
} elseif ($idleMs > 5000) {
// XCLAIM - przejm wiadomość od zawieszego konsumenta
$redis->xClaim('order-events', 'email-notifications', 'worker-2', 5000, [$id]);
}
}
Integracja z Magento 2 – Streams jako Message Queue backend
<?php
declare(strict_types=1);
namespace Vendor\Module\Model\Queue;
use Magento\Framework\MessageQueue\PublisherInterface;
// Publisher który wysyła do Redis Streams zamiast domyślnego MySQL
class RedisStreamPublisher
{
public function __construct(
private \Redis $redis,
private string $streamPrefix = 'magento:queue:'
) {}
public function publish(string $topicName, mixed $data): void
{
$streamKey = $this->streamPrefix . str_replace('.', ':', $topicName);
$payload = [
'topic' => $topicName,
'data' => json_encode($data, JSON_THROW_ON_ERROR),
'created_at' => (string) microtime(true),
];
$id = $this->redis->xAdd($streamKey, '*', $payload);
// Trim do 100k wiadomości żeby stream nie rósł bez końca
$this->redis->rawCommand('XTRIM', $streamKey, 'MAXLEN', '~', '100000');
}
}
// Consumer który odbiera z Redis Streams
class RedisStreamConsumer
{
public function __construct(
private \Redis $redis,
private string $streamPrefix = 'magento:queue:'
) {}
public function consume(string $topicName, string $groupName, string $consumerName): void
{
$streamKey = $this->streamPrefix . str_replace('.', ':', $topicName);
// Utwórz grupę jeśli nie istnieje
try {
$this->redis->xGroup('CREATE', $streamKey, $groupName, '$', true);
} catch (\RedisException $e) {
if (!str_contains($e->getMessage(), 'BUSYGROUP')) throw $e;
}
echo "Consumer {$consumerName} nasłuchuje na {$streamKey}...\n";
while (true) {
$messages = $this->redis->xReadGroup(
$groupName, $consumerName,
[$streamKey => '>'],
10, 2000 // 10 wiadomości, 2s timeout
);
foreach ($messages[$streamKey] ?? [] as [$id, $payload]) {
try {
$data = json_decode($payload['data'], true, 512, JSON_THROW_ON_ERROR);
$this->processMessage($payload['topic'], $data);
$this->redis->xAck($streamKey, $groupName, [$id]);
} catch (\Exception $e) {
// Loguj, nie ACK - wiadomość wróci do pending
error_log("Error processing {$id}: " . $e->getMessage());
}
}
}
}
private function processMessage(string $topic, array $data): void
{
// Dispatch do odpowiedniego handlera
echo "Processing: {$topic}\n";
}
}
Redis Streams w DDEV
# Wejdź do Redis CLI w DDEV i eksperymentuj ze Streams ddev exec redis-cli # Dodaj kilka eventów XADD order-events '*' event order.placed order_id 1001 customer jan@example.com XADD order-events '*' event order.shipped order_id 1001 tracking PL123456789 XADD order-events '*' event order.placed order_id 1002 customer anna@example.com # Sprawdź zawartość XLEN order-events XRANGE order-events - + COUNT 10 # Utwórz grupę i odczytaj XGROUP CREATE order-events analytics $ MKSTREAM XREADGROUP GROUP analytics worker-1 COUNT 10 STREAMS order-events > # Sprawdź pending XPENDING order-events analytics - + 10 # Potwierdź odbiór XACK order-events analytics 1701234567890-0
Podsumowanie
Redis Streams to solidna alternatywa dla RabbitMQ gdy masz już Redis w stosie i nie potrzebujesz zaawansowanego routingu wiadomości. Consumer Groups dają wiele instancji konsumentów z gwarancją że każda wiadomość trafi do dokładnie jednej. Persystencja i historia zdarzeń są bonusem który ułatwia debugging i replay eventów. W Magento 2 Streams mogą zastąpić MySQL queue backend dla modułów które nie wymagają skomplikowanego routingu typowego dla RabbitMQ.
