Big Data от A до Я. Часть 5.2: Продвинутые возможности hive

в 11:12, , рубрики: big data, Hive, MapReduce, Блог компании DCA (Data-Centric Alliance)

Привет! В этой статье мы продолжим рассматривать возможности hive — движка, транслирующего SQL-like запросы в MapReduce задачи.

В предыдущей статье мы рассмотрели базовые возможности hive, такие как создание таблиц, загрузка данных, выполнение простых SELECT-запросов. Теперь поговорим о продвинутых возможностях, которые позволят выжимать максимум из Hive.

Big Data от A до Я. Часть 5.2: Продвинутые возможности hive - 1

User Defined Functions

Одним из основных препятствий при работе с Hive является скованность рамками стандартного SQL. Эту проблему можно решить при помощи использования расширений языка — так называемых «User Defined Functions». Довольно много полезных функций встоено прямо в язык Hive. Приведу несколько самых интересных на мой взгляд(информация взята из оффициальной документации):

Json

Довольно частой задачей при работе с большими данынми является обработка неструктурированных данных, хранящихся в формате json. Для работы с json hive поддерживать специальный метод get_json_object, позволяющий извлекать значения из json-документов. Для извлечения значений из объекта используется ограниченная версия нотации JSONPath. Поддерживаются следующие операции:

  • $: Возвращает корневой объект
  • .: Вовзращает объект-ребенок
  • []: Обращение по индексу в массиве
  • *: Wildcard для

Примеры работы с Json из оффициальной документаци:

Пусть есть таблица: src_json, состоящаяя из одной колонки(jsoin) и одной строки:

{"store":
  {"fruit":[{"weight":8,"type":"apple"},{"weight":9,"type":"pear"}],
   "bicycle":{"price":19.95,"color":"red"}
  },
 "email":"amy@only_for_json_udf_test.net",
 "owner":"amy"
}

Примеры запросов к таблице:

hive> SELECT get_json_object(src_json.json, '$.owner') FROM src_json;
amy
 
hive> SELECT get_json_object(src_json.json, '$.store.fruit[0]') FROM src_json;
{"weight":8,"type":"apple"}
 
hive> SELECT get_json_object(src_json.json, '$.non_exist_key') FROM src_json;
NULL

Xpath

Аналогично, если данные которые необходимо обрабатывать при помощи hive хранятся не в json, а в XML — их можно обрабатыватывать при помощи функции xpath, позвоялющей парсить XML при помощи соответствующего языка. Пример парсинга xml-данных при помощи xpath:

hive> select xpath('<a><b>b1</b><b>b2</b></a>','a/*/text()') from sample_table limit 1 ;
["b1","b2"]

Другие полезные встроенные функции:

Встроенная библиотека содержит довольно богатый набор встроенных функций. Можно выделить несколько групп:

  • Математические функции (sin, cos, log, …)
  • Функции работы со временем(from_unix_timestamp, to_date, current date, hour(string date), timediff, …) — очень богатый выбор функций для преобразования дат и времени
  • Функции для работы со строками. Поддерживаются как общеприменимые функции, такие как lengh, reverse, regexp, так и специфичные — типа parse_url или уже рассмотренной get_json_object)
  • Много различных системных функций — current_user, current_database, …
  • Криптографические функции — sha, md5, aes_encrypt, aes_decrypt...

Полный список встроенных в hive функций можно найти по ссылке.

Написание собственных UDF

Не всегда бывает достаточно встроенных в hive функций для решения поставленной задачи. Если встроенной функции не нашлось — можно написать свою UDF. Делается это на языке java.

Разберем создание собственной UDF на примере простой функции преобразования строки в lowercase:

1. Создадим пакет com/example/hive/udf и создадим в нем класс Lower.java:

mkdir -p com/example/hive/udf
edit  com/example/hive/udf/Lower.java

2. Реализуем собственно класс Lower:

package com.example.hive.udf;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public final class Lower extends UDF {
  public Text evaluate(final Text s) {
    if (s == null) { return null; }
    return new Text(s.toString().toLowerCase());
  }
}

3. Добавим необходимые библиотеки в CLASSPATH (в вашем hadoop-дистрибутиве ссылки на jar-файлы могут быть немного другими):

export CLASSPATH=/opt/cloudera/parcels/CDH/lib/hive/lib/hive-exec.jar:/opt/cloudera/parcels/CDH/lib/hadoop/hadoop-common.jar

4. Компилируем нашу UDF-ку и собираем jar-архив:

javac com/example/hive/udf/Lower.java
jar cvf my_udf.jar *

5. Для того чтобы можно было использовать функцию в hive — нужно явно ее декларировать:

hive> ADD JAR my_udf.jar;

hive> create temporary function my_lower as 'com.example.hive.udf.Lower';

hive> select my_lower('HELLO') from sample_table limit 1;
hello

Трансформация таблицы при помощи скриптов

Еще одним способом расширения стандартного функционала HIVE является использование метода TRANSFORM, который позволяет преобразовывать данные при помощи кастомных скриптов на любом языке программирования(особенно это подходит тем кто не любит java и не хочет писать на ней udf-ки).  

Синтаксис для использования команды следующий:

SELECT TRANSFORM(<columns>) USING <script> as <new_columns>

<script> — в данном случае это программа, которая получает данные на stdin, преобразует их и выдает на stdout преобразованные данные. По сути это очень похоже на streaming-интерфейс к запуску map-reduce задач, о котором мы писали в статье Big Data от А до Я. Часть 2: Hadoop

Пример:

Пусть у нас есть таблица с зарплатами пользователей, получающих зарплату в разной валюте:

+-------------------+---------------------+-----------------------+
| user_salary.name  | user_salary.salary  | user_salary.currency  |
+-------------------+---------------------+-----------------------+
| alexander         | 100000              | RUB                   |
| evgeniy           | 4000                | EUR                   |
| alla              | 50000               | RUB                   |
| elena             | 1500                | EUR                   |
+-------------------+---------------------+-----------------------+

Мы хотим получить табличку, в которой будут рублевые зарплаты для всех пользователей. Для этого напишем скрипт на python, который выполняет преобразование данных:

import sys
EXCHANGE_RATE = 75

for line in sys.stdin:
        name, salary, currency = line.rstrip("n").split('t')
        if currency == 'EUR':
                print name + "t" + str(int(salary) * EXCHANGE_RATE)
        else:
                print name + "t" + salary

Скрипт подразумевает что данные на вход поступают в tsv-формате(колонки разделены знаком табуляции). В случае если в таблице встретится значение NULL на вход скрипта попадет значение ‘N’

Дальше используем этот скрипт для преобразования таблицы:

0: jdbc:hive2://localhost:10000/default> select 
       transform(name, salary, currency) 
       using 'python transform_to_rub.py' as (name, rub_salary) 
       from user_salary;

+------------+-------------+
|    name    | rub_salary  |
+------------+-------------+
| alexander  | 100000      |
| evgeniy    | 300000      |
| alla       | 50000       |
| elena      | 112500      |
+------------+-------------+

По сути использование операции TRANSFORM дает возможность полностью заменить классический MapReduce при помощи hive.

MapJoin

Как мы писали в статье про приемы и стратегии работы с MapReduce — для  реализации JOIN’a двух таблиц в общем необходимо несколько MapReduce задач. Так как hive работает именно на MapReduce — то JOIN для него также является дорогой операцией.
Однако если одна из двух таблиц, которые необходимо сджойнить полностью влазит в оперативную память какждой ноды — можно обойтись одним MapReduce, загрузив табличку в память. Этот паттерн называется MapJoin. Для того чтобы Hive использовал именно MapJoin — необходимо дать ему подсказку(«hint» в терминологии Hive).

Пример:

SELECT /*+ MAPJOIN(time_dim) */ COUNT(*) from
store_sales JOIN time_dim on (ss_sold_time_sk = t_time_sk)

В этом примере подразумевается что таблица «store_sales» — большая,  а таблица «time_dim» — маленькая и влазит в память. /*+ MAPJOIN(time_dim) */ — это и есть та самая подсказка для для HIVE о запуске задачи MAPJOIN-задачи

Транзакционная модель

Транзакционная модель ACID подразумевает поддержку 4х основных свойств:

  • Атомарность — операция либо целиком выполняется полностью изменяя данные, либо падает и не оставляет за собой следов.
  • Консистентность — после того как приложение выполняет операцию ее результат становится доступным для всех последующих операций.
  • Изоляция — операции одних пользователей не имеют побочных эффектов на других пользователей.
  • Долговечность — изменения сделанные в результате успешной опрации сохраняют результат даже в случае системного сбоя.

Вообще говоря Hive не очень хорошо подходит для работы с изменяющимися данными, однако есть несколько кейсов, где поддержка изменяющихся данных необходима. В первую очередь это:

  • Данные, добавляющиеся в потоковом режиме(из таких систем, как flume, kafka). Хочется чтобы данные были доступны для анализа в hive сразу как они поступили
  • Обновление схемы — например добавление новой колонки в таблицу hive. Хочется чтобы колонка либо успешно добавилась к каждой записи, либо упала и не добавилась ни к одной
  • Иногда все-таки необходимо обновление отдельных записей.

Для этих целей в hive начиная с версии 0.14 была реализованна поддержка транзакционной модели, реализуемая четыремя операциями — INSERT, UPDATE и DELETE.

Поддержка этих операций очень ограниченна:

  • На данный момент поддерживаются только файлы формата ORC
  • По умолчанию поддержка транзакций отключена. Для включения необходимо внести соответствующие изменения в конфигурационный файл hive.
  • Нет поддержки команд BEGIN, COMMIT, ROLLBACK  привычных по реляционным базам данным.

Поддержка транзакций реализована при помощи дельта-файлов. То есть при выполнеии операции обновления данных данные в исходном файле не обновляются,  а создается новый файлик где отмечено какие строки были измененеы.  Позже hive их объеденит при помощи операции compaction(аналогичная используется в hbase).

В общем, так как поддержка транзакций очень сильно лимитирована — стоит очень серьезно подумать перед тем как использовать этот функционал в Hive. Возможно стоит посмотреть в сторону HBase или традиционных реляционных баз данных.

Заключение

В этой и предудщей статье цикла мы рассмотрели основные возможности Hive — мощного инструмента, облегчающего работу с MapReduce задачами. Hive прекрасно подходит аналитикам, привыкшим работать с SQL, может быть легко интегрирован в существующую инфраструктуру при помощи поддержки драйвера JDBC, а с учетом поддержки User Defined Functions и кастомных трансформаций — позволяет полностью перевести процессинг данных с классического MapReduce на себя. Однако hive не является «серебрянной пилюлей» — для часто обновляемых данных можно можно посмотреть в сторону таких инструментов как Hbase и классические реляционые базы данных.

В следующих статьях цикла мы продолжим рассмотрение инструментов для работы с большими данными и методах их обработки.

Автор: DCA (Data-Centric Alliance)

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js