- PVSM.RU - https://www.pvsm.ru -

Как обрабатывать терабайты данных в 1000 потоков на PHP — Hadoop/MapReduce

Всем привет!

Уже слышали про Bigdata [1]? Ну да, веб растет, данных становится больше и их нужно держать под контролем и периодически анализировать. Базы данных — лопаются под нагрузкой, реляционная теория не совсем справляется с задачей, нужно решение. Маркетинг активно давит сверху, а железо острыми углами — снизу и попахивает суицидом.

В этом посте постараюсь дать конкретные работающие рецепты и куски кода с краткими теоретическими выводами, как же обрабатывать >=терабайты в >=1000 потоков на PHP. Чтобы можно быть взять и решить задачу, не теряя времени и не забивая голову теорией.

Однако, если вдруг стало подташнивать и закружилась голова, можно дальше не читать — а полюбоваться на прекрасных птичек и забыть о вышенаписанном. Но будьте на чеку, Bigdata может завтра взять и постучатся в дверь ;-)
Как обрабатывать терабайты данных в 1000 потоков на PHP — Hadoop/MapReduce

Как обычно делается

Как обычно бывает в вебе. Складывают данные в БД, пока не лопнет. Если лопается, начинаются разговоры про MySQL sharding, partitioning [2], вспоминают про мастер-мастер кластер [3] в оперативной памяти.

Если не помогает, начинаются поиски и внедрения NoSQL решения типа redis [4] или облачного сервиса типа DynamoDB [5]. Неплохо себя зарекомендовал в качестве эффективного поискового движка по объемным данным Sphinx [6].

Подсознательно идет расчет — сохраним в БД и потом проанализируем информацию. И это нередко работает. Но не всегда… и это «не всегда» становится чаще.

Данных еще больше, требуется он-лайн аналитика

Не всегда можно ответить бизнесу — подождем сутки, проанализируем логи/данные и дадим циферки. Бизнесу часто важно иметь циферки в онлайне, управлять ситуацией по приборам с живыми стрелочками.
Как обрабатывать терабайты данных в 1000 потоков на PHP — Hadoop/MapReduce

Страшно представить управление самолетом путем анализа записанной в черные ящики информации один раз в сутки в гостинице для пилотов :-)
Как обрабатывать терабайты данных в 1000 потоков на PHP — Hadoop/MapReduce
Как обрабатывать терабайты данных в 1000 потоков на PHP — Hadoop/MapReduce

Когда поток данных становится еще интенсивнее или бизнес-логика требует наличия текущей информации по еще не обработанным данным… Тогда нам помогают инструменты «потокового анализа» типа:
1) pinba [7]
2) Amazon Kinesis [8]
3) Потоковые парсеры на базе nginx [9]/ragel [10]

Их полезно хотя бы один раз каждый из этих инструментов понять с листочком и карандашом, еще полезнее — «переспать» с мануалом и прототипом минимум ночь.
Как обрабатывать терабайты данных в 1000 потоков на PHP — Hadoop/MapReduce

Особо хочется выделить здесь pinba [7] за простоту настройки и легкость эксплуатации и минимум создаваемой нагрузки. Организовать сбор статистики по производительности веб-приложения в браузере его клиентов на основании js Navigation Timing API [11] — делается в 2 файла на PHP на 30 строк.

Когда же нет возможности анализировать данные онлайн — начинаются поиски решения параллельного анализа накопленных данных и связанных с ним алгоритмов.

Параллельная обработка массивов данных

Есть список объектов, допустим это файлы в облаке s3, которых у вас — десятки миллионов. Как бы мы не доверяли облаку, нужно эти файлы периодически выгружать в другое облако/серверы. Каждый файл шифруется, сжимается, происходят другие операции и копируется.

Аналогичных задач в природе немало:

  • обработка изображений
  • обработка XML-документов через XSLT-фильтр
  • обработка логов
  • сортировки

Эти задачи подпадают под общий алгоритм «разделяй и властвуй»:
— распределяем задачки на части
— каждую часть обрабатываем отдельно и параллельно с другими частями
— объединяем результаты через агрегацию
Как обрабатывать терабайты данных в 1000 потоков на PHP — Hadoop/MapReduce

