PHP / Magento Dev Blog

  • Publikacje
  • O autorze
  • Kontakt

ReactPHP – event loop, równoległe HTTP requests, serwer HTTP, Fibers bridge

by Henryk Tews / wtorek, 05 marca 2024 / Opublikowano w PHP

PHP jest synchroniczny z natury – jeden request, jeden wątek, czekaj na bazę, czekaj na API, czekaj na plik. Ale od PHP 8.1 mamy Fibers, a ReactPHP istnieje od lat i daje pełną asynchroniczność opartą na event loop. Pokazuję kiedy to ma realne zastosowanie, jak działa event loop w PHP i jak napisać prosty serwer HTTP oraz klienta który odpytuje kilka API równolegle.

Kiedy asynchroniczny PHP ma sens?

Większość aplikacji webowych PHP nie potrzebuje asynchroniczności – wąskie gardło to baza danych, a nie event loop. Ale są konkretne przypadki gdzie async PHP realnie pomaga:

  • Import danych z wielu zewnętrznych API równolegle (zamiast sekwencyjnie)
  • Długo działające procesy CLI – np. przetwarzanie tysięcy zamówień z limitowanym równoległością
  • Websockets i długie połączenia HTTP (polling, Server-Sent Events)
  • Mikroserwis który sam jest serwerem HTTP bez narzutu PHP-FPM + nginx

Instalacja ReactPHP

composer require react/event-loop
composer require react/http
composer require react/promise
composer require react/async   # PHP 8.1+ Fibers bridge

Event Loop – serce asynchroniczności

<?php

declare(strict_types=1);

require 'vendor/autoload.php';

use React\EventLoop\Loop;

// Event loop - nieskończona pętla która reaguje na zdarzenia
$loop = Loop::get();

// Timer - odpowiednik sleep() ale nieblokujący
$loop->addTimer(1.0, function() {
    echo "1 sekunda minęła\n";
});

$loop->addTimer(3.0, function() {
    echo "3 sekundy minęły\n";
});

// Periodic timer - jak cron, ale w procesie
$counter = 0;
$timer = $loop->addPeriodicTimer(0.5, function() use (&$counter, &$timer) {
    $counter++;
    echo "Tick #{$counter}\n";

    if ($counter >= 5) {
        Loop::cancelTimer($timer); // zatrzymaj po 5 tickach
    }
});

echo "Loop startuje...\n";
// Bez run() - ReactPHP 4.x uruchamia loop automatycznie
// $loop->run(); // w ReactPHP 3.x wymagane

Równoległe HTTP requests – killer feature

Sekwencyjne odpytanie 5 API po 500ms każde = 2.5 sekundy. Równoległe = 500ms. To realna różnica przy imporcie danych:

<?php

declare(strict_types=1);

require 'vendor/autoload.php';

use React\Http\Browser;
use React\Promise\Promise;
use function React\Async\await;
use function React\Promise\all;

$browser = new Browser();

// Sekwencyjne - jak klasyczny PHP (wolne)
function fetchSequential(Browser $browser): void
{
    $start = microtime(true);

    $r1 = await($browser->get('https://jsonplaceholder.typicode.com/posts/1'));
    $r2 = await($browser->get('https://jsonplaceholder.typicode.com/posts/2'));
    $r3 = await($browser->get('https://jsonplaceholder.typicode.com/posts/3'));

    $elapsed = round((microtime(true) - $start) * 1000);
    echo "Sekwencyjne: {$elapsed}ms\n"; // ~1500ms (3x ~500ms)
}

// Równoległe - ReactPHP async (szybkie)
function fetchParallel(Browser $browser): void
{
    $start = microtime(true);

    // Uruchom wszystkie requesty jednocześnie
    $promises = [
        $browser->get('https://jsonplaceholder.typicode.com/posts/1'),
        $browser->get('https://jsonplaceholder.typicode.com/posts/2'),
        $browser->get('https://jsonplaceholder.typicode.com/posts/3'),
    ];

    // Poczekaj aż wszystkie się zakończą
    [$r1, $r2, $r3] = await(all($promises));

    $elapsed = round((microtime(true) - $start) * 1000);
    echo "Równoległe: {$elapsed}ms\n"; // ~500ms (wszystkie jednocześnie)

    foreach ([$r1, $r2, $r3] as $response) {
        $data = json_decode((string) $response->getBody(), true);
        echo "Post ID: {$data['id']} - {$data['title']}\n";
    }
}

