PHP / Magento Dev Blog

  • Publikacje
  • O autorze
  • Kontakt

Redis Streams – Consumer Groups, pending messages, dead letter, Magento queue integration

by Henryk Tews / Tuesday, 03 December 2024 / Published in Magento 2

Redis Streams is a data structure added in Redis 5.0 that combines the simplicity of a log with consumer groups and acknowledgement semantics. For Magento 2 it offers a robust alternative to RabbitMQ for message queuing: persistent messages, consumer groups, pending message redelivery, and dead letter handling – without the operational overhead of a dedicated AMQP broker.

Redis Streams vs RabbitMQ vs Redis Lists

Feature Redis Lists Redis Streams RabbitMQ
Message persistence Optional (AOF/RDB) Yes Yes
Consumer groups No Yes Yes
Acknowledgement No Yes (XACK) Yes
Pending redelivery No Yes (XCLAIM) Yes
Message replay No Yes (seek to ID) No
Dead letter Manual Manual (check pending) Built-in
Operational cost Zero (Redis already running) Zero (Redis already running) Separate service

Redis Streams basics

# Add a message to a stream
XADD order:export * order_id 42 customer_id 123 total 149.99

# Create a consumer group
XGROUP CREATE order:export exporters $ MKSTREAM

# Read as consumer (blocking, wait for new messages)
XREADGROUP GROUP exporters worker-1 COUNT 10 BLOCK 5000 STREAMS order:export >

# Acknowledge processed message
XACK order:export exporters 1704067200000-0

# Check pending messages (not yet acknowledged)
XPENDING order:export exporters - + 10

PHP implementation – producer

<?php

declare(strict_types=1);

namespace Vendor\Module\Model\Queue;

use Predis\Client as RedisClient;

class StreamProducer
{
    private const STREAM_KEY = 'magento:order:export';
    private const MAX_LENGTH = 10000; // cap stream length

    public function __construct(private RedisClient $redis) {}

    public function publish(array $payload): string
    {
        // XADD with MAXLEN to prevent unbounded stream growth
        $messageId = $this->redis->xadd(
            self::STREAM_KEY,
            ['MAXLEN', '~', self::MAX_LENGTH],
            '*',
            $this->serializePayload($payload)
        );

        return $messageId;
    }

    private function serializePayload(array $payload): array
    {
        $flat = [];
        foreach ($payload as $key => $value) {
            $flat[$key] = is_array($value) ? json_encode($value) : (string) $value;
        }
        return $flat;
    }
}

// Usage in a Magento observer
class OrderPlacedObserver implements \Magento\Framework\Event\ObserverInterface
{
    public function __construct(private StreamProducer $producer) {}

    public function execute(\Magento\Framework\Event\Observer $observer): void
    {
        $order = $observer->getData('order');
        $this->producer->publish([
            'order_id'    => $order->getId(),
            'increment_id'=> $order->getIncrementId(),
            'total'       => $order->getGrandTotal(),
            'customer_id' => $order->getCustomerId(),
        ]);
    }
}

PHP implementation – consumer with acknowledgement

<?php

declare(strict_types=1);

namespace Vendor\Module\Model\Queue;

use Predis\Client as RedisClient;

class StreamConsumer
{
    private const STREAM_KEY    = 'magento:order:export';
    private const GROUP_NAME    = 'order-exporters';
    private const CONSUMER_NAME = 'worker-1';
    private const BATCH_SIZE    = 10;
    private const BLOCK_MS      = 5000;
    private const MAX_RETRIES   = 3;
    private const DEAD_LETTER   = 'magento:order:export:dead';

    public function __construct(
        private RedisClient $redis,
        private \Vendor\Module\Model\ErpExporter $erpExporter,
        private \Psr\Log\LoggerInterface $logger
    ) {}

    public function run(): void
    {
        $this->ensureGroupExists();

        while (true) {
            // First: check for pending messages (previously unacknowledged)
            $this->processPending();

            // Then: read new messages
            $messages = $this->redis->xreadgroup(
                'GROUP', self::GROUP_NAME, self::CONSUMER_NAME,
                'COUNT', self::BATCH_SIZE,
                'BLOCK', self::BLOCK_MS,
                'STREAMS', self::STREAM_KEY, '>'
            );

            if (empty($messages)) continue;

            foreach ($messages[self::STREAM_KEY] as [$messageId, $fields]) {
                $this->processMessage($messageId, $fields);
            }
        }
    }

