POSTD PRODUCED BY NIJIBOX

POSTD PRODUCED BY NIJIBOX

ニジボックスが運営する
エンジニアに向けた
キュレーションメディア

Maxime Fabre

本記事は、原著者の許諾のもとに翻訳・掲載しております。

この記事の前半はこちら: スレッド処理は慎重に – PHPでのスレッド処理 : 前編

オートローディングとコンテキストの継承

これまで挙げてきた例は基本的なことですが、更に高度な例に取り組もうとした場合、ある共通の障害にぶち当たるでしょう。それは、親のコンテキストにオートローディングされたクラスを子スレッドが継承していないということです。

これはなぜかと言うと、子スレッドは親のコンテキストを 必ずしも 継承する必要がないからです。状況によって継承が不都合な場合、例えばいくつかのコンテキストにおいて、ある独立したタスクを行おうとした時や、メモリの上層階で行っている全てを読み込みたくない時などです。

オプションThread インスタンスの ->start() メソッドに渡すことで、親から継承される量を制限できます。

<?php
define('MY_CONSTANT', true);
function test() {}
class Foobar {}

class Example extends Thread
{
    public function run()
    {
        var_dump(defined('MY_CONSTANT'));
        var_dump(function_exists('test'));
        var_dump(class_exists('Foobar'));
    }
}

// true true true
$job = new Example();
$job->start(); // default argument is PTHREADS_INHERIT_ALL
$job->join();

// false false true
$job = new Example();
$job->start(PTHREADS_INHERIT_CLASSES);
$job->join();

// true true true
$job = new Example();
$job->start(PTHREADS_INHERIT_CLASSES | PTHREADS_INHERIT_CONSTANTS | PTHREADS_INHERIT_FUNCTIONS);
$job->join();

ではここで、Composerを使ってあることを要求しましょう。例に対して var_dump の見栄えの良いラッパ、 symfony/var-dumper を要求し、ジョブに対して提供された dump() 関数を使ってみます。

<?php
require 'vendor/autoload.php';

class Example extends Thread
{
    public function run()
    {
        dump('foobar');
    }
}

$job = new Example();
$job->start();
$job->join();

これ行うと実行は失敗し、以下の例外が投げられます。

Class ‘Symfony\Component\VarDumper\VarDumper’ not found

(クラス’Symfony\Component\VarDumper\VarDumper’は見つかりません)

これは、クラスがメインスレッドに読み込まれたにもかかわらず、子スレッドには読み込まれていないからです。いくつかの回避策がありますが、一番簡単なのは require 'vendor/autoload.php' 文をジョブの run に含めることです。しかし、これは非常に悪い方法です。

より良い方法としては、ジョブを実行する前にオートローディングの設定をするためのWorkerクラスを作成することです。考え方は同じですが、この方法であれば、どのジョブクラスにも何も要求する必要がありません。

<?php
class AutoloadingWorker extends Worker
{
    public function run()
    {
        require 'vendor/autoload.php';
    }
}

// Create our worker and stack our job on it
$worker = new AutoloadingWorker();

$job = new Example();
$worker->stack($job);

$worker->start();
$worker->join();

// Or use a pool and specify our custom worker
$pool = new Pool(5, AutoloadingWorker::class);
$pool->submit(new Example());
$pool->shutdown();

これであれば、以下のように正しい結果が与えられます。

$ php autoloading.php                                                                                                                                                                             "foobar"
"foobar"

クロージャの例

これまでクラスに関する問題について取り組んできましたが、コンテキストが完全に保存されないということを認識していないと、クロージャもまた頭を悩ます問題の1つになり得ます。恐らく皆さんは、以下の例は機能するコードだと思われるでしょう。

<?php
class ClosureRunner extends Collectable
{
    public function __construct($closure)
    {
        $this->closure = $closure;
    }

    public function run()
    {
        $closure = $this->closure;
        $closure();

        $this->setGarbage();
    }
}

$foo = 'test';

$pool = new Pool(5, Worker::class);
$pool->submit(new ClosureRunner(function () use ($foo) {
    var_dump($foo);
}));

$pool->shutdown();

しかし、実際は機能しません。クロージャが、変数の参照と共に メインスレッドにコンパイルされており 、そしてジョブに渡すようになっているからです。これだと、参照が既存の変数を指していない子スレッド内でジョブを実行するようになってしまいます。結局のところ、PHPではクロージャのシリアライズ化は困難ということになってしまいます。

そうは言っても、 Threaded::from メソッド(つまりその子スレッド全てで利用可能)を使えば、適切なシリアライゼーションでクロージャからジョブを便利に作成することができます。以下の例で機能するはずです。

