Bigdata стек глазами воинствующего ораклойда

в 15:17, , рубрики: big data, BigData, Hadoop, spark

На Хабре и прочих интернетах чуть не каждый день постят пустые статьи о бигдата, создавая у спецов стойкое ощущение, что кроме маркетинга за стеком бигдаты ничего нет. На самом деле там достаточно интересных технологий под капотом Hadoop и тут я хочу слегка разбавить маркетинг, взглядом технического спеца с опытом Oracle.

В первую очередь стоит понимать, что один из столпов бигдаты Hadoop, это не только батч процессинг и map-reduce, как многие пытаются изобразить. Это запросто может быть обработка и с противоположного спектра задач: чтение потока мелких сообщений, например от IoT (spark на Hadoop, читает Kafka stream), на ходу агрегируя и выявляя отклонения. Это могут быть тучи мелких запросов отчетных систем с JOINs к parquet файлам через Impala. Все это все та же экосистема Hadoop. Там очень много всего, различной степени зрелости и требующего различного подхода. Причем поверьте, никто толком не знает наиболее выигрышный вариант. Где-то задачи отлично ложатся на классические map-reduce и достаточные примитивные parquet на hdfs, где-то Spark с обычными SQL JOINs чуть не на текстовых файликах. Крупнейший дистрибутив Cloudera сейчас активно продвигает нечто напоминающее базу данных Kudu, которая может как в java стайл map-reduce использоваться, так и под SQL в Impala.

Там много всего и почти все это между собой комбинируется, при этом в зависимости от комбинации подходы обработки могут быть очень и очень разными. В принципе это отдаленно напоминает метания Oracle в конце 90х, начала нулевых, когда они в субд начали пихать всякие xml таблицы, java stored procedures и прочие странные вещи.

По началу попав в проект с Hadoop/map-reduce, было совершенно не понятно, за счет чего с таким подходом можно соревноваться с полноценными СУБД, ведь всё общение map-reduce проходит через писанину вна hdfs. Сначала маппер читает все подряд, потом пишет на hdfs свой аутпут для редюсеров, потом редюсеры читают, снова пишут. С оракловыми представлениями о прекрасном казалось это точно не взлетит. Но позже появилось понимание за счет чего.

Примерно как Oracle делает с виду много лишней работы, пытаясь за счет «лишней» писанины на диски увеличить параллельное выполнение. Oracle, например, пишет в «лишний» UNDO, что позволяет ему развести конкурирующих читателей. Пишет блокировки в блоки данных, что позволяет не держать огромные списки блокировок в памяти, прозрачно обменивать инфой о блокировках между нодами RAC кластера. Примерно в этом же ключе стоит смотреть на с виду лишнюю писанину map-reduce. Все это «лишнее» в итоге позволяет выполнять гораздо больше задач в параллель, на фоне Oracle.

При этом я был сразу удивлен размером parquet файлов, таблички в Oracle без индексов на 80-100 гб, легко превращаются в 20-30 гб parquet. В результате map-reduce читает с дисков в разы меньше, загружая сразу тучу ядер кластера, в то время как Oracle и прочесть должен больше и все вычисления возлагает на единственный пользовательский процесс. Это камень в огород PL/SQL машины, хотя и у SQL движка оракла тоже тьма нюансов на тему параллельного чтения.

Что бы было понятно, как же выглядит реализация логики в Hadoop могу проиллюстрировать одну из «пятничных задачек» с оракловой ветки sql.ru. В принципе все началось с баталии на тему map-reduce vs Spark.

Задачка такая:

Написать запрос, который для каждого adjusted value находит
1) предыдущее original (ordered by id)
2) original со сдвигом, который задается переменной (предыдущее считается со сдвигом 0)
3) число предыдущих original так чтоб их сумма не превышала заданный лимит

var shift number
var limit number
exec :shift := 2;

PL/SQL procedure successfully completed.

exec :limit := 2000;

PL/SQL procedure successfully completed.

В Oracle решение было предложено таким:

SQL> select t.id,
  2         t.type,
  3         t.value,
  4         decode(type, 'adjusted', max(decode(type, 'original', value)) over(partition by grp)) prev_o_value,
  5         decode(type, 'adjusted', max(decode(type, 'original', shift_n)) over(partition by grp)) prev_shift_n_o_value,
  6         decode(type, 'adjusted', count(decode(type, 'original', id))
  7                over(order by orig_val_running_sum range between 2000 preceding and 1 preceding)) count_o
  8    from (select t.*,
  9                 sum(decode(type, 'original', value)) over(order by id) - decode(type, 'original', value, 0) orig_val_running_sum,
 10                 decode(type, 'original', lag(value, 2) over(order by decode(type, 'original', 0, 1), id)) shift_n,
 11                 sum(decode(type, 'original', 1)) over(order by id) grp
 12            from t) t
 13   order by id;

        ID TYPE                                VALUE PREV_O_VALUE PREV_SHIFT_N_O_VALUE    COUNT_O
---------- ------------------------------ ---------- ------------ -------------------- ----------
        10 original                              100
        20 original                              200
        30 adjusted                              300          200                               2
        40 original                              400
        50 adjusted                              500          400                  100          3
        60 original                              600
        70 original                              700
        80 adjusted                              800          700                  400          5
        90 adjusted                              900          700                  400          5
       100 original                             1000
       110 adjusted                             1100         1000                  600          2
       120 original                             1200
       130 adjusted                             1300         1200                  700          1
       140 original                             1400
       150 adjusted                             1500         1400                 1000          1

