Централизованные логи для приложений с помощью связки heka+elasticsearch+kibana

в 9:00, , рубрики: centralized logging, elasticsearch, Go, golang, java, python, ит-инфраструктура, метки:

В статье описана настройка центрального логирования для разных типов приложений (Python, Java (java.util.logging), Go, bash) с помощью довольно нового проекта Heka.

Heka разрабатывается в Mozilla и написана на Go. Именно поэтому я использую его вместо logstash, который имеет сходные возможности.

Heka основан на плагинах, которые имеют пять типов:

  1. Входные — каким-то образом принимают данные (слушает порты, читают файлы и др.);
  2. Декодеры — обрабатывают входящие запросы и переводят их во внутренние структуры для сообщений;
  3. Фильтры — производят какие либо действия с сообщениями;
  4. Encoders (неясно, как переводить) — кодируют внутренние сообщения в форматы, которые отправляются через выходные плагины;
  5. Выходные — отправляют куда-либо данные.

Например, в случае Java-приложений входным плагином является LogstreamerInput, который смотрит за изменениями в файле логов. Новые строки в логе обрабатываются декодером PayloadRegexDecoder (по заданному формату) и дальше отправляются в elasticsearch через выходной плагин ElasticSearchOutput. Выходной плагин в свою очередь кодирует сообщение из внутренней структуры в формат elasticsearch через ESJsonEncoder.

Установка Heka