<?php
$pool = new Pool(5, Worker::class);

$foo = 'test';
$pool->submit(Collectable::from(function () use ($foo) {
    var_dump($foo);
    $this->setGarbage();
}));

$pool->shutdown();

同期と通知

子スレッドが何をしているかという点については、これまで深く触れてきませんでした。しかし、メインスレッドで起こっていることと、子スレッドで起こっていることの連携が必要になる場合があります。

これには、 waitnotify メソッド、そして isWaiting といった関連するヘルパーを使います。例のとおり、親スレッドに何かしらのタスクを行うと同時に子スレッドに他のいくつかのタスクを行うこととします。親スレッド内のメインタスクを完了させるために子スレッドから結果を得たい場合、以下のようにします。

<?php
class Job extends Thread
{
    public function run()
    {
        while (!$this->isWaiting()) {
            // Do some work here as long
            // as we're not waiting for parent
        }

        $this->synchronized(function () {
            $this->result = 'DONE';
            $this->notify();
        });
    }
}

$job = new Job();
$job->start();

// Do some operation in the main thread here
sleep(1);

// Notify the child thread that we need it
// and get its results
$job->synchronized(function ($job) {
    $job->wait();
    echo $job->result;
}, $job);

全てのコールを $job->synchronized() コール内の waitnotify にラップしている仕方に注意してください。これは、コールと両方の準備が整った後に、全てを同期させるためです。

つまり、メインスレッドで作業を行い、ジョブと同期を行ってから通知を待ちます。その後送信を行います。これは、子スレッドがメインスレッドに改めて統合されないという点で $job->join() を単純に行うのとは 異なり 、それぞれが個々に存在し続けるので、 synchronized コールの後に作業を行うことができます。前出の図のように表すと、notify/waitルーティンは以下のようになります。


このルーティンはあまり役に立たないと最初は思うかもしれませんが、実際はこれで色々なことができます。例のように、子スレッド内で完了され、その後メインスレッドに報告するクロージャを待っている Promise クラスを作成することによって、このルーティンを通じて、 非常に 簡素化されたPromiseと同等の物を作ることができます。

<?php
class Promise extends Thread
{
    public function __construct(Closure $closure)
    {
        $this->closure = $closure;
        $this->start();
    }

    public function run()
    {
        $this->synchronized(function () {
            $closure = $this->closure;

            $this->result = $closure();
            $this->notify();
        });
    }

    public function then(callable $callback)
    {
        return $this->synchronized(function () use ($callback) {
            if (!$this->result) {
                $this->wait();
            }

            $callback($this->result);
        });
    }
}

$promise = new Promise(function () {
    return file_get_contents('http://google.fr');
});

$promise->then(function ($results) {
   echo $results;
});

これによって、いくつかの本質的なことが明らかになっています。待機と通知を異なるスレッドに入れる必要がない、それ自身または別の子スレッドを待つようにすることができる、メインスレッドを全ての子スレッドに通知することができる、などです。

おまけ:コマンドバスをスレッド処理

これまでに見てきた例は、理解度を高めるための基本的なものでしたが、スレッド処理をさせたいタスクの理想としては、上記の例よりも大規模だったり複雑な依存関係を持っていたりするものではないでしょうか。

通常、PHPの現代的なプロジェクトは、コマンドバスによってサポートされます(Laravel、Tactician、その他、好みに応じて)。スレッド処理は、コマンドバスのインスタンスを受け取り、そのコマンドを実行するWorkerのカスタムクラスを通じて、このコンテキストに簡単に統合可能です。以下をご覧ください。

<?php
use League\Tactician\CommandBus;

class CommandBusWorker extends Worker
{
    public function __construct(CommandBus $bus)
    {
        $this->bus = $bus;
    }

    public function run()
    {
        require 'vendor/autoload.php';
    }

    public function handle($command)
    {
        return $this->bus->handle($command);
    }
}

class CommandJob extends Collectable
{
    public function __construct($command)
    {
        $this->command = $command;
    }

    public function run()
    {
        $this->worker->handle($this->command);
    }
}

$pool = new Pool(5, CommandBusWorker::class, [new CommandBus()]);

$pool->submit(new CommandJob(new Command($some, $arguments)));
$pool->submit(new CommandJob(new AnotherCommand($some, $arguments)));

どうですか? これで、コマンドバスのメリットを残したまま、2つのコマンドラインはスレッド化され並列に実行されるようになるはずです。常用文の全てを除外する特別なプールを作ることもできます。

<?php
class CommandBusPool extends Pool
{
    public function __construct(CommandBus $bus)
    {
        parent::__construct(5, CommandBusWorker::class, [$bus]);
    }

