本文介绍了在PHP7 pthreads扩展中使用Pool类的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我参加了使用Pool类的pthreads PHP7扩展的最基本演示(此演示 https: //github.com/krakjoe/pthreads#polyfill ),并将其扩展一点,这样我就可以从线程中获取结果了(或者至少我认为可以):

I took the most basic demo of pthreads PHP7 extension that uses Pool class (this demo https://github.com/krakjoe/pthreads#polyfill) and extended it a little so I can grab results from the thread (or at least I think I can):

$pool = new Pool(4);

foreach (range(1, 8) as $i) {
    $pool->submit(new class($i) extends Threaded
    {
        public $i;
        private $garbage = false;

        public function __construct($i)
        {
            $this->i = $i;
        }

        public function run()
        {
            echo "Hello World\n";
            $this->result = $this->i * 2;
            $this->garbage = true;
        }

        public function isGarbage() : bool
        {
            return $this->garbage;
        }
    });
}

while ($pool->collect(function(Collectable $task) {
    if ($task->isGarbage()) {
        echo $task->i . ' ' . $task->result . "\n";
    }
    return $task->isGarbage();
})) continue;

$pool->shutdown();

让我感到困惑的是,有时它无法获得所有任务的结果:

What's confusing me is that it sometimes doesn't get the result for all tasks:

Hello World
Hello World
Hello World
Hello World
Hello World
1 2
2 4
Hello World
Hello World
3 6
Hello World
7 14
4 8
8 16

现在缺少与5 106 12的两行,但我不明白为什么.这种情况仅在某些情况下发生(也许运行1/10).

Now two lines with 5 10 and 6 12 are missing but I don't understand why. This happens only sometimes (maybe 1/10 runs).

看起来好像原始演示版是针对pthreads的较早版本的,因为有Collectable接口,如果我没有记错的话,该接口现在会由Threaded自动实现.

It looks like the original demo is for the older version of pthreads because there's Collectable interface which is now automatically implemented by Threaded if I'm not mistaken.

然后自述文件说:

所以我想我做错了.

编辑:我以 Pool :: collect如何工作为例,并对其进行了更新以与最新的pthread和最新的PHP7一起使用,但结果是相同的.看来它无法从最近执行的线程中收集结果.

Edit: I took the example from How does Pool::collect works? and updated it to work with latest pthreads and current PHP7 but the result is the same. It looks like it's not able to collect results from the last threads that are executed.

$pool = new Pool(4);

while (@$i++<10) {
    $pool->submit(new class($i) extends Thread implements Collectable {
        public $id;
        private $garbage;

        public function __construct($id) {
            $this->id = $id;
        }

        public function run() {
            sleep(1);
            printf(
                "Hello World from %d\n", $this->id);
            $this->setGarbage();
        }

        public function setGarbage() {
            $this->garbage = true;
        }

        public function isGarbage(): bool {
            return $this->garbage;
        }

    });
}

while ($pool->collect(function(Collectable $work){
    printf(
        "Collecting %d\n", $work->id);
    return $work->isGarbage();
})) continue;

$pool->shutdown();

这将输出以下内容,这些内容显然不会收集所有线程:

This outputs the following which is clearly not collecting all threads:

Hello World from 1
Collecting 1
Hello World from 2
Collecting 2
Hello World from 3
Collecting 3
Hello World from 4
Collecting 4
Hello World from 5
Collecting 5
Hello World from 6
Hello World from 7
Collecting 6
Collecting 7
Hello World from 8
Hello World from 9
Hello World from 10

推荐答案

您已经非常正确地注意到,您复制的代码针对pthreads v2(对于PHP 5.x).

As you have quite correctly noted, the code you have copied targets pthreads v2 (for PHP 5.x).

问题归结为pthreads 中的垃圾收集器不是确定性的.这意味着它的行为不可预测,因此无法可靠地使用它来从已由池执行的任务中获取数据.

The problem boils down to the fact that the garbage collector in pthreads is not deterministic. This means it will not behave predictably, and so it cannot be reliably used in order to fetch data from the tasks that have been executed by the pool.

获取此数据的一种方法是将Threaded对象传递到要提交给池的任务中:

One way you could fetch this data would be to pass in Threaded objects into the tasks being submitted to the pool:

<?php

$pool = new Pool(4);
$data = [];

foreach (range(1, 8) as $i) {
    $dataN = new Threaded();
    $dataN->i = $i;

    $data[] = $dataN;

    $pool->submit(new class($dataN) extends Threaded {
        public $data;

        public function __construct($data)
        {
            $this->data = $data;
        }

        public function run()
        {
            echo "Hello World\n";
            $this->data->i *= 2;
        }
    });
}

while ($pool->collect());

$pool->shutdown();

foreach ($data as $dataN) {
    var_dump($dataN->i);
}

关于上述代码,需要注意以下几点:

There are a few things to note about the above code:

  • Collectable(现在是pthreads v3中的接口)已经由Threaded类实现,因此不需要自己实现.
  • 一旦任务已提交到池中,就已经将其视为垃圾,因此无需自己处理此部分.尽管您仍然可以覆盖默认的垃圾收集器,但在绝大多数情况下(包括您的情况)都不需要这样做.
  • 我仍然调用collect方法(在一个阻塞主线程的循环中,直到所有任务完成执行),以便可以垃圾回收任务(使用pthreads的默认收集器)以释放内存,而池是执行任务.
  • Collectable (which is now an interface in pthreads v3) is implemented by the Threaded class already, so there's no need to implement it yourself.
  • Once a task has been submitted to the pool, it is already considered to be garbage, and so there is no need to handle this part yourself. Whilst you still have the ability to override the default garbage collector, this should not be needed in the vast majority of cases (including yours).
  • I still invoke the collect method (in a loop that blocks the main thread until all tasks have finished executing) so that the tasks can be garbage collected (using pthreads' default collector) to free up memory whilst the pool is executing tasks.

这篇关于在PHP7 pthreads扩展中使用Pool类的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-05 06:19