PHP относится к языкам, в которых поддержка многопоточности отсутствует. Но, есть немало задач, в которых она была бы очень полезна. Как правило, это задачи, в которых одно действие нужно выполнить много раз, но с различными параметрами, причем само действие настолько ресурсоемко, а количество итераций настолько велико, что обычный цикл использовать невозможно.
Я рассмотрю простой пример, как можно достичь эмуляции многопоточности в PHP.
Допустим, нам нужно осуществить почтовую рассылку на очень большое количество адресов. Адреса хранятся в базе. Так как адресов много, то такой вариант нам не подходит, повесим сервер:
$res = mysql_query('SELECT `email` FROM `user`');
while ($row = mysql_fetch_assoc($res))
mail($row['email'], $theme, $text);
Если мы хотим использовать для этой задачи многопоточность, то неплохо бы выглядел следующий вариант:
$res = mysql_query('SELECT `email` FROM `user`');
$data = array();
while ($row = mysql_fetch_assoc($res))
$data[] = $row;
$multithreading = new MultiThreading();
// mailer.php — скрипт, который отправляет письмо по адресу, переданному ему методом GET,
// то есть [email protected]
$multithreading->setScriptName('mailer.php');
$multithreading->setParams($data);
$multithreading->execute();
Все просто и удобно. Осталось написать соответствующий класс, что я и сделал. Вот он, со всеми пояснениями в комментариях.
class MultiThreading
{
/**
* Имя сервера
*
* @var string
* @access private
*/
private $server;
/**
* Максимальное количество потоков
*
* @var int
* @access private
*/
private $maxthreads;
/**
* Имя скрипта, который выполняет нужную нам задачу
*
* @var string
* @access private
*/
private $scriptname;
/**
* Параметры, которые мы будем передавать скрипту
*
* @var array
* @access private
*/
private $params = array();
/**
* Массив, в котором хранятся потоки
*
* @var array
* @access private
*/
private $threads = array();
/**
* Массив, в котором хранятся результаты
*
* @var array
* @access private
*/
private $results = array();
/**
* Конструктор класса. В нем мы указываем максимальное количество потоков и имя сервера. Оба аргумента необязательны.
*
* @param int $maxthreads максимальное количество потоков, по умолчанию 10
* @param string $server имя сервера, по умолчанию имя сервера, на котором запущено приложение
* @access public
*/
public function __construct($maxthreads = 10, $server = '')
{
if ($server)
$this->server = $server;
else
$this->server = $_SERVER['SERVER_NAME'];
$this->maxthreads = $maxthreads;
}
/**
* Указываем имя скрипта, который выполняет нужную нам задачу
*
* @param string $scriptname имя скрипта, включая путь к нему
* @access public
*/
public function setScriptName($scriptname)
{
if (!$fp = fopen('http://'.$this->server.'/'.$scriptname, 'r'))
throw new Exception('Cant open script file');
fclose($fp);
$this->scriptname = $scriptname;
}
/**
* Задаем параметры, которые мы будем передавать скрипту
*
* @param array $params массив параметров
* @access public
*/
public function setParams($params = array())
{
$this->params = $params;
}
/**
* Выполняем задачу, комментарии в коде
*
* @access public
*/
public function execute()
{
// Запускаем механизм, и он работает, пока не выполнятся все потоки
do {
// Если не превысили лимит потоков
if (count($this->threads) < $maxthreads) {
// Если удается получить следующий набор параметров
if ($item = current($this->params)) {
// Формируем запрос методом GET
$query_string = '';
foreach ($item as $key=>$value)
$query_string .= '&'.urlencode($key).'='.urlencode($value);
$query = "GET http://".$this->server."/". $this->scriptname."?".$query_string." HTTP/1.0\r\n";
// Открыватем соединение
if (!$fsock = fsockopen($this->server, 80))
throw new Exception('Cant open socket connection');
fputs($fsock, $query);
fputs($fsock, "Host: $server\r\n");
fputs($fsock, "\r\n");
stream_set_blocking($fsock, 0);
stream_set_timeout($fsock, 3600);
// Записываем поток
$this->threads[] = $fsock;
// Переходим к следующему элементу
next($this->params);
}
}
// Перебираем потоки
foreach ($this->threads as $key=>$value) {
// Если поток отработал, закрываем и удаляем
if (feof($value)) {
fclose($value);
unset($this->threads[$key]);
} else {
// Иначе считываем результаты
$this->results[] = fgets($value);
}
}
// Можно поставить задержку, чтобы не повесить сервер
sleep(1);
// … пока не выполнятся все потоки
} while (count($this->threads) > 0);
return $this->results;
}
}
Также можно этот класс скачать, чтобы не копипастить :-).
Оригинал