PHP / Magento Dev Blog

  • Publikacje
  • O autorze
  • Kontakt

PHP Fibers deep dive – własny scheduler, kooperatywna wielozadaniowość, równoległe HTTP

by Henryk Tews / wtorek, 15 lipca 2025 / Opublikowano w PHP

Fibers weszły do PHP w wersji 8.1 jako niskopoziomowy mechanizm kooperatywnej wielozadaniości. W marcu 2024 pisałem o ReactPHP który buduje na Fibers abstrakcję event loop i async/await. Dziś wchodzę głębiej: jak Fibers działają wewnętrznie, kiedy warto sięgać bezpośrednio do Fiber API bez biblioteki i jak pisać własny prosty scheduler który pozwala równolegle wykonywać zadania w PHP CLI.

Fiber – co to jest i jak działa

Fiber to lekki wątek współpracujący (co-routine) który może być zawieszony i wznowiony przez kod który go uruchomił. W przeciwieństwie do wątków systemowych nie działa równolegle – w danym momencie wykonuje się tylko jeden Fiber. „Równoległość” pochodzi z tego że Fiber sam się zawiesza gdy czeka na I/O, pozwalając innemu uruchomić się.

<?php

declare(strict_types=1);

// Podstawowe API Fiber
$fiber = new Fiber(function(): string {
    echo "Fiber: start\n";

    // Fiber.suspend() zawiesza fiber i zwraca wartość do kodu który go wznowił
    $received = Fiber::suspend('wartość przekazana do resume()');
    echo "Fiber: wznowiony, otrzymałem: {$received}\n";

    return 'wynik fibera';
});

// Uruchom Fiber - wykonuje się do pierwszego suspend()
$suspended = $fiber->start();
echo "Main: Fiber zawiesił się, przekazał: {$suspended}\n"; // wartość z suspend()

// Wznów Fiber przekazując mu wartość
$result = $fiber->resume('wiadomość dla fibera');
echo "Main: Fiber zakończył się z wynikiem: {$fiber->getReturn()}\n";

// Output:
// Fiber: start
// Main: Fiber zawiesił się, przekazał: wartość przekazana do resume()
// Fiber: wznowiony, otrzymałem: wiadomość dla fibera
// Main: Fiber zakończył się z wynikiem: wynik fibera

Prosty scheduler Fiber – bez bibliotek

<?php

declare(strict_types=1);

// Minimalistyczny scheduler który przeplatwa wiele Fiberów
class FiberScheduler
{
    /** @var \SplQueue<Fiber> */
    private \SplQueue $queue;

    public function __construct()
    {
        $this->queue = new \SplQueue();
    }

    // Dodaj nowe zadanie jako Fiber
    public function add(callable $task): void
    {
        $this->queue->enqueue(new Fiber($task));
    }

    // Uruchom scheduler – wykonuje Fibery po kolei
    public function run(): void
    {
        // Uruchom wszystkie Fibery po raz pierwszy
        $pending = new \SplQueue();

        foreach ($this->queue as $fiber) {
            $fiber->start();
            if (!$fiber->isTerminated()) {
                $pending->enqueue($fiber);
            }
        }

        // Round-robin – wznawiaj Fibery które są zawieszone
        while (!$pending->isEmpty()) {
            $fiber = $pending->dequeue();

            if ($fiber->isSuspended()) {
                $fiber->resume();
            }

            if (!$fiber->isTerminated()) {
                $pending->enqueue($fiber);
            }
        }
    }
}

// Zadanie które symuluje asynchroniczną pracę przez suspend
function fetchDataTask(string $name, int $steps): void
{
    echo "[{$name}] Start\n";

    for ($i = 1; $i <= $steps; $i++) {
        // Symulacja oczekiwania na I/O – zawieszamy Fiber
        Fiber::suspend();
        echo "[{$name}] Krok {$i}/{$steps}\n";
    }

    echo "[{$name}] Gotowe\n";
}

$scheduler = new FiberScheduler();
$scheduler->add(fn() => fetchDataTask('Zadanie-A', 3));
$scheduler->add(fn() => fetchDataTask('Zadanie-B', 2));
$scheduler->add(fn() => fetchDataTask('Zadanie-C', 4));

$scheduler->run();

