PHP / Magento Dev Blog

  • Publikacje
  • O autorze
  • Kontakt

Message Queue Framework z RabbitMQ – publisher, consumer, DDEV config

by Henryk Tews / wtorek, 12 listopada 2019 / Opublikowano w Magento 2

Operacje takie jak wysyłka emaili, synchronizacja z ERP czy przeliczanie indeksów nie muszą blokować requestu HTTP. Magento 2 ma wbudowany Message Queue Framework, który pozwala przerzucać takie zadania do asynchronicznego kolejkowania. Pokazuję jak to działa z RabbitMQ i jak napisać własnego producenta i konsumenta.

Czym jest Message Queue Framework w Magento 2?

MQF to warstwa abstrakcji nad brokerami wiadomości. Magento obsługuje dwa backendy:

  • MySQL – domyślny, zero konfiguracji, wystarczający dla mniejszych wdrożeń
  • RabbitMQ – zalecany dla środowisk produkcyjnych, obsługuje routing, retry, dead letter queues

Architektura opiera się na trzech elementach: Publisher (wysyła wiadomość), Queue (przechowuje), Consumer (przetwarza).

Konfiguracja modułu – pliki XML

Własny moduł z kolejką wymaga kilku plików konfiguracyjnych. Zacznijmy od definicji tematu wiadomości w etc/communication.xml:

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">

    <topic name="vendor.module.order.export"
           request="Vendor\Module\Api\Data\OrderExportRequestInterface"/>

</config>

Następnie konfiguracja kolejki i bindingów w etc/queue_topology.xml:

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">

    <exchange name="magento" type="topic" connection="amqp">
        <binding id="orderExportBinding"
                 topic="vendor.module.order.export"
                 destinationType="queue"
                 destination="vendor.order.export"/>
    </exchange>

</config>

Rejestracja konsumenta w etc/queue_consumer.xml:

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">

    <consumer name="vendor.order.export.consumer"
              queue="vendor.order.export"
              connection="amqp"
              handler="Vendor\Module\Model\Consumer\OrderExportConsumer::process"
              maxMessages="100"/>

</config>

Publisher – wysyłanie wiadomości do kolejki

<?php

namespace Vendor\Module\Model;

use Magento\Framework\MessageQueue\PublisherInterface;
use Vendor\Module\Api\Data\OrderExportRequestInterface;
use Vendor\Module\Api\Data\OrderExportRequestInterfaceFactory;

class OrderExportPublisher
{
    private const TOPIC = 'vendor.module.order.export';

    public function __construct(
        private PublisherInterface $publisher,
        private OrderExportRequestInterfaceFactory $requestFactory
    ) {}

    public function publish(int $orderId): void
    {
        /** @var OrderExportRequestInterface $request */
        $request = $this->requestFactory->create();
        $request->setOrderId($orderId);

        // Wiadomość ląduje w kolejce - request HTTP kończy się natychmiast
        $this->publisher->publish(self::TOPIC, $request);
    }
}

Consumer – przetwarzanie wiadomości

<?php

namespace Vendor\Module\Model\Consumer;

use Vendor\Module\Api\Data\OrderExportRequestInterface;
use Psr\Log\LoggerInterface;

class OrderExportConsumer
{
    public function __construct(
        private LoggerInterface $logger,
        private \Vendor\Module\Model\ErpExporter $erpExporter
    ) {}

    public function process(OrderExportRequestInterface $request): void
    {
        $orderId = $request->getOrderId();

        try {
            $this->erpExporter->export($orderId);
            $this->logger->info('Order exported successfully', ['order_id' => $orderId]);
        } catch (\Exception $e) {
            // Wyjątek = wiadomość trafia do dead letter queue (przy RabbitMQ)
            $this->logger->error('Export failed', [
                'order_id' => $orderId,
                'error'    => $e->getMessage(),
            ]);

            throw $e; // przekaż dalej - Magento obsłuży retry
        }
    }
}

Uruchamianie konsumenta

# Uruchom konsumenta ręcznie (dla testów)
bin/magento queue:consumers:start vendor.order.export.consumer

