В этой статье я расскажу о том, как разработать систему для индексирования и полнотекстового поиска error-логов (или любых других логов) на основе СУБД от Яндекса под названием ClickHouse. Про саму базу Яндекс писал на Хабре сначала когда база была закрытой, а потом когда они её заопенсорсили. База данных в первую очередь предназначена для аналитики и для реализации сервиса Яндекс.Метрика, но может на самом использоваться для чего угодно, если вам подходит загружать данные пачками, удалять их тоже огромными пачками и никогда не обновлять отдельные строки.
Что мы будем делать
Мы будем реализовывать систему для индексирования и поиска по error-логам. При этом, считается, что сами логи вы уже сумели доставить на центральный сервер (или несколько серверов) и уже засунули сами тексты сообщений в базу, то есть у вас уже есть таблица в какой-нибудь базе данных примерно следующего вида:
CREATE TABLE Messages (
message_id BIGINT PRIMARY KEY AUTO_INCREMENT,
created_ts DATETIME,
message_text BLOB
)
Мы научимся быстро отдавать результаты поиска по такому логу (то есть, всегда отсортированные по времени) и индексировать его в режиме реального времени.
Почему не ElasticSearch / Sphinx / MySQL / другое_решение?
Мне кажется интересным посмотреть, что из себя представляет ClickHouse, и какие задачи с его помощью можно решать. Целью статьи является дать людям обзор и пищу для размышления, нежели дать готовое решение. Elastic, Sphinx и прочие представляют из себя готовые движки для поиска, тогда как ClickHouse является базой данных общего назначения, из которой можно слепить всё, что угодно. Также, у меня есть мнение, что система поиска, представленная в статье на основе ClickHouse будет справляться с задачей поиска по логам лучше, чем Sphinx, и при этом не нужно будет использовать 2 вида индексов (реалтайм и обычный). Ваш опыт может отличаться, поэтому рекомендую сначала попробовать сделать прототип перед тем, как внедрять такую систему в продакшен.
Установка сервера
Поручите задачу установки ClickHouse (github) вашему системному администратору или поставьте его сами из докера, если вы не хотите ничего решать, или вам просто лень. Если будете собирать сами из исходных кодов, вам потребуется до 30 гб места, имейте это в виду.
Установка клиента
Если у вас в системе почему-то нет curl или php, установите их. Дальнейшие примеры будут пользоваться curl в качестве API к базе и PHP для написания системы индексации и поиска.
Подготавливаем структуры данных для индекса
Как правило, структуры для полнотекстового поиска в движках для поиска весьма простые. Структура называется Inverted Index, и мы с вами её реализуем, в немного упрощенном виде. Мы будем пользоваться движком «по умолчанию», рекомендуемым для данных, имеющих как первичный ключ, так и дату — MergeTree:
CREATE TABLE FT (
EventDate Date,
word_id UInt32,
message_id UInt64
) ENGINE=MergeTree(EventDate, (word_id, message_id), 8192);
Чтобы создать таблицу в базе, можно воспользоваться следующей командой:
$ cat create.sql | curl 'http:/hostname:8123/?query=' --data-binary @-
В этой команде в файле create.sql должен лежать запрос, который нужно выполнить, а hostname — это хост с поднятым ClickHouse, 8123 — дефолтный порт.
В указанной выше структуре word_id — это id слова в словаре (который мы создадим позже, в словаре хранится соответствие word_text => word_id), а message_id — это id соответствующей записи в таблице с логами (аналог document_id для Sphinx).
Параметры для MergeTree движка — первое поле EventDate означает имя колонки с датой события, вторая колонка (word_id, message_id) определяет первичный ключ (по сути, обычный индекс) и 8192 — это настройка, влияющая на гранулярность индекса, мы её оставим по умолчанию.
MergeTree сортирует данные по первичному ключу и разбивает их по дате, поэтому поиск по конкретному дню и конкретному слову с сортировкой по message_id должен быть весьма быстрым.
Создаем структуры для словаря
Для того, чтобы заполнить этот индекс, нам нужна структура типа «словарь», которая нужна для того, чтобы хранить числа в ClickHouse вместо строк. Словарь можно создать в базе данных, и если это MySQL, то структура будет выглядеть следующим образом:
CREATE TABLE Words (
id int(11) unsigned NOT NULL AUTO_INCREMENT,
word varchar(150) COLLATE ascii_bin NOT NULL DEFAULT '',
PRIMARY KEY (id),
UNIQUE KEY word (word)
) ENGINE=InnoDB DEFAULT CHARSET=ascii COLLATE=ascii_bin;
Обратите внимание на ASCII-сравнение, это позволяет сильно увеличить производительность текстовых индексов в случае, когда все слова на английском языке. Если у вас не все логи на английском языке, то рекомендую пересмотреть свои взгляды сравнение можно оставить по умолчанию (utf8_unicode_ci).
Процесс индексирования
Для того, чтобы управлять процессом индексации и чтобы инициировать начальное индексирование, можно завести отдельную таблицу с очередью для сообщений, которые мы ещё не проиндексировали:
CREATE TABLE IndexQueue (
message_id bigint(20) unsigned NOT NULL DEFAULT '0',
shard_id int(11) NOT NULL,
PRIMARY KEY (shard_id,message_id)
);
Чтобы наполнить эту таблицу в первый раз, можно использовать следующий запрос:
INSERT IGNORE INTO IndexQueue (message_id, shard_id) SELECT message_id, message_id % 4 FROM Messages
Здесь 4 — это количество потоков индексатора, которые мы будем использовать. На PHP7 код из примера ниже дает производительность примерно 3,5 мб/сек на один процесс, в 4 потока соотвественно получается 14 мб/сек. Если вы пишете больше error-логов, чем 14 мб/сек, то вероятно вам нужно срочно чинить ваш продакшен и вам не до того, что полнотекстовый поиск немного отстает :).
Алгоритм индексатора будет следующим:
- Посмотреть записи в очереди (IndexQueue) для указанного шарда
- Выбрать пачку записей и выделить в каждом сообщении слова и сложить в массив $index вида message_id => array(word1, ..., wordN)
- Для каждого слова найти соответствующий word_id в словаре, и если такого слова ещё нет, то добавить
- Вставить в индекс в ClickHouse записи по всем словам всех сообщений
Ниже приведен немного упрощенный код для разбора очереди и индексации, вам его придется доработать самостоятельно, если вы хотите его использовать у себя:
const CH_HOST = '<hostname>:8123';
const MAX_WORD_LEN = 150; // должно соответствовать тому, что в таблице Words
$mysqli = mysql_connect(...); // коннект к базе
$limit = 10000; // максимальный размер пачки сообщений при индексации
$shard_id = intval($argv[1] ?? 0); // номер шарда (указывается первым аргументом скрипту, если не указан, то будет 0)
echo "Indexing shard $shard_idn";
while ($mysqli->query('SELECT MAX(message_id) FROM IndexQueue WHERE shard_id = ' . $shard_id)->fetch_row()[0]) {
$index = "";
$start = microtime(true);
$ids = [];
foreach ($mysqli->query('SELECT message_id FROM IndexQueue WHERE shard_id = ' . $shard_id . ' ORDER BY message_id LIMIT ' . $limit)->fetch_all() as $row) {
$ids[] = $row[0];
}
if (empty($ids)) {
break;
}
$message_texts = $mysqli->query('SELECT message_id, `message_text` FROM Messages WHERE message_id IN(' . implode(', ', $ids) . ')')->fetch_all(MYSQLI_ASSOC);
$unknown_words = [];
$msg_words = [];
$total_length = 0;
foreach ($message_texts as $msg) {
$msg_id = $msg['message_id'];
$text = $msg['message_text'];
$total_length += strlen($text);
$words = array_unique(
array_filter(
preg_split('/W+/s', $text),
function($a) {
$len = strlen($a);
return $len >= 2 && $len <= MAX_WORD_LEN;
}
)
);
foreach ($words as $word) {
$unknown_words[$word] = true;
}
$msg_words[$msg_id] = $words;
}
if (!count($message_texts)) {
$mysqli->query('DELETE FROM IndexQueue WHERE shard_id = ' . $shard_id . ' AND message_id IN(' . implode(', ', $ids) . ')');
continue;
}
if (!count($unknown_words)) {
var_dump($message_texts);
die("Empty message texts!n");
}
$words_res = $mysqli->query('SELECT word, id FROM Words WHERE word IN(' . INstr(array_keys($unknown_words)) . ')')->fetch_all(MYSQLI_ASSOC);
$word_ids = [];
foreach ($words_res as $row) {
$word_ids[$row['word']] = $row['id'];
unset($unknown_words[$row['word']]);
}
if (count($unknown_words)) {
echo "Inserting " . count($unknown_words) . " words into dictionaryn";
$values = [];
foreach ($unknown_words as $word => $_) {
$values[] = "('" . $mysqli->escape_string($word) . "')";
}
$mysqli->query('INSERT IGNORE INTO Words (word) VALUES ' . implode(',', $values));
$words_res = $mysqli->query('SELECT word, id FROM Words WHERE word IN(' . INstr(array_keys($unknown_words)) . ')')->fetch_all(MYSQLI_ASSOC));
foreach ($words_res as $row) {
$word_ids[$row['word']] = $row['id'];
unset($unknown_words[$row['word']]);
}
}
if (count($unknown_words)) {
die("Could not fill dictionaryn");
}
foreach ($msg_words as $msg_id => $words) {
foreach ($words as $word) {
// здесь неявно предполагается, что unix timestamp из message_id можно вычислить путем отрезания последних 32 бит
$index .= date('Y-m-d', $msg_id >> 32) . "t" . $word_ids[$word] . "t" . $msg_id . "n";
}
}
$ch = curl_init('http://' . CH_HOST . '/?query=' . rawurlencode('INSERT INTO FT FORMAT TabSeparated'));
curl_setopt($ch, CURLOPT_POSTFIELDS, $index);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
$res = curl_exec($ch);
if ($res !== "") {
die($res . "n");
}
$mysqli->query('DELETE FROM IndexQueue WHERE shard_id = ' . $shard_id . ' AND message_id IN(' . implode(', ', $ids) . ')');
echo "Speed " . round($total_length / 1024 / (microtime(true) - $start), 2) . " KiB/secn";
}
function INstr(array $values) {
global $mysqli;
$res = [];
foreach ($values as $v) $res[] = "'" . $mysqli->escape_string($v) . "'";
return implode(',', $res);
}
Поиск по индексу
Нам не нужны алгоритмы ранжирования при поиске, которыми так богаты Elastic, Sphinx и другие решения, и нам нужна просто сортировка по дате, поэтому поиск будет исключательно прост. По сути, чтобы найти что-либо по запросу «hello world 111», нам нужно сначала найти word_id в словаре (предположим, что это будет 1, 2 и 3 соответственно), и выполнить следующий запрос:
SELECT message_id FROM FT
WHERE word_id IN(1, 2, 3)
GROUP BY message_id
HAVING uniq(word_id) = 3
ORDER BY message_id DESC
LIMIT 50
Обратите внимание на то, что в каждом документе, который мы ищем, должны присутствовать все слова из запроса, поэтому мы пишем HAVING uniq(word_id) = 3 (uniq(word_id) — это аналог COUNT(DISTINCT word_id) в обычных SQL-базах), где 3 — это количество различных слов в запросе.
Мы предполагаем, что сортировка по message_id будет означать сортировку по времени. Этого можно добиться, если в первые 32 бита message_id записывать UNIX TIMESTAMP события в секундах, а во вторую половину — микросекунды события (если есть) и случайные числа.
Результаты
Для тестирования производительности этого решения, я взял базу данных error-логов с нашего девел-сервера объемом в 3 Гб (1,6 млн событий) и проиндексировал. Индексатор показал скорость индексации в 3,5 Мб/сек на один поток, что для моего случая было более, чем достаточно. В данный момент мы используем Sphinx для полнотекстового поиска по error-логам, поэтому я могу примерно сравнить производительность этих двух решений, поскольку работают они примерно в одинаковых условиях на одном и том же железе. Индексация у Sphinx (по крайней мере, построение не realtime-индекса) в несколько раз быстрее в расчете на одно ядро, но учитывайте, что индексатор сфинкса написан на C++, а наш — на PHP :).
Чтобы вычислить самый тяжелый запрос для ClickHouse (и, очевидно, для Sphinx тоже), я решил найти самые популярные слова в индексе:
$ echo 'SELECT word_id, count() AS cnt FROM FT GROUP BY word_id ORDER BY cnt DESC LIMIT 5' | curl 'http://hostname:8123/?query=' --data-binary @-
5 1669487
187 1253489
183 1217494
159 1216255
182 1199507
Запрос занял 130 мс при общем количестве записей в 86 млн, впечатляет! (на тестовой машине 2 ядра).
Итак, если взять топ-5 и превратить word_id в нормальные слова, то запрос для исполнения получится следующий: «php wwwrun _packages ScriptFramework badoo». Эти слова встречаются у нас почти в каждом сообщении и их можно спокойно выкинуть из индекса, но я их оставил для проверки производительности поиска.
Выполняем запрос в ClickHouse:
SELECT message_id FROM FT WHERE word_id IN(189, 159, 187, 5, 183) GROUP BY message_id HAVING uniq(word_id) = 5 ORDER BY message_id DESC LIMIT 51
И похожий запрос в Sphinx:
SELECT message_id FROM FT WHERE MATCH('php wwwrun _packages ScriptFramework badoo') ORDER BY message_id DESC LIMIT 51
Времена исполнения запроса (оба демона могут использовать для выполнения запроса оба ядра, всё помещается в оперативную память):
ClickHouse: 700 мс
Sphinx: 1500 мс
Учитывая, что Sphinx умеет ранжировать результаты, а наша система нет, время у Sphinx весьма неплохое. Не забывайте, что за время выполнения запроса оба демона должны были объединить результаты для ~6 млн документов (по 1,2 млн документов на слово) и делали это на скромных 2 ядрах. Вполне возможно, что при должной настройке времена, указанные в этом (немного синтетическом) тесте, поменяются местами, но тем не менее, результатами лично я очень доволен и можно смело сказать, что для построения реалтайм-поиска по логам ClickHouse подходит очень хорошо.
Спасибо за то, что прочли статью до конца и надеюсь, что она вам понравилась.
P.S. Я не являюсь сотрудником Яндекса и с Яндексом никак не связан, я просто хотел попробовать их базу данных для реальной задачи :).
Ссылки
Автор: youROCK