// Output (przeplatane):
// [Zadanie-A] Start
// [Zadanie-B] Start
// [Zadanie-C] Start
// [Zadanie-A] Krok 1/3
// [Zadanie-B] Krok 1/2
// [Zadanie-C] Krok 1/4
// [Zadanie-A] Krok 2/3
// [Zadanie-B] Krok 2/2
// [Zadanie-C] Krok 2/4
// ...

Praktyczne zastosowanie – równoległe pobieranie danych

<?php

declare(strict_types=1);

// Równoległe wykonanie HTTP requestów przez Fibers + stream_select
// Bez ReactPHP, bez Amp – czysty PHP

class ParallelHttpFetcher
{
    private array $results = [];
    private array $errors  = [];

    public function fetchAll(array $urls): array
    {
        $fibers  = [];
        $handles = [];

        // Otwórz wszystkie połączenia HTTP jednocześnie (non-blocking)
        foreach ($urls as $key => $url) {
            $context = stream_context_create([
                'http' => [
                    'timeout' => 10,
                    'method'  => 'GET',
                ],
                'socket' => [
                    'tcp_nodelay' => true,
                ],
            ]);

            $fiber = new Fiber(function() use ($url, $key, $context): void {
                // Otwórz non-blocking stream
                $stream = fopen($url, 'r', false, $context);

                if ($stream === false) {
                    Fiber::suspend(['error' => "Cannot open: {$url}", 'key' => $key]);
                    return;
                }

                stream_set_blocking($stream, false);

                $data = '';

                // Czytaj w pętli, zawieszając Fiber gdy nie ma danych
                while (!feof($stream)) {
                    $chunk = fread($stream, 8192);
                    if ($chunk === false || $chunk === '') {
                        Fiber::suspend(['waiting' => true, 'key' => $key]);
                        continue;
                    }
                    $data .= $chunk;
                }

                fclose($stream);
                Fiber::suspend(['done' => true, 'data' => $data, 'key' => $key]);
            });

            $fibers[$key] = $fiber;
            $fiber->start();
        }

        // Scheduler – wznawiaj Fibery które czekają
        $active = array_fill_keys(array_keys($fibers), true);

        while (!empty($active)) {
            foreach ($fibers as $key => $fiber) {
                if (!isset($active[$key])) continue;

                if ($fiber->isSuspended()) {
                    $status = $fiber->getReturn() ?? [];

                    if (isset($status['done'])) {
                        $this->results[$key] = $status['data'];
                        unset($active[$key]);
                    } elseif (isset($status['error'])) {
                        $this->errors[$key] = $status['error'];
                        unset($active[$key]);
                    } else {
                        $fiber->resume(); // czekaj dalej
                    }
                } elseif ($fiber->isTerminated()) {
                    unset($active[$key]);
                }
            }

            usleep(1000); // 1ms - nie blokuj CPU
        }

        return ['results' => $this->results, 'errors' => $this->errors];
    }
}

Fibers w importerze produktów – kontrolowane równoległe przetwarzanie

<?php

declare(strict_types=1);

// Importer który przetwarza produkty w kontrolowanej równoległości
// Max 5 "jednoczesnych" operacji przez Fibers
class ConcurrentProductImporter
{
    private const MAX_CONCURRENT = 5;

    public function __construct(
        private ProductApiClient $apiClient,
        private ProductSaveService $saveService
    ) {}

    public function import(array $skus): array
    {
        $results = [];
        $active  = new \SplQueue();
        $pending = new \SplQueue();

        foreach ($skus as $sku) {
            $pending->enqueue($sku);
        }

        // Uruchom pierwsze MAX_CONCURRENT fiberów
        while (!$pending->isEmpty() && $active->count() < self::MAX_CONCURRENT) {
            $sku   = $pending->dequeue();
            $fiber = $this->createImportFiber($sku);
            $fiber->start();
            $active->enqueue(['sku' => $sku, 'fiber' => $fiber]);
        }

        // Scheduler - uzupełniaj aktywne gdy któryś się kończy
        while (!$active->isEmpty()) {
            $item  = $active->dequeue();
            $fiber = $item['fiber'];
            $sku   = $item['sku'];

            if ($fiber->isSuspended()) {
                $fiber->resume();
                $active->enqueue($item); // wróć na koniec kolejki
            } elseif ($fiber->isTerminated()) {
                $results[$sku] = $fiber->getReturn();

                // Zastąp zakończony nowym zadaniem
                if (!$pending->isEmpty()) {
                    $nextSku = $pending->dequeue();
                    $next    = $this->createImportFiber($nextSku);
                    $next->start();
                    $active->enqueue(['sku' => $nextSku, 'fiber' => $next]);
                }
            }
        }

        return $results;
    }