fetchParallel($browser);

Praktyczny przykład – import produktów z kilku źródeł

<?php

declare(strict_types=1);

use React\Http\Browser;
use function React\Async\await;
use function React\Promise\all;

class ParallelProductImporter
{
    private Browser $browser;

    public function __construct()
    {
        $this->browser = new Browser();
    }

    public function importFromSources(array $sourceUrls): array
    {
        $start = microtime(true);

        // Uruchom wszystkie requesty do źródeł jednocześnie
        $promises = array_map(
            fn(string $url) => $this->fetchSource($url),
            $sourceUrls
        );

        // Poczekaj na wszystkie odpowiedzi
        $results = await(all($promises));

        $elapsed = round((microtime(true) - $start) * 1000);
        echo "Pobrano " . count($sourceUrls) . " źródeł w {$elapsed}ms\n";

        // Scal produkty ze wszystkich źródeł
        return array_merge(...$results);
    }

    private function fetchSource(string $url): \React\Promise\PromiseInterface
    {
        return $this->browser
            ->get($url, ['User-Agent' => 'ProductImporter/1.0'])
            ->then(function (\Psr\Http\Message\ResponseInterface $response) use ($url) {
                if ($response->getStatusCode() !== 200) {
                    echo "Błąd dla {$url}: " . $response->getStatusCode() . "\n";
                    return [];
                }

                $data = json_decode((string) $response->getBody(), true, 512, JSON_THROW_ON_ERROR);
                return $this->normalizeProducts($data);
            })
            ->catch(function (\Exception $e) use ($url) {
                echo "Wyjątek dla {$url}: " . $e->getMessage() . "\n";
                return []; // nie zatrzymuj całego importu przy błędzie jednego źródła
            });
    }

    private function normalizeProducts(array $data): array
    {
        return array_map(fn($item) => [
            'sku'   => $item['id'] ?? $item['sku'] ?? uniqid('SKU-'),
            'name'  => $item['title'] ?? $item['name'] ?? 'Unnamed',
            'price' => (float) ($item['price'] ?? 0.0),
        ], $data);
    }
}

// Użycie
$importer = new ParallelProductImporter();

// 5 źródeł odpytanych równolegle zamiast sekwencyjnie
$products = $importer->importFromSources([
    'https://api.supplier1.com/products',
    'https://api.supplier2.com/catalog',
    'https://api.supplier3.com/items',
    'https://api.warehouse.com/stock',
    'https://api.pim.example.com/export',
]);

echo "Zaimportowano: " . count($products) . " produktów\n";

Prosty serwer HTTP z ReactPHP

<?php

declare(strict_types=1);

use React\Http\HttpServer;
use React\Http\Message\Response;
use React\Socket\SocketServer;
use Psr\Http\Message\ServerRequestInterface;

// Prosty API serwer - bez nginx, bez PHP-FPM
$server = new HttpServer(function (ServerRequestInterface $request) {
    $path   = $request->getUri()->getPath();
    $method = $request->getMethod();

    // Routing
    if ($method === 'GET' && $path === '/health') {
        return Response::json(['status' => 'ok', 'time' => date('c')]);
    }

    if ($method === 'GET' && preg_match('/^\/products\/(\d+)$/', $path, $matches)) {
        $productId = (int) $matches[1];

        // Tutaj możesz wywołać bazę danych przez async driver
        return Response::json([
            'id'    => $productId,
            'name'  => "Produkt #{$productId}",
            'price' => 29.99,
        ]);
    }

    if ($method === 'POST' && $path === '/products') {
        $body = json_decode((string) $request->getBody(), true);
        // Zapis do bazy...
        return new Response(201, ['Content-Type' => 'application/json'],
            json_encode(['id' => rand(1000, 9999), ...$body])
        );
    }

    return new Response(404, ['Content-Type' => 'application/json'],
        json_encode(['error' => 'Not Found'])
    );
});