# Z limitem wiadomości
bin/magento queue:consumers:start vendor.order.export.consumer --max-messages=500

# Na produkcji - przez supervisor lub systemd
# /etc/supervisor/conf.d/magento-consumer.conf:
# [program:magento_order_export]
# command=/var/www/magento/bin/magento queue:consumers:start vendor.order.export.consumer
# autostart=true
# autorestart=true
# user=www-data

RabbitMQ w DDEV

Jeśli pracujesz lokalnie na DDEV, RabbitMQ możesz dodać przez własny docker-compose:

# .ddev/docker-compose.rabbitmq.yaml
version: '3.6'
services:
  rabbitmq:
    image: rabbitmq:3.8-management
    environment:
      RABBITMQ_DEFAULT_USER: magento
      RABBITMQ_DEFAULT_PASS: magento
    ports:
      - "5672"
      - "15672:15672" # panel zarządzania
    labels:
      com.ddev.site-name: ${DDEV_SITENAME}

Panel zarządzania RabbitMQ będzie dostępny pod http://localhost:15672 – możesz tam podglądać kolejki, liczbę wiadomości i ewentualne dead letter messages.

Podsumowanie

Message Queue Framework w Magento 2 pozwala przenosić czasochłonne operacje poza cykl request-response. Konfiguracja przez XML wydaje się rozbudowana, ale po napisaniu pierwszego modułu schemat jest powtarzalny. RabbitMQ daje pełnię możliwości – retry, dead letter queues, routing – i jest właściwym wyborem dla środowisk produkcyjnych obsługujących duże wolumeny zamówień.

About Henryk Tews

Co możesz przeczytać następne

B2B – Company, Shared Catalog, Negotiable Quote, Requisition List, własne pluginy
Jubileusz 100 wpisów – retrospektywa 6,5 roku, PHP 7.2->8.4, Magento 2.2->2.4.8
Redis Streams – Consumer Groups, pending messages, dead letter, integracja z Magento queue
  • Publikacje
  • O autorze
  • Kontakt

© 2026 Created by

GÓRA
Zarządzaj zgodą
Aby zapewnić jak najlepsze wrażenia, korzystamy z technologii, takich jak pliki cookie, do przechowywania i/lub uzyskiwania dostępu do informacji o urządzeniu. Zgoda na te technologie pozwoli nam przetwarzać dane, takie jak zachowanie podczas przeglądania lub unikalne identyfikatory na tej stronie. Brak wyrażenia zgody lub wycofanie zgody może niekorzystnie wpłynąć na niektóre cechy i funkcje.
Funkcjonalne Zawsze aktywne
Przechowywanie lub dostęp do danych technicznych jest ściśle konieczny do uzasadnionego celu umożliwienia korzystania z konkretnej usługi wyraźnie żądanej przez subskrybenta lub użytkownika, lub wyłącznie w celu przeprowadzenia transmisji komunikatu przez sieć łączności elektronicznej.
Preferencje
Przechowywanie lub dostęp techniczny jest niezbędny do uzasadnionego celu przechowywania preferencji, o które nie prosi subskrybent lub użytkownik.
Statystyka
Przechowywanie techniczne lub dostęp, który jest używany wyłącznie do celów statystycznych. Przechowywanie techniczne lub dostęp, który jest używany wyłącznie do anonimowych celów statystycznych. Bez wezwania do sądu, dobrowolnego podporządkowania się dostawcy usług internetowych lub dodatkowych zapisów od strony trzeciej, informacje przechowywane lub pobierane wyłącznie w tym celu zwykle nie mogą być wykorzystywane do identyfikacji użytkownika.
Marketing
Przechowywanie lub dostęp techniczny jest wymagany do tworzenia profili użytkowników w celu wysyłania reklam lub śledzenia użytkownika na stronie internetowej lub na kilku stronach internetowych w podobnych celach marketingowych.
  • Zarządzaj opcjami
  • Zarządzaj serwisami
  • Zarządzaj {vendor_count} dostawcami
  • Przeczytaj więcej o tych celach
Zobacz preferencje
  • {title}
  • {title}
  • {title}