    private function createImportFiber(string $sku): Fiber
    {
        return new Fiber(function() use ($sku): array {
            // Pobierz dane z API (symulacja I/O)
            Fiber::suspend(); // czekamy na API response
            $data = $this->apiClient->getProduct($sku);

            if ($data === null) {
                return ['sku' => $sku, 'status' => 'not_found'];
            }

            // Zapisz produkt (symulacja I/O do bazy)
            Fiber::suspend(); // czekamy na zapis DB
            $this->saveService->save($data);

            return ['sku' => $sku, 'status' => 'saved'];
        });
    }
}

Kiedy Fibers, kiedy ReactPHP, kiedy Amp?

Scenariusz Wybór Uzasadnienie
Kilka prostych równoległych tasków Bare Fibers Zero zależności, pełna kontrola
Serwer HTTP, WebSockets ReactPHP Gotowy event loop, HTTP server
Złożone async operacje, Promises Amp 3.x Nowoczesne API, dobre wsparcie Fibers
Magento CLI importer Bare Fibers lub ReactPHP Kontrolowane N-równoległe wywołania API
Standardowy request Magento Żaden PHP-FPM wystarczy, synchroniczny kod jest OK

Podsumowanie

Fibers to potężny ale niskopoziomowy mechanizm – coś między wątkami a zwykłymi funkcjami. Bezpośrednie użycie Fiber API ma sens gdy potrzebujesz kontroli której biblioteki nie dają: własny scheduler z limitem równoległości, niestandardowa logika wznowień, zero zależności w prostym CLI skrypcie. Dla złożonych scenariuszy async – ReactPHP lub Amp 3.x budują lepszą abstrakcję na Fibers i warto z nich korzystać.

About Henryk Tews

Co możesz przeczytać następne

Trendy 2025 – FrankenPHP, AI tooling update, OpenSearch, Hyvä konsolidacja
Blackfire – instalacja w DDEV, profilowanie HTTP i CLI, asercje w CI/CD
PHP 9.0 pierwsze RFC – co zostanie usunięte, typed arrays dyskusja, timeline
  • Publikacje
  • O autorze
  • Kontakt

© 2026 Created by

GÓRA
Zarządzaj zgodą
Aby zapewnić jak najlepsze wrażenia, korzystamy z technologii, takich jak pliki cookie, do przechowywania i/lub uzyskiwania dostępu do informacji o urządzeniu. Zgoda na te technologie pozwoli nam przetwarzać dane, takie jak zachowanie podczas przeglądania lub unikalne identyfikatory na tej stronie. Brak wyrażenia zgody lub wycofanie zgody może niekorzystnie wpłynąć na niektóre cechy i funkcje.
Funkcjonalne Zawsze aktywne
Przechowywanie lub dostęp do danych technicznych jest ściśle konieczny do uzasadnionego celu umożliwienia korzystania z konkretnej usługi wyraźnie żądanej przez subskrybenta lub użytkownika, lub wyłącznie w celu przeprowadzenia transmisji komunikatu przez sieć łączności elektronicznej.
Preferencje
Przechowywanie lub dostęp techniczny jest niezbędny do uzasadnionego celu przechowywania preferencji, o które nie prosi subskrybent lub użytkownik.
Statystyka
Przechowywanie techniczne lub dostęp, który jest używany wyłącznie do celów statystycznych. Przechowywanie techniczne lub dostęp, który jest używany wyłącznie do anonimowych celów statystycznych. Bez wezwania do sądu, dobrowolnego podporządkowania się dostawcy usług internetowych lub dodatkowych zapisów od strony trzeciej, informacje przechowywane lub pobierane wyłącznie w tym celu zwykle nie mogą być wykorzystywane do identyfikacji użytkownika.
Marketing
Przechowywanie lub dostęp techniczny jest wymagany do tworzenia profili użytkowników w celu wysyłania reklam lub śledzenia użytkownika na stronie internetowej lub na kilku stronach internetowych w podobnych celach marketingowych.
  • Zarządzaj opcjami
  • Zarządzaj serwisami
  • Zarządzaj {vendor_count} dostawcami
  • Przeczytaj więcej o tych celach
Zobacz preferencje
  • {title}
  • {title}
  • {title}