Fibers weszły do PHP w wersji 8.1 jako niskopoziomowy mechanizm kooperatywnej wielozadaniości. W marcu 2024 pisałem o ReactPHP który buduje na Fibers abstrakcję event loop i async/await. Dziś wchodzę głębiej: jak Fibers działają wewnętrznie, kiedy warto sięgać bezpośrednio do Fiber API bez biblioteki i jak pisać własny prosty scheduler który pozwala równolegle wykonywać zadania w PHP CLI.
Fiber – co to jest i jak działa
Fiber to lekki wątek współpracujący (co-routine) który może być zawieszony i wznowiony przez kod który go uruchomił. W przeciwieństwie do wątków systemowych nie działa równolegle – w danym momencie wykonuje się tylko jeden Fiber. „Równoległość” pochodzi z tego że Fiber sam się zawiesza gdy czeka na I/O, pozwalając innemu uruchomić się.
<?php
declare(strict_types=1);
// Podstawowe API Fiber
$fiber = new Fiber(function(): string {
echo "Fiber: start\n";
// Fiber.suspend() zawiesza fiber i zwraca wartość do kodu który go wznowił
$received = Fiber::suspend('wartość przekazana do resume()');
echo "Fiber: wznowiony, otrzymałem: {$received}\n";
return 'wynik fibera';
});
// Uruchom Fiber - wykonuje się do pierwszego suspend()
$suspended = $fiber->start();
echo "Main: Fiber zawiesił się, przekazał: {$suspended}\n"; // wartość z suspend()
// Wznów Fiber przekazując mu wartość
$result = $fiber->resume('wiadomość dla fibera');
echo "Main: Fiber zakończył się z wynikiem: {$fiber->getReturn()}\n";
// Output:
// Fiber: start
// Main: Fiber zawiesił się, przekazał: wartość przekazana do resume()
// Fiber: wznowiony, otrzymałem: wiadomość dla fibera
// Main: Fiber zakończył się z wynikiem: wynik fibera
Prosty scheduler Fiber – bez bibliotek
<?php
declare(strict_types=1);
// Minimalistyczny scheduler który przeplatwa wiele Fiberów
class FiberScheduler
{
/** @var \SplQueue<Fiber> */
private \SplQueue $queue;
public function __construct()
{
$this->queue = new \SplQueue();
}
// Dodaj nowe zadanie jako Fiber
public function add(callable $task): void
{
$this->queue->enqueue(new Fiber($task));
}
// Uruchom scheduler – wykonuje Fibery po kolei
public function run(): void
{
// Uruchom wszystkie Fibery po raz pierwszy
$pending = new \SplQueue();
foreach ($this->queue as $fiber) {
$fiber->start();
if (!$fiber->isTerminated()) {
$pending->enqueue($fiber);
}
}
// Round-robin – wznawiaj Fibery które są zawieszone
while (!$pending->isEmpty()) {
$fiber = $pending->dequeue();
if ($fiber->isSuspended()) {
$fiber->resume();
}
if (!$fiber->isTerminated()) {
$pending->enqueue($fiber);
}
}
}
}
// Zadanie które symuluje asynchroniczną pracę przez suspend
function fetchDataTask(string $name, int $steps): void
{
echo "[{$name}] Start\n";
for ($i = 1; $i <= $steps; $i++) {
// Symulacja oczekiwania na I/O – zawieszamy Fiber
Fiber::suspend();
echo "[{$name}] Krok {$i}/{$steps}\n";
}
echo "[{$name}] Gotowe\n";
}
$scheduler = new FiberScheduler();
$scheduler->add(fn() => fetchDataTask('Zadanie-A', 3));
$scheduler->add(fn() => fetchDataTask('Zadanie-B', 2));
$scheduler->add(fn() => fetchDataTask('Zadanie-C', 4));
$scheduler->run();
// Output (przeplatane):
// [Zadanie-A] Start
// [Zadanie-B] Start
// [Zadanie-C] Start
// [Zadanie-A] Krok 1/3
// [Zadanie-B] Krok 1/2
// [Zadanie-C] Krok 1/4
// [Zadanie-A] Krok 2/3
// [Zadanie-B] Krok 2/2
// [Zadanie-C] Krok 2/4
// ...
Praktyczne zastosowanie – równoległe pobieranie danych
<?php
declare(strict_types=1);
// Równoległe wykonanie HTTP requestów przez Fibers + stream_select
// Bez ReactPHP, bez Amp – czysty PHP
class ParallelHttpFetcher
{
private array $results = [];
private array $errors = [];
public function fetchAll(array $urls): array
{
$fibers = [];
$handles = [];
// Otwórz wszystkie połączenia HTTP jednocześnie (non-blocking)
foreach ($urls as $key => $url) {
$context = stream_context_create([
'http' => [
'timeout' => 10,
'method' => 'GET',
],
'socket' => [
'tcp_nodelay' => true,
],
]);
$fiber = new Fiber(function() use ($url, $key, $context): void {
// Otwórz non-blocking stream
$stream = fopen($url, 'r', false, $context);
if ($stream === false) {
Fiber::suspend(['error' => "Cannot open: {$url}", 'key' => $key]);
return;
}
stream_set_blocking($stream, false);
$data = '';
// Czytaj w pętli, zawieszając Fiber gdy nie ma danych
while (!feof($stream)) {
$chunk = fread($stream, 8192);
if ($chunk === false || $chunk === '') {
Fiber::suspend(['waiting' => true, 'key' => $key]);
continue;
}
$data .= $chunk;
}
fclose($stream);
Fiber::suspend(['done' => true, 'data' => $data, 'key' => $key]);
});
$fibers[$key] = $fiber;
$fiber->start();
}
// Scheduler – wznawiaj Fibery które czekają
$active = array_fill_keys(array_keys($fibers), true);
while (!empty($active)) {
foreach ($fibers as $key => $fiber) {
if (!isset($active[$key])) continue;
if ($fiber->isSuspended()) {
$status = $fiber->getReturn() ?? [];
if (isset($status['done'])) {
$this->results[$key] = $status['data'];
unset($active[$key]);
} elseif (isset($status['error'])) {
$this->errors[$key] = $status['error'];
unset($active[$key]);
} else {
$fiber->resume(); // czekaj dalej
}
} elseif ($fiber->isTerminated()) {
unset($active[$key]);
}
}
usleep(1000); // 1ms - nie blokuj CPU
}
return ['results' => $this->results, 'errors' => $this->errors];
}
}
Fibers w importerze produktów – kontrolowane równoległe przetwarzanie
<?php
declare(strict_types=1);
// Importer który przetwarza produkty w kontrolowanej równoległości
// Max 5 "jednoczesnych" operacji przez Fibers
class ConcurrentProductImporter
{
private const MAX_CONCURRENT = 5;
public function __construct(
private ProductApiClient $apiClient,
private ProductSaveService $saveService
) {}
public function import(array $skus): array
{
$results = [];
$active = new \SplQueue();
$pending = new \SplQueue();
foreach ($skus as $sku) {
$pending->enqueue($sku);
}
// Uruchom pierwsze MAX_CONCURRENT fiberów
while (!$pending->isEmpty() && $active->count() < self::MAX_CONCURRENT) {
$sku = $pending->dequeue();
$fiber = $this->createImportFiber($sku);
$fiber->start();
$active->enqueue(['sku' => $sku, 'fiber' => $fiber]);
}
// Scheduler - uzupełniaj aktywne gdy któryś się kończy
while (!$active->isEmpty()) {
$item = $active->dequeue();
$fiber = $item['fiber'];
$sku = $item['sku'];
if ($fiber->isSuspended()) {
$fiber->resume();
$active->enqueue($item); // wróć na koniec kolejki
} elseif ($fiber->isTerminated()) {
$results[$sku] = $fiber->getReturn();
// Zastąp zakończony nowym zadaniem
if (!$pending->isEmpty()) {
$nextSku = $pending->dequeue();
$next = $this->createImportFiber($nextSku);
$next->start();
$active->enqueue(['sku' => $nextSku, 'fiber' => $next]);
}
}
}
return $results;
}
private function createImportFiber(string $sku): Fiber
{
return new Fiber(function() use ($sku): array {
// Pobierz dane z API (symulacja I/O)
Fiber::suspend(); // czekamy na API response
$data = $this->apiClient->getProduct($sku);
if ($data === null) {
return ['sku' => $sku, 'status' => 'not_found'];
}
// Zapisz produkt (symulacja I/O do bazy)
Fiber::suspend(); // czekamy na zapis DB
$this->saveService->save($data);
return ['sku' => $sku, 'status' => 'saved'];
});
}
}
Kiedy Fibers, kiedy ReactPHP, kiedy Amp?
| Scenariusz | Wybór | Uzasadnienie |
|---|---|---|
| Kilka prostych równoległych tasków | Bare Fibers | Zero zależności, pełna kontrola |
| Serwer HTTP, WebSockets | ReactPHP | Gotowy event loop, HTTP server |
| Złożone async operacje, Promises | Amp 3.x | Nowoczesne API, dobre wsparcie Fibers |
| Magento CLI importer | Bare Fibers lub ReactPHP | Kontrolowane N-równoległe wywołania API |
| Standardowy request Magento | Żaden | PHP-FPM wystarczy, synchroniczny kod jest OK |
Podsumowanie
Fibers to potężny ale niskopoziomowy mechanizm – coś między wątkami a zwykłymi funkcjami. Bezpośrednie użycie Fiber API ma sens gdy potrzebujesz kontroli której biblioteki nie dają: własny scheduler z limitem równoległości, niestandardowa logika wznowień, zero zależności w prostym CLI skrypcie. Dla złożonych scenariuszy async – ReactPHP lub Amp 3.x budują lepszą abstrakcję na Fibers i warto z nich korzystać.
