Hadoop Tutorial. Пишем свой grep

в 9:13, , рубрики: Apache, big data, Hadoop, hello world, java, MapReduce, метки: , , , ,

Доброго времени суток, дорогое читатели. Не так давно я начал изучать работу с большими данными (Map/Reduce, NoSQL...) и очень быстро узнал о фреймворке с открытым исходным кодом Apache Hadoop, за изучение которого сразу и принялся.

Данный пост рассчитан на новичков, которые тоже не так давно начали изучать Hadoop. В посте будет разобрано небольшое приложение построенное на этом фреймворке(Этакий Hello World!). Кому интересно, добро пожаловать под кат.

Данный топик не рассматривает процесс установки, настройки и проблем с запуском, однако ресурсы для изучения вы можете посмотреть внизу. Мною в работе были использованы следующие технологии:

  • Linux Ubuntu 13.04;
  • Oracle Java 1.7;
  • Hadoop 1.1.2;
  • Intellij IDEA 12;

По скольку счетчик слов (он же Word Count) продемонстрирован в подавляющем большинстве туториалов, я решил разнообразить эту тему и в качестве примера разобрал grep.
Наша реализация будет получать на вход:

  • Папку с файлами(Файл) для поиска совпадений по регулярному выражению;
  • Путь для сохранения результатов;
  • Регулярное выражение;

На выходе мы получим файл(ы) которые содержат полные пути к файлам(ключи) в которых нашлись совпадения и строки(значения) с этими совпадениями в файле.

Весь процесс обработки данных построен на парадигме MapReduce. Суть ее в том, что мы разделяем всю работу на два этапа: map и reduce.
Итак, преступим.

Map

На этом шаге мы в качестве аргумента получаем ключ и значение. Далее эти данные проходят обработку подавая на выход список ключей и список значений.
Наша реализация map функции:

/*
* Маппер.
* Пример построен с использованием нового API org.apache.hadoop.mapreduce.*
*/
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/*
* LongWritable - Тип входного ключа(Номер строки).
* Text - Тип входного значения (Строка под номером ключа).
* Text - Тип выходного ключа(Полный путь к файлу).
* Text - Тип выходного значения(Строка из файла с совпадением).
*/
public class RegexMapper extends Mapper<LongWritable, Text, Text, Text>{
    private Pattern pattern;
    private Text    keyOut; //В качестве ключа на выход будет взят полный путь к файлу.

    /*
    * Метод setup() вызывается перед вызовом метода map()(который переопределен ниже).
    * Используется для отделения подготовительных действий от самой map() функции.
    */
    @Override
    public void setup(Context context) throws IOException{
        /*
        * Берем сохраненный аргумент(регулярное выражение),
        * который был сохранен в Driver-классе(будет описан далее).
        */
        pattern = Pattern.compile(context.getConfiguration().get("regex"));

        //Получаем полный путь входной строки файла (valueIn).
        Path filePath = ((FileSplit) context.getInputSplit()).getPath();
        keyOut        = new Text(filePath.toString());
    }

    /*
    * Сам map() метод. Создаем матчер и ищем в строке совпадения. В случае нахожде-
    * ния записываем полный путь файла в качестве ключа(keyOut - получен в setup()
    * методе) и строку из этого файла в качестве значения(valueIn - получена 
    * как аргумент метода). 
    */
    @Override
    public void map(LongWritable key, Text valueIn, Context context) 
                                 throws IOException, InterruptedException {
        Matcher matcher = pattern.matcher(valueIn.toString());
        
        //По скольку входным значением является только одна строка файла, то нам доста-
        //точно найти хотя бы одно совпадение, что бы строка подходила условиям поиска.
        if (matcher.find())
            context.write(keyOut, valueIn); //Запись пары ключ значение
    }
}

Вот и все. Переходим к reduce.

Reduce

На этапе reduce мы получаем в качестве аргумента один ключ и все соответствующие ему значения полученные на выходе map метода(ов) для их последующей обработки. В нашем случае мы получили путь к файлу, где найден текст соответствующий заданному шаблону(ключ) и набор строк, где были найдены совпадения(список значений).

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
/*
*Все ключи и значения типа Text
*/
public class RegexReducer extends Reducer<Text, Text, Text, Text> {
    
    /*
    * В методе мы форматируем полученные данные для записи в файл.
    */
    @Override
    public void reduce(Text keyIn, Iterable<Text> valuesIn, Context context) 
                                      throws IOException, InterruptedException {
         
        //Для конкатенации строк воспользуемся StringBuilder.
        StringBuilder valueOut = new StringBuilder();

        for(Text value: valuesIn)
            valueOut.append("n" + value.toString());
        valueOut.append("n");

        context.write(keyIn, new Text(valueOut.toString()));
    }
}

С map и reduce разобрались. Осталось это все упаковать в класс-драйвер и запустить.

Driver

В классе-драйвере происходит настройка задачи(установка маппера и редусера, типа входных и выходных данных и т. д.).
В общем вот:

import com.petrez.mappers.RegexMapper;
import com.petrez.reducers.RegexReducer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class Grep {

    public static void main(String[] args) 
                throws IOException, ClassNotFoundException, InterruptedException {
        if(args.length != 3) {
            System.out.println("Usage: <inDir> <outDir> <regex>");
            ToolRunner.printGenericCommandUsage(System.out);
            System.exit(-1);
        }

        Configuration config = new Configuration();
        /*
        * Сохраняем регулярное выражение для map() метода с ключом regex.
        */
        config.set("regex", args[2]);

        Job job = new Job(config, "grep");
        
        /*
        * Я запускаю программу из jar-файла, поэтому указание главного класса
        * приложения необходимо.
        */
        job.setJarByClass(Grep.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
       
        /*
        * Вот. TextInputFormat разбивает входные файлы на строки и подает их
        * в качестве аргумента map функциям. В качестве
        * разделителя используется символ возврата каретки.
        */
        job.setInputFormatClass(TextInputFormat.class);       
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(RegexMapper.class);
        job.setReducerClass(RegexReducer.class);

        job.waitForCompletion(true);
    }
}

Осталось только запустить.

В силу того, что реализация рассчитана для начинающих, она упрощена в ущерб эффективности, а именно создание на каждую строку по отдельному мапперу. Данный вариант очень упрощает реализацию маппера и редусера, но сильно нагружает память. Прошу учесть.

По скольку я все упаковал в исполняемый jar-файл, то запустить нашу программку можно так:

<путь к hadoop>/bin/hadoop jar /home/hduser/HadoopGrep.jar <путь с файлами для анализа> <путь для сохранения результатов> <регулярное выражение>

Путем для сохранения результатов должна быть несуществующая директория. Если Вы настроили Hadoop в pseudo-distributed mode, то данные сохраняются в файловой системе HDFS и вам их оттуда еще нужно будет вытащить.

Материалы для изучения:

Всем спасибо.

Автор: methode

Источник

Поделиться

* - обязательные к заполнению поля