    public function submit($command)
    {
        return parent::submit(new CommandJob($command));        
    }
}

$pool = new CommandBusPool(new CommandBus());
$pool->submit(new Command($some, $argument));

ただし、気を付けたいことが1つあります。ここで行っているように、依存関係がワーカーに渡される場合は、それが Threaded (またはスカラ)の子でなければ、 必ずシリアライズ可能にしてください。 つまり、それらはクロージャを含むプロパティを持ってはならないということです。もし持ってしまった場合、(例のように、コンテナおよびサービスプロバイダを介して)子スレッドにインスタンスを再作成するのが簡単な方法ですが、子スレッド内でコマンドバスのインスタンスを解決する任意の方法を用いても構いません。

リソース、およびその他の落とし穴について

リソース

コマンドバスの例では、気を付けるべき点に焦点を当てています。それは、リソースはメインスレッドから子スレッドに渡すことが できない ということです。理由はPHPの内部動作に制限があるためで、エッジケースにおいては動き続けますが、いずれにしてもスレッド間にリソースを渡すのは避けた方が無難でしょう。

ここで言うリソースとは、ソケットやデータベース接続、それにストリームなどのことです。もし子スレッドにデータベースが必要であれば、それ自体のコンテキストでデータベースに接続するワーカーを作成しましょう。ソケットについても同様です。メインスレッド内ではなく、子スレッド内でソケットに接続します。

なお、これを可能にする方法は あります が、そもそも頭痛の種を増やすようなことは最初からしない方がいいと思います。可能な限り子スレッドを別のコンテキストとして考えれば、いい結果になるはずです。

データの破損

もう1つの落とし穴(内容を簡潔にするために、この記事の例では無視していました)はデータの破損です。例えば、ジョブがアクセスする配列をワーカーが保持しているとしましょう。この時、2つのジョブが同時に配列を変更した場合、どれだけ万全を期しても配列は じきに 破損してしまいます。これはpthreads自体の制限ではなく、2つのスレッドが同じ値を同時に変更すればトラブルが発生するという、スレッド処理全般の問題です。これを回避するため、pthreadsには Stackable クラスがあり、それを使えばスレッドセーフのデータ同期が行えるよう拡張することができます。以下のような形です。

<?php

class Results extends Stackable
{
}

class Job extends Thread
{
    public function __construct($test)
    {
        $this->test = $test;
        $this->start();
    }

    public function run()
    {
        $this->test[] = rand(0, 10);
    }
}

$results = new Results();

$jobs = [];
while (@$i++ < 100) {
    $jobs[] = new Job($results);
}

foreach ($jobs as $job) {
    $job->join();
}

// Should print 100
var_dump(count($results));

これを実行した場合、配列のカウントとして予想通りに 100 が得られるはずです。しかし、これを、例えば $results = [] のような標準的な配列で実行するとどうでしょうか。得られるのは 毎回 0 ですね。 Threaded を継承していない全ての構造体はシリアル化されるので、元の変数を参照しなくなります。そのため、子スレッドでは値は正しいままですが、メインスレッドは正しくなりません。

スレッド間でデータを移動する必要がある場合は、できるだけ( Stackable のように) Threaded の子に渡すよう心がけてください。そうすればシリアル化されることはありませんし、データ破損やメモリトラブルを気にすることなくスレッド間の移動に特化した形にすることができます。

さあ、始めましょう

PHPでのpthreadsを使ったスレッド処理を掘り下げて見てきましたが、いかがだったでしょうか。今後、同時にしなければならないことがたくさんあるような時は、PHPを完全に選択肢から排除する前に、よく考えてみてもらえればと思います。スレッド処理は作業を別個のユニットとして分割できるような特定の状況下で有効なもので(したがって、コマンドバスはとてもよく適合します)、 常に 解決策になるというようなものではありません。それでも頼りになるツールですし、覚えていて損はないはずです。

pthreadsの拡張機能は 非常に出来が良く 、管理も十分にされており、PHP7ともすでに互換性があります。つい最近、新バージョンが出たばかりです(この記事の例で使用したバージョンは2.0.10ですが、PHP7用の最新バージョンは3.0.8です)。

さらにうれしいことにGithubから 拡張機能のリポジトリが入手可能で、豊富なサンプルやドキュメンテーションに加え、内部構造についての説明も閲覧することができます。その他、PHP全般のスレッド処理とpthreadsに関する根拠を作者(krakjoe)が説明した一連の 要項 は、私のお薦めです。

なお、記事に掲載した例については、必要に応じて こちらのレポジトリから 全てを入手することができます。

さあ、これで終わりです。スレッド処理は慎重に!

リンクのまとめ