    private function processMessage(string $messageId, array $fields): void
    {
        try {
            $payload = $this->deserializePayload($fields);
            $this->erpExporter->export($payload['order_id']);

            // Acknowledge success
            $this->redis->xack(self::STREAM_KEY, self::GROUP_NAME, $messageId);
            $this->logger->info('Exported order', ['id' => $payload['order_id']]);

        } catch (\Exception $e) {
            $this->logger->error('Export failed', [
                'message_id' => $messageId,
                'error'      => $e->getMessage(),
            ]);
            // No XACK = message stays in pending list for retry
        }
    }

    private function processPending(): void
    {
        $pending = $this->redis->xpending(
            self::STREAM_KEY, self::GROUP_NAME, '-', '+', self::BATCH_SIZE
        );

        foreach ($pending as $entry) {
            [$messageId, $consumer, $idleMs, $deliveryCount] = $entry;

            if ($deliveryCount >= self::MAX_RETRIES) {
                // Move to dead letter stream
                $message = $this->redis->xrange(self::STREAM_KEY, $messageId, $messageId);
                if (!empty($message)) {
                    $this->redis->xadd(self::DEAD_LETTER, '*', $message[0][1]);
                    $this->redis->xack(self::STREAM_KEY, self::GROUP_NAME, $messageId);
                    $this->logger->error('Message moved to dead letter', ['id' => $messageId]);
                }
                continue;
            }

            // Claim and retry
            $this->redis->xclaim(
                self::STREAM_KEY, self::GROUP_NAME, self::CONSUMER_NAME,
                0, $messageId
            );
        }
    }

    private function ensureGroupExists(): void
    {
        try {
            $this->redis->xgroup('CREATE', self::STREAM_KEY, self::GROUP_NAME, '0', 'MKSTREAM');
        } catch (\Exception $e) {
            // Group already exists - that's fine
        }
    }

    private function deserializePayload(array $fields): array
    {
        $payload = [];
        foreach ($fields as $key => $value) {
            $decoded = json_decode($value, true);
            $payload[$key] = ($decoded !== null) ? $decoded : $value;
        }
        return $payload;
    }
}

Summary

Redis Streams gives you durable message queuing without adding a new service to your stack – if you already run Redis for cache and sessions, Streams costs nothing extra. Consumer groups, acknowledgement, and pending message redelivery cover the core requirements of reliable message processing. The dead letter pattern (move to separate stream after N retries) completes the picture. For shops that do not need RabbitMQ’s routing complexity but want more reliability than Redis Lists, Streams is the right middle ground.

About Henryk Tews

What you can read next

Strategy pattern in PHP – and how Magento 2 uses it in pricing
Xdebug – configuration, PHPStorm, debugging Magento plugins

© 2026 Created by

TOP
Zarządzaj zgodą
Aby zapewnić jak najlepsze wrażenia, korzystamy z technologii, takich jak pliki cookie, do przechowywania i/lub uzyskiwania dostępu do informacji o urządzeniu. Zgoda na te technologie pozwoli nam przetwarzać dane, takie jak zachowanie podczas przeglądania lub unikalne identyfikatory na tej stronie. Brak wyrażenia zgody lub wycofanie zgody może niekorzystnie wpłynąć na niektóre cechy i funkcje.
Funkcjonalne Always active
Przechowywanie lub dostęp do danych technicznych jest ściśle konieczny do uzasadnionego celu umożliwienia korzystania z konkretnej usługi wyraźnie żądanej przez subskrybenta lub użytkownika, lub wyłącznie w celu przeprowadzenia transmisji komunikatu przez sieć łączności elektronicznej.
Preferencje
Przechowywanie lub dostęp techniczny jest niezbędny do uzasadnionego celu przechowywania preferencji, o które nie prosi subskrybent lub użytkownik.
Statystyka
Przechowywanie techniczne lub dostęp, który jest używany wyłącznie do celów statystycznych. Przechowywanie techniczne lub dostęp, który jest używany wyłącznie do anonimowych celów statystycznych. Bez wezwania do sądu, dobrowolnego podporządkowania się dostawcy usług internetowych lub dodatkowych zapisów od strony trzeciej, informacje przechowywane lub pobierane wyłącznie w tym celu zwykle nie mogą być wykorzystywane do identyfikacji użytkownika.
Marketing
Przechowywanie lub dostęp techniczny jest wymagany do tworzenia profili użytkowników w celu wysyłania reklam lub śledzenia użytkownika na stronie internetowej lub na kilku stronach internetowych w podobnych celach marketingowych.
  • Manage options
  • Manage services
  • Manage {vendor_count} vendors
  • Read more about these purposes
Zobacz preferencje
  • {title}
  • {title}
  • {title}