15 rows selected.

В SparkSQL задачка решается без третьего пункта так


select t.id,
       t.type,
       t.value,
       case when type = 'adjusted' then max(case when type = 'original' then value end) over (partition by position, grp) end prev_o_value,
       case when type = 'adjusted' then max(case when type = 'original' then shift_n end) over (partition by position, grp) end prev_shift_n_o_value
  from (select t.*,
               case when type = 'original'
                    then lag(value, 2) over (partition by position order by case when type = 'original' then 0 else 1 end, id)
               end shift_n,
               sum(case when type = 'original' then 1 end) over (partition by position order by id) grp
          from t) t
order by id
+---+--------+-----+------------+--------------------+
|id |type    |value|prev_o_value|prev_shift_n_o_value|
+---+--------+-----+------------+--------------------+
|10 |original|100  |null        |null                |
|20 |original|200  |null        |null                |
|30 |adjusted|300  |200         |null                |
|40 |original|400  |null        |null                |
|50 |adjusted|500  |400         |100                 |
|60 |original|600  |null        |null                |
|70 |original|700  |null        |null                |
|80 |adjusted|800  |700         |400                 |
|90 |adjusted|900  |700         |400                 |
|100|original|1000 |null        |null                |
|110|adjusted|1100 |1000        |600                 |
|120|original|1200 |null        |null                |
|130|adjusted|1300 |1200        |700                 |
|140|original|1400 |null        |null                |
|150|adjusted|1500 |1400        |1000                |
+---+--------+-----+------------+--------------------+

Как видим решение на spark-sql почти от оракла и не отличается. А вот map-reduce решение:

Mapper

public class ParquetMapper extends Mapper<LongWritable, GenericRecord, Text, AvroValue<GenericRecord>> {
    private final Text outputKey = new Text();
    private final AvroValue<GenericRecord> outputValue = new AvroValue<GenericRecord>();

    @Override
    protected void map(LongWritable key, GenericRecord value, Context context) throws IOException, InterruptedException {
        outputKey.set(String.valueOf(value.get("position")));
        outputValue.datum(value);
        context.write(outputKey, outputValue);
    }
}

Reducer

public class ParquetReducer extends Reducer<Text, AvroValue<GenericRecord>, Void, Text> {
    private static final byte shift = 2 ;

    private TreeMap<Integer, AbstractMap.SimpleEntry<String, Integer>> rows = new TreeMap<Integer,AbstractMap.SimpleEntry<String, Integer>>();
    List<Integer> queue = new LinkedList<Integer>();
    private String adj = "";
    private int lastValue = -1;


    @Override
    protected void reduce(Text key, Iterable<AvroValue<GenericRecord>> values, Context context) throws IOException, InterruptedException {
        for (AvroValue<GenericRecord> value : values) {
            rows.put((Integer) value.datum().get("id"),
                      new AbstractMap.SimpleEntry(value.datum().get("type"), value.datum().get("value"))) ;
        }

        for(Map.Entry<Integer, AbstractMap.SimpleEntry<String, Integer>> entry : rows.entrySet()) {
            AbstractMap.SimpleEntry<String, Integer> rowData = entry.getValue();

            if (rowData.getKey().equals("original")) {
                lastValue = rowData.getValue() ;
                queue.add(lastValue) ;
                adj = "" ;
            } else {
                adj = " " + String.valueOf(lastValue);
                if (queue.size()- shift >0) {
                    adj = adj + " " + queue.get(queue.size()-shift).toString() ;
                }
            }
            Text output = new Text(entry.getKey()+" "+rowData.getKey() + " " + rowData.getValue() + adj);

            context.write(null, output );
        }
    }
}

Запускаем

[yarn@sandbox map-reduce]$ hadoop fs -cat /out/part-r-00000
10 original 100
20 original 200
30 adjusted 300 200
40 original 400
50 adjusted 500 400 200
60 original 600
70 original 700
80 adjusted 800 700 600
90 adjusted 900 700 600
100 original 1000
110 adjusted 1100 1000 700
120 original 1200
130 adjusted 1300 1200 1000
140 original 1400
150 adjusted 1500 1400 1200

Итог: бигдата стек огромен, там множество подсистем работающих в своей парадигме, map-reduce лишь один из болтиков, не факт, что теперь и всем нужный, в свете расцвета моды на Spark. Батч процессинг не единственная парадигма реализуемая в бигдата, а Spark уже позволяет писать логику в том числе и декларативным SQL языком. Map-reduce же тоже вполне себе красиво кладется на некоторые задачи, при этом с легкостью решает задачи, которые еще недавно были по силам лишь серьезным оракловым серверам. Если приглядеться к коду то видно, что на Spark-sql так и не реализован пункт 3 задания, зато map-reduce код, хоть и не декларативный, вышел достаточно элегантным и запросто расширяемым. Пукт 3 задания запросто добавляется в map-reduce код за несколько минут девелопером средней паршивости, тогда как нагромождение аналитических функций в оракловом решении потребует серьезной подготовки девелопера.

Автор: Yo1

Источник

Поделиться

* - обязательные к заполнению поля