Все способы установки описаны на сайте (http://hekad.readthedocs.org/en/v0.8.2/installing.html#binaries). Но проще всего скачать готовый бинарный пакет под свою систему со страницы https://github.com/mozilla-services/heka/releases.

Поскольку у меня используются сервера под ubuntu, то и описание будет для этой системы. В данном случае установка сводится к установке самого deb пакета, настройке файла конфигурации /etc/hekad.toml и добавления в сервисы upstart.

В базовую настройку /etc/hekad.toml у меня входит настройка количества процессов (я ставлю равным количеству ядер), dashboard (в котором можно посмотреть какие плагины включены) и udp сервер на 5565 порту, который ожидает сообщения по протоколу google protobuf (используется для python и go приложений):

maxprocs = 4

[Dashboard]
type = "DashboardOutput"
address = ":4352"
ticker_interval = 15

[UdpInput]
address = "127.0.0.1:5565"
parser_type = "message.proto"
decoder = "ProtobufDecoder"

Конфигурация для upstart /etc/init/hekad.conf:

start on runlevel [2345]
respawn
exec /usr/bin/hekad -config=/etc/hekad.toml

Логирование Python приложений

Здесь используется библиотека https://github.com/kalail/heka-py и специальный хендлер для модуля logging. Код:

import logging
from traceback import format_exception

try:
    import heka
    HEKA_LEVELS = {
        logging.CRITICAL: heka.severity.CRITICAL,
        logging.ERROR: heka.severity.ERROR,
        logging.WARNING: heka.severity.WARNING,
        logging.INFO: heka.severity.INFORMATIONAL,
        logging.DEBUG: heka.severity.DEBUG,
        logging.NOTSET: heka.severity.NOTICE,
    }
except ImportError:
    heka = None


class HekaHandler(logging.Handler):
    _notified = None
    conn = None
    host = '127.0.0.1:5565'

    def __init__(self, name, host=None):
        if host is not None:
            self.host = host

        self.name = name
        super(HekaHandler, self).__init__()

    def emit(self, record):
        if heka is None:
            return

        fields = {
            'Message': record.getMessage(),
            'LineNo': record.lineno,
            'Filename': record.filename,
            'Logger': record.name,
            'Pid': record.process,
            'Exception': '',
            'Traceback': '',
        }

        if record.exc_info:
            trace = format_exception(*record.exc_info)
            fields['Exception'] = trace[-1].strip()
            fields['Traceback'] = ''.join(trace[:-1]).strip()

        msg = heka.Message(
            type=self.name,
            severity=HEKA_LEVELS[record.levelno],
            fields=fields,
        )

        try:
            if self.conn is None:
                self.__class__.conn = heka.HekaConnection(self.host)

            self.conn.send_message(msg)
        except:
            if self.__class__._notified is None:
                print "Sending HEKA message failed"
                self.__class__._notified = True

По умолчанию хендлер ожидает, что Heka слушает на порту 5565.

Логирование Go приложений

Для логирования я форкнул библиотеку для логирования https://github.com/ivpusic/golog и добавил туда возможность отправки сообщений прямо в Heka. Результат расположен здесь: https://github.com/ildus/golog

Использование:

import "github.com/ildus/golog"
import "github.com/ildus/golog/appenders"
...
logger := golog.Default
logger.Enable(appenders.Heka(golog.Conf{
    "addr": "127.0.0.1",
    "proto": "udp",
    "env_version":  "2",
    "message_type": "myserver.log",
}))
...
logger.Debug("some message")

Логирование Java приложений

Для Java приложений используется входной плагин типа LogstreamerInput с специальным regexp декодером. Он читает логи приложения из файлов, которые должны быть записаны в определенном формате.

Конфигурация для heka, отвечающая за чтение и декодирование логов:

[JLogs]
type = "LogstreamerInput"
log_directory = "/some/path/to/logs"
file_match = 'app_(?P<Seq>d+.d+).log'
decoder = "JDecoder"
priority = ["Seq"]

[JDecoder]
type = "PayloadRegexDecoder"
#Parses com.asdf[INFO|main|2014-01-01 3:08:06]: Server started
match_regex = '^(?P<LoggerName>[w.]+)[(?P<Severity>[A-Z]+)|(?P<Thread>[wd-]+)|(?P<Timestamp>[d-s:]+)]: (?P<Message>.*)'
timestamp_layout = "2006-01-02 15:04:05"
timestamp_location = "Europe/Moscow"

[JDecoder.severity_map]
SEVERE = 3
WARNING = 4
INFO = 6
CONFIG = 6
FINE = 6
FINER = 7
FINEST = 7

[JDecoder.message_fields]
Type = "myserver.log"
Message = "%Message%"
Logger = "%LoggerName%"
Thread = "%Thread%"
Payload = ""

В приложении надо изменить Formatter через logging.properties. Пример logging.properties:

handlers= java.util.logging.FileHandler java.util.logging.ConsoleHandler

java.util.logging.FileHandler.level=ALL
java.util.logging.FileHandler.pattern = logs/app_%g.%u.log
java.util.logging.FileHandler.limit = 1024000
java.util.logging.FileHandler.formatter = com.asdf.BriefLogFormatter
java.util.logging.FileHandler.append=tru

java.util.logging.ConsoleHandler.level=ALL
java.util.logging.ConsoleHandler.formatter=com.asdf.BriefLogFormatter

Код BriefLogFormatter:

package com.asdf;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.logging.Formatter;
import java.util.logging.LogRecord;

public class BriefLogFormatter extends Formatter {
    private static final DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private static final String lineSep = System.getProperty("line.separator");
    /**
     * A Custom format implementation that is designed for brevity.
     */
    public String format(LogRecord record) {
        String loggerName = record.getLoggerName();
        if(loggerName == null) {
            loggerName = "root";
        }
        StringBuilder output = new StringBuilder()
            .append(loggerName)
            .append("[")
            .append(record.getLevel()).append('|')
            .append(Thread.currentThread().getName()).append('|')
            .append(format.format(new Date(record.getMillis())))
            .append("]: ")
            .append(record.getMessage()).append(' ')
            .append(lineSep);
        return output.toString();
    }
}

Логирование скриптов (bash)

Для bash в heka добавляется входной фильтр TcpInput (который слушает на определенном порту) и PayloadRegexDecoder для декодирования сообщений. Конфигурация в hekad.toml:

[TcpInput]
address = "127.0.0.1:5566"
parser_type = "regexp"
decoder = "TcpPayloadDecoder"

[TcpPayloadDecoder]
type = "PayloadRegexDecoder"
#Parses space_checker[INFO|2014-01-01 3:08:06]: Need more space on disk /dev/sda6
match_regex = '^(?P<LoggerName>[w.-]+)[(?P<Hostname>[^|]+)|(?P<Severity>[A-Z]+)|(?P<Timestamp>[d-s:]+)]: (?P<Message>.*)'
timestamp_layout = "2006-01-02 15:04:05"
timestamp_location = "Europe/Moscow"

[TcpPayloadDecoder.severity_map]
ERROR = 3
WARNING = 4
INFO = 6
ALERT = 1

[TcpPayloadDecoder.message_fields]
Type = "scripts"
Message = "%Message%"
Logger = "%LoggerName%"
Hostname = "%Hostname%"
Payload = "[%Hostname%|%LoggerName%] %Message%"

Для логирования написана функция, которая отправляет сообщения на TCP порт в заданном формате:

log()
{
    if [ "$1" ]; then
            echo -e "app1[`hostname`|INFO|`date '+%Y-%m-%d %H:%M:%S'`]: $1" | nc 127.0.0.1 5566 || true
            echo $1
    fi
}

Отправка сообщения с уровнем INFO с типом app1:

log "test test test"

Отправка записей в elasticsearch

Добавляется следующая конфигурация в hekad.conf:

[ESJsonEncoder]
index = "heka-%{Type}-%{2006.01.02}"
es_index_from_timestamp = true
type_name = "%{Type}"

[ElasticSearchOutput]
message_matcher = "Type == 'myserver.log' || Type=='scripts' || Type=='nginx.access' || Type=='nginx.error'"
server = "http://<elasticsearch_ip>:9200"
flush_interval = 5000
flush_count = 10
encoder = "ESJsonEncoder"

Здесь мы указываем, где находится elasticsearch, как должны формироваться индексы и какие типы сообщений туда отправлять.

Просмотр логов

Для просмотра логов используется Kibana 4. Она все еще в бете, но уже вполне рабочая. Для установки надо скачать архив со страницы http://www.elasticsearch.org/overview/kibana/installation/, распаковать в какую либо папку на сервере и указать расположение elasticsearch сервера в файле config/kibana.yml (параметр elasticsearch_url).

При первом запуске понадобится добавить индексы во вкладке Settings. Чтобы можно было добавить шаблон индекса и правильно определились поля, необходимо отправить тестовое сообщение из каждого приложения. Потом можно добавить шаблон индекса вида heka-*(которое покажет все сообщения) или heka-scripts-*, тем самым отделив приложения друг от друга. Дальше можно перейти вкладку Discover и посмотреть сами записи.

Заключение

Я показал только часть того, что можно логировать с помощью Heka.
Если кто-либо заинтересовался, то могу показать часть Ansible конфигурации, которая автоматически устанавливает heka на все серверы, а на выбранные elasticsearch с kibana.

Автор: ildus

Источник

Поделиться

* - обязательные к заполнению поля