Для PHP можно попытаться решить эту задачу используя очередь типа RabbitMQ [12] и/или Gearman [13] — но придется очень много повозиться для решения исключительных ситуаций, шардинга общей файловой системы, кластеризации на 20 серверов и т.п.

Поэтому если ваша задача может решиться в 30 потоков PHP на одном сервере — перечисленных инструментов, как правило, достаточно. Однако если вам «не повезло» и нужно за час обработать несколько терабайт и железа дают сколько унесешь — выход есть :-)

Да, да, конечно это Hadoop [14], реализующий коррелирующую с фото девушек выше парадигму MapReduce ;-)

Hadoop

Вообще это довольно большой продукт и недельки на 24/7 чтения мануалов наверно не хватит — но этого и не требуется. Мы научимся использовать эту технологию эффективно и быстро, экономя ваше и наше время.

Установка

Помимо установки java-софта потребуется еще настроить кластерную файловую систему. Зачем — а как будут ноды кластера обмениваться общими файлами? Но мы поступим хитрее — запустим кластер Hadoop в Амазоне [15]. Там все уже настроено и установлено.

Подготовка map и reduce скриптов

Вот тут самое интересное в посте. Hadoop позволяет задействовать скрипты на любом языке — и провести сортировку файла на bash или обработку на PHP/Python/Perl.

Скриптики читают из стандартного ввода и пишут в стандартный вывод. Ну что может быть проще?

Скриптиков должно быть 2: mapper, reducer.

Если нужно просто распараллелить задачу на N серверов — достаточно написать один mapper.

Пример mapper
#!/usr/bin/php
<?php
error_reporting(-1);
set_time_limit(0);
ini_set('memory_limit', '2048M');
gc_enable();

require '/usr/share/php/aws.phar';

$fp=fopen("php://stdin","r");

while (true) {

    $line=stream_get_line($fp,65535,"n");
    // тут работаем с файлами: шифруем, сжимаем, выгружаем, загружаем
    ...
}

echo "s3 copied directt".$copy_count."n";
echo "s3 copied precondt".$copy_precond_count ."n";
echo "s3 src not foundt".$s3_src_not_found ."n";

Если агрегированная статистика не нужна, второй скриптик — не нужен. Если нужна, пишем reducer:

Пример reducer
#!/usr/bin/php
<?php
error_reporting(-1);
ini_set('memory_limit', '1024M');
set_time_limit(0);
gc_enable();

$ar_reduce = array();

$ar_reduce = array();

while (($line = fgets(STDIN)) !== false) {


    $line = str_replace("n","",$line);
    $ar_line = explode("t", $line);

    if ( !isset($ar_reduce[$ar_line[0]]) ) $ar_reduce[$ar_line[0]] = 0;
    $ar_reduce[$ar_line[0]] += intval($ar_line[1]);

}


foreach ($ar_reduce as $key=>$value) {

    echo $key."t".$value."n";

}
?>
Инициализация серверов кластера

Т.к. скрипты наши на PHP, необходимо подготовить скрипт инициализации, выполняемый на каждом сервере кластера:

sudo apt-get -y update
sudo apt-get -y install libssh2-php
sudo apt-get -y install php5-curl
sudo rm -f /etc/php5/cli/conf.d/suhosin.ini
sudo mkdir -p /usr/share/php
cd /usr/share/php
sudo wget https://github.com/aws/aws-sdk-php/releases/download/2.5.0/aws.phar
...
Выгружаем скрипты на PHP и bash в облако (s3)
for FILE in bkp_s3_folder_hadoop_bootstrap.sh bkp_s3_folder_hadoop_mapper.php bkp_s3_folder_hadoop_reducer.php; do

    s3cmd -c /root/.s3cfg-key put /home/project/cron_jobs/$FILE s3://#папка скриптов#/code/

done
Выгрузка данных для обработки в s3

Просто, например с помощью s3cmd, выгружаем исходные данные для обработки в папку в s3. Эти данные потом расплывутся по кластеру автоматически. Выгрузить можно сколько угодно данных и пусть кластер с ними мучается.

Запуск обработки данных в кластере

И напоследок такая вкусняшка — запускаем кластер для обработки наших данных.