$socket = new SocketServer('0.0.0.0:8080');
$server->listen($socket);

echo "Serwer nasłuchuje na http://0.0.0.0:8080\n";

Fibers PHP 8.1 + ReactPHP – await zamiast then/catch

<?php

declare(strict_types=1);

use function React\Async\await;
use function React\Async\async;

// Dzięki Fibers możesz pisać async kod jak synchroniczny
// Zamiast Promise::then()->then()->catch() - normalny kod PHP

$fetchAndProcess = async(function() use ($browser): array {
    // await "zawiesza" Fiber do czasu odpowiedzi - bez blokowania event loop
    $response = await($browser->get('https://api.example.com/products'));
    $products = json_decode((string) $response->getBody(), true);

    // Kolejny await - event loop może robić inne rzeczy w między czasie
    $enriched = await(enrichProducts($products));

    return $enriched;
});

$result = await($fetchAndProcess());

Podsumowanie

ReactPHP to potężne narzędzie dla specyficznych zastosowań. Nie zastępuje klasycznego PHP-FPM dla aplikacji webowych – tam korzyści są minimalne. Świeci przy równoległym odpytywaniu wielu API, długo działających procesach CLI i serwerach WebSocket. Fibers z PHP 8.1 sprawiają że async kod wygląda jak synchroniczny – to ogromna poprawa czytelności względem Promise::then() chainingu.

About Henryk Tews

Co możesz przeczytać następne

Komponenty Symfony bez frameworka – Console, Validator, HttpClient
Blackfire – instalacja w DDEV, profilowanie HTTP i CLI, asercje w CI/CD
PHP 8.4 preview – property hooks, asymmetric visibility, chaining new
  • Publikacje
  • O autorze
  • Kontakt

© 2026 Created by

GÓRA
Zarządzaj zgodą
Aby zapewnić jak najlepsze wrażenia, korzystamy z technologii, takich jak pliki cookie, do przechowywania i/lub uzyskiwania dostępu do informacji o urządzeniu. Zgoda na te technologie pozwoli nam przetwarzać dane, takie jak zachowanie podczas przeglądania lub unikalne identyfikatory na tej stronie. Brak wyrażenia zgody lub wycofanie zgody może niekorzystnie wpłynąć na niektóre cechy i funkcje.
Funkcjonalne Zawsze aktywne
Przechowywanie lub dostęp do danych technicznych jest ściśle konieczny do uzasadnionego celu umożliwienia korzystania z konkretnej usługi wyraźnie żądanej przez subskrybenta lub użytkownika, lub wyłącznie w celu przeprowadzenia transmisji komunikatu przez sieć łączności elektronicznej.
Preferencje
Przechowywanie lub dostęp techniczny jest niezbędny do uzasadnionego celu przechowywania preferencji, o które nie prosi subskrybent lub użytkownik.
Statystyka
Przechowywanie techniczne lub dostęp, który jest używany wyłącznie do celów statystycznych. Przechowywanie techniczne lub dostęp, który jest używany wyłącznie do anonimowych celów statystycznych. Bez wezwania do sądu, dobrowolnego podporządkowania się dostawcy usług internetowych lub dodatkowych zapisów od strony trzeciej, informacje przechowywane lub pobierane wyłącznie w tym celu zwykle nie mogą być wykorzystywane do identyfikacji użytkownika.
Marketing
Przechowywanie lub dostęp techniczny jest wymagany do tworzenia profili użytkowników w celu wysyłania reklam lub śledzenia użytkownika na stronie internetowej lub na kilku stronach internetowych w podobnych celach marketingowych.
  • Zarządzaj opcjami
  • Zarządzaj serwisami
  • Zarządzaj {vendor_count} dostawcami
  • Przeczytaj więcej o tych celach
Zobacz preferencje
  • {title}
  • {title}
  • {title}