Redis Streams is a data structure added in Redis 5.0 that combines the simplicity of a log with consumer groups and acknowledgement semantics. For Magento 2 it offers a robust alternative to RabbitMQ for message queuing: persistent messages, consumer groups, pending message redelivery, and dead letter handling – without the operational overhead of a dedicated AMQP broker.
Redis Streams vs RabbitMQ vs Redis Lists
| Feature | Redis Lists | Redis Streams | RabbitMQ |
|---|---|---|---|
| Message persistence | Optional (AOF/RDB) | Yes | Yes |
| Consumer groups | No | Yes | Yes |
| Acknowledgement | No | Yes (XACK) | Yes |
| Pending redelivery | No | Yes (XCLAIM) | Yes |
| Message replay | No | Yes (seek to ID) | No |
| Dead letter | Manual | Manual (check pending) | Built-in |
| Operational cost | Zero (Redis already running) | Zero (Redis already running) | Separate service |
Redis Streams basics
# Add a message to a stream XADD order:export * order_id 42 customer_id 123 total 149.99 # Create a consumer group XGROUP CREATE order:export exporters $ MKSTREAM # Read as consumer (blocking, wait for new messages) XREADGROUP GROUP exporters worker-1 COUNT 10 BLOCK 5000 STREAMS order:export > # Acknowledge processed message XACK order:export exporters 1704067200000-0 # Check pending messages (not yet acknowledged) XPENDING order:export exporters - + 10
PHP implementation – producer
<?php
declare(strict_types=1);
namespace Vendor\Module\Model\Queue;
use Predis\Client as RedisClient;
class StreamProducer
{
private const STREAM_KEY = 'magento:order:export';
private const MAX_LENGTH = 10000; // cap stream length
public function __construct(private RedisClient $redis) {}
public function publish(array $payload): string
{
// XADD with MAXLEN to prevent unbounded stream growth
$messageId = $this->redis->xadd(
self::STREAM_KEY,
['MAXLEN', '~', self::MAX_LENGTH],
'*',
$this->serializePayload($payload)
);
return $messageId;
}
private function serializePayload(array $payload): array
{
$flat = [];
foreach ($payload as $key => $value) {
$flat[$key] = is_array($value) ? json_encode($value) : (string) $value;
}
return $flat;
}
}
// Usage in a Magento observer
class OrderPlacedObserver implements \Magento\Framework\Event\ObserverInterface
{
public function __construct(private StreamProducer $producer) {}
public function execute(\Magento\Framework\Event\Observer $observer): void
{
$order = $observer->getData('order');
$this->producer->publish([
'order_id' => $order->getId(),
'increment_id'=> $order->getIncrementId(),
'total' => $order->getGrandTotal(),
'customer_id' => $order->getCustomerId(),
]);
}
}
PHP implementation – consumer with acknowledgement
<?php
declare(strict_types=1);
namespace Vendor\Module\Model\Queue;
use Predis\Client as RedisClient;
class StreamConsumer
{
private const STREAM_KEY = 'magento:order:export';
private const GROUP_NAME = 'order-exporters';
private const CONSUMER_NAME = 'worker-1';
private const BATCH_SIZE = 10;
private const BLOCK_MS = 5000;
private const MAX_RETRIES = 3;
private const DEAD_LETTER = 'magento:order:export:dead';
public function __construct(
private RedisClient $redis,
private \Vendor\Module\Model\ErpExporter $erpExporter,
private \Psr\Log\LoggerInterface $logger
) {}
public function run(): void
{
$this->ensureGroupExists();
while (true) {
// First: check for pending messages (previously unacknowledged)
$this->processPending();
// Then: read new messages
$messages = $this->redis->xreadgroup(
'GROUP', self::GROUP_NAME, self::CONSUMER_NAME,
'COUNT', self::BATCH_SIZE,
'BLOCK', self::BLOCK_MS,
'STREAMS', self::STREAM_KEY, '>'
);
if (empty($messages)) continue;
foreach ($messages[self::STREAM_KEY] as [$messageId, $fields]) {
$this->processMessage($messageId, $fields);
}
}
}
private function processMessage(string $messageId, array $fields): void
{
try {
$payload = $this->deserializePayload($fields);
$this->erpExporter->export($payload['order_id']);
// Acknowledge success
$this->redis->xack(self::STREAM_KEY, self::GROUP_NAME, $messageId);
$this->logger->info('Exported order', ['id' => $payload['order_id']]);
} catch (\Exception $e) {
$this->logger->error('Export failed', [
'message_id' => $messageId,
'error' => $e->getMessage(),
]);
// No XACK = message stays in pending list for retry
}
}
private function processPending(): void
{
$pending = $this->redis->xpending(
self::STREAM_KEY, self::GROUP_NAME, '-', '+', self::BATCH_SIZE
);
foreach ($pending as $entry) {
[$messageId, $consumer, $idleMs, $deliveryCount] = $entry;
if ($deliveryCount >= self::MAX_RETRIES) {
// Move to dead letter stream
$message = $this->redis->xrange(self::STREAM_KEY, $messageId, $messageId);
if (!empty($message)) {
$this->redis->xadd(self::DEAD_LETTER, '*', $message[0][1]);
$this->redis->xack(self::STREAM_KEY, self::GROUP_NAME, $messageId);
$this->logger->error('Message moved to dead letter', ['id' => $messageId]);
}
continue;
}
// Claim and retry
$this->redis->xclaim(
self::STREAM_KEY, self::GROUP_NAME, self::CONSUMER_NAME,
0, $messageId
);
}
}
private function ensureGroupExists(): void
{
try {
$this->redis->xgroup('CREATE', self::STREAM_KEY, self::GROUP_NAME, '0', 'MKSTREAM');
} catch (\Exception $e) {
// Group already exists - that's fine
}
}
private function deserializePayload(array $fields): array
{
$payload = [];
foreach ($fields as $key => $value) {
$decoded = json_decode($value, true);
$payload[$key] = ($decoded !== null) ? $decoded : $value;
}
return $payload;
}
}
Summary
Redis Streams gives you durable message queuing without adding a new service to your stack – if you already run Redis for cache and sessions, Streams costs nothing extra. Consumer groups, acknowledgement, and pending message redelivery cover the core requirements of reliable message processing. The dead letter pattern (move to separate stream after N retries) completes the picture. For shops that do not need RabbitMQ’s routing complexity but want more reliability than Redis Lists, Streams is the right middle ground.