D=$(date +"%Y-%m-%d_%H-%M-%S")

/opt/aws/emr/elastic-mapreduce --create --stream 
--name myproject_$D 
--step-name step_$D 
--with-termination-protection 
--step-action CANCEL_AND_WAIT 
--ami-version '2.4.2' 
--bootstrap-action '#путь к скрипту инициализации серверов, см. выше#' 
--bootstrap-action 's3://elasticmapreduce/bootstrap-actions/configure-hadoop' 
--args "-m,mapred.map.max.attempts=20,-m,mapred.tasktracker.map.tasks.maximum=15,-m,mapred.task.timeout=600000" 
--input 's3://#папка с исходными файлами для обработки#/input/' 
--mapper 's3://#папка скриптов#/code/#наш mapper#.php' 
--reducer 's3://#папка скриптов#/code/#наш reducer#.php' 
--output 's3://#папка с логами#/output_'$D 
--log-uri 's3://#папка с логами#/logs/' 
--num-instances 5 
--master-instance-type m1.small 
--slave-instance-type m1.xlarge 
--key-pair 'myproject_mapreduce'

Как обрабатывать терабайты данных в 1000 потоков на PHP — Hadoop/MapReduce
Тут важно подобрать правильно число железок для размножения кластера — чем больше, тем конечно быстрее. В данном примере мы устанавливаем не больше 15 процессов на один сервер. Можно больше, это зависит от объема оперативной памяти, но осторожно — следим за ее расходом.

После отработки кластера в логах можно будет увидеть агрегированную статистику, логи также будут выгружены в s3.

Обычно скорость обработки, которая до этого делалась неделями — поражает, вдохновляет и выводит на новый уровень осознания IT-континиума не хуже последней части «300 спартанцев» :-)
Как обрабатывать терабайты данных в 1000 потоков на PHP — Hadoop/MapReduce

Итоги

В результате у вас появляется бизнес-инструмент, управляемый 2 скриптами на PHP. Число серверов (--num-instances 5) напрямую влияет на скорость обработки загруженного массива данных. В принципе никто не запрещает запустить 100 серверов с 10 потоками на каждом и обработать данные значительно быстрее, чем можно было сделать на одном сервере используя очередь заданий.

Используя данную технологию простым и понятным образом, мы на одном из наших проектов [16] сократили время обработки десятков миллионов объектов в s3 с недель до 2 дней.

Коллеги, если есть вопросы, пожалуйста спрашивайте в комментах и посещайте наши конференции [17] — мы с удовольствием поделимся опытом. И всем удачи в реализации веб-проектов и побед над Bigdata!

Автор: AlexSerbul

Источник [18]


Сайт-источник PVSM.RU: https://www.pvsm.ru

Путь до страницы источника: https://www.pvsm.ru/sistemnoe-administrirovanie/58283

Ссылки в тексте:

[1] Bigdata: http://en.wikipedia.org/wiki/Big_data

[2] partitioning: https://dev.mysql.com/doc/refman/5.6/en/partitioning.html

[3] мастер-мастер кластер: http://www.mysql.com/products/cluster/

[4] redis: http://redis.io/

[5] DynamoDB: http://aws.amazon.com/dynamodb/

[6] Sphinx: http://www.1c-bitrix.ru/products/cms/new/new140.php?sphrase_id=3012622#tab-sphinx-link

[7] pinba: http://pinba.org/

[8] Amazon Kinesis: http://aws.amazon.com/kinesis/?nc1=h_l2_al

[9] nginx: http://www.evanmiller.org/nginx-modules-guide-advanced.html#parsing

[10] ragel: http://en.wikipedia.org/wiki/Ragel

[11] Navigation Timing API: http://www.w3.org/TR/navigation-timing/

[12] RabbitMQ: https://www.rabbitmq.com/

[13] Gearman: http://gearman.org/

[14] Hadoop: http://ru.wikipedia.org/wiki/Hadoop

[15] Hadoop в Амазоне: http://aws.amazon.com/elasticmapreduce/

[16] из наших проектов: https://www.bitrix24.ru

[17] конференции: http://www.failoverconf.ru/conf2014/

[18] Источник: http://habrahabr.ru/post/218003/