- PVSM.RU - https://www.pvsm.ru -
Приветствую.
Так уж сложилось, что по долгу работы мне приходится много времени уделять логам. Это и участие в выработке правил и политик сбора/хранения/использования логов, это и разбор разных инцидентов и обнаружение аномалий. За сутки наши программы, сервисы и серверы генерируют ОЧЕНЬ большое количество логов. И потребность копания в логах растёт постоянно.
Мне довелось поработать с коммерческими лог-менеджмент продуктами типа ArcSight, RSA Envision, Q1 Labs. У этих продуктов есть как плюсы, так и минусы. Но в статье речь пойдёт не о них.
Речь будет о Logstash [1].
Что же такое Logstash? Зачем он нужен? Что он умеет?
Logstash — это орудие для сбора, фильтрации и нормализации логов. Оно является бесплатным и open source приложением. Тип лицензии Apache 2.0.
Первой моё знакомство с LS (Logstash) произошло более года назад, и с того времени я очень плотно на него подсел. Мне нравится его идея и возможности. Для меня Logstash — это подобие мясорубки. Неважно что заходит в неё, но после не сложных манипуляций, на выходе всегда красиво и понятно оформленная информация.
Формат конфигурационного файла Logstash'а прост и понятен. Он состоит из трёх частей:
input {
...
}
filter {
...
}
output {
...
}
Входных, фильтрующих и выходных (или выходящих?) блоков может быть любое количество. Всё зависит от ваших потребностей и возможностей железа.
Пустые строки и строки начинающиеся с # — Logstash игнорирует. Так что комментирование конфигурационных файлов не вызовет никаких проблем.
Данный метод является входной точкой для логов. В нём определяет по каким каналам будут логи попадать в Logstash.
В этой статье я попытаюсь вас ознакомит с основными типами, которые я использую — это file, tcp и udp.
input {
file {
type => "some_access_log"
path => [ "/var/log/vs01/*.log", "/var/log/vs02/*.log" ]
exclude => [ "*.gz", "*.zip", "*.rar" ]
start_position => "end"
stat_interval => 1
discover_interval => 30
}
}
Построчное описание настроек:
type => "some_access_log"
тип/описание лога. При использовании нескольких входных блоков, удобно их разделять для последующих действий в filter или output.
path => [ "/var/log/vs01/*.log", "/var/log/vs02/*.log" ]
указывается путь к лог-файлам, которые подлежат обработке. Путь должен быть абсолютным (/path/to/logs/), а не относительным (../../some/other/path/).
exclude => [ "*.gz", "*.zip", "*.rar" ]
исключает из обработки файлы с соответствующими расширениями.
start_position => "end"
ждёт появления новых сообщений в конце файла. При обработки уже имеющихся логов, можно выставить «beginning», тогда обработка логов будет происходить построчно с начала файлов.
stat_interval => 1
как часто (в секундах) проверять файлы на изменения. При больших значения, уменьшится частота системных вызовов, но так же увеличится время чтения новых строк.
discover_interval => 30
время (в секундах) через которое будет обновлён список обрабатываемых файлов указанных в path.
input {
tcp {
type => "webserver_prod"
data_timeout => 10
mode => "server"
host => "192.168.3.12"
port => 3337
}
}
Построчное описание настроек:
type => "webserver_prod"
тип/описание лога.
data_timeout => 10
время (в секундах), по истечении которого не активное tcp соединение будет закрыто. Значение -1 — соединение всегда будет открыто.
mode => "server"
host => "192.168.3.12"
port => 3337
в этом случае Logstash становится сервером, и начинает слушать на 192.168.3.12:3337. При установке mode => «client» Logstash будет присоединятся к удалённому ip:port для забора логов.
input {
udp {
type => "webserver_prod"
buffer_size => 4096
host => "192.168.3.12"
port => 3337
}
}
В данном блоке настраиваются основные манипуляции с логами. Это может быть и разбивка по key=value, и удаление ненужных параметров, и замена имеющихся значений, и использование geoip или DNS запросов для ип-адресов или названий хостов.
На первый взгляд применение фильтров может показаться сложным и нелогичным, но это не совсем так.
filter {
grok {
type => "some_access_log"
patterns_dir => "/path/to/patterns/"
pattern => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"
}
}
Построчное описание настроек:
type => "apache_access"
тип/описание лога. Здесь надо указать тот тип (type), который прописан в блоке input для которого будет происходить обработка.
patterns_dir => "/path/to/patterns/"
путь к каталогу, содержащим шаблоны обработки логов. Все файлы находящиеся в указанной папке будут загружены Logstash, так что лишние файлы там не желательны.
pattern => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"
указывается шаблон разборки логов. Шаблон можно использовать либо в конфигурационном файле, либо из файла шаблонов. Что бы не путаться, я для каждого формата логов создаю отдельный шаблонный файл.
Формат шаблонов относительно простой — NAME PATTERN, то есть построчно указывается имя шаблона и ему соответствующее регулярное выражение. Пример:
NUMBER d+
WORD bw+b
USERID [a-zA-Z0-9_-]+
Можно использовать любой ранее созданный шаблон:
USER %{USERID}
Шаблоны можно так же и комбинировать:
CISCOMAC (?:(?:[A-Fa-f0-9]{4}.){2}[A-Fa-f0-9]{4})
WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2})
MAC (?:%{CISCOMAC}|%{WINDOWSMAC})
Допустим формат логов у нас следующий:
55.3.244.1 GET /index.html 15824 0.043
Среди готовых шаблонов, к счастью уже имеются некоторые регулярные выражения и не надо придумывать колёсное транспортное средство, приводимое в движение мускульной силой человека через ножные педали или через ручные рычаги (это я про велосипед если что).
С данным примером лога достаточно pattern записать в виде "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}", в этом случае все логи с данным форматом будут уже иметь некую логическую структуру.
После обработки наша строка будет выглядеть следующим образом:
client: 55.3.244.1
method: GET
request: /index.html
bytes: 15824
duration: 0.043
Со списком готовых grok-шаблонов можно познакомиться здесь [2].
filter {
mutate {
type => "apache_access"
remove => [ "client" ]
rename => [ "HOSTORIP", "client_ip" ]
gsub => [ "message", "\/", "_" ]
add_field => [ "sample1", "from %{clientip}" ]
}
}
Построчное описание настроек:
type => "apache_access"
тип/описание лога. Указывается тип (type) логов с которыми будет происходить обработка.
remove => [ "client" ]
удаление всех данных имеющих название поля client. Возможно указание нескольких названий полей.
rename => [ "HOSTORIP", "client_ip" ]
переименование название поля HOSTORIP в client_ip.
gsub => [ "message", "\/", "_" ]
замена всех "/" на "_" в поле messages.
add_field => [ "sample1", "from %{clientip}" ]
добавление нового поля «sample1» со значением «from %{clientip}». Допускается использование названий переменных.
filter {
date {
type => "apache_access"
match => [ "timestamp", "MMM dd HH:mm:ss" ]
}
}
Построчное описание настроек:
type => "apache_access"
тип/описание лога. Указывается тип (type) логов с которыми будет происходить обработка.
match => [ "timestamp", "MMM dd HH:mm:ss" ]
временная метка события. Важная настройка для дальнейшей возможности сортировки или выборки логов. Если в логах время указано в unix timestamp (squid), то следует использовать match => [ «timestamp», «UNIX» ]
filter {
kv {
type => "custom_log"
value_split => "=:"
fields => ["reminder"]
field_split => "t?&"
}
}
Построчное описание настроек:
type => "custom_log"
тип/описание лога. Указывается тип (type) логов с которыми будет происходить обработка.
value_split => "=:"
использовать символы "=" и ":" для разделения ключа-значения.
fields => ["reminder"]
название поля в котором искать 'ключ=значение'. По умолчанию разбивка будет происходить для всей строки лога.
field_split => "t?&"
использовать символы "t?&" для разделения ключей. t — знак табулятора
filter {
multiline {
type => "java_log"
pattern => "^s"
what => "previous"
}
}
Построчное описание настроек:
type => "java_log"
тип/описание лога. Указывается тип (type) логов с которыми будет происходить обработка.
pattern => "^s"
регулярное выражение
what => "previous"
при соответствии шаблону «pattern» строка принадлежит предыдущей (previous) строке.
Название этого блока/метода говорит само за себя — в нём указываются настройки для исходящих сообщений. Аналогично предыдущим блокам, здесь можно указывать любое количество исходящих подблоков.
output {
stdout {
type => "custom_log"
message => "IP - %{clientip}. Full message: %{@message}. End of line."
}
}
type => "custom_log"
тип/описание лога.
message => "clIP - %{clientip}. Full message: %{@message}. End of line."
указывается формат исходящего сообщения. Допустимо использование переменных после grok-фильтрации.
output {
file {
type => "custom_log"
flush_interval => 5
gzip=> true
path => "/var/log/custom/%{clientip}/%{type}"
message_format => "ip: %{clientip} request:%{requri}"
}
}
type => "custom_log"
тип/описание лога.
flush_interval => 5
интервал записи исходящих сообщений. Значение 0 будет записывать каждое сообщение.
gzip=> true
файл исходящих сообщений будет сжат Gzip.
path => "/var/log/custom/%{clientip}/%{type}"
путь и название файла куда будут сохраняться исходящие сообщения. Можно использовать переменные. В данном примере, для каждого уникального IP адреса будет создана своя папка и сообщения будут записываться в файл соответствующий переменной %{type}.
message_format => "ip: %{clientip} request:%{requri}"
формат исходящего сообщения.
output {
elasticsearch {
type => "custom_log"
cluster => "es_logs"
embedded => false
host => "192.168.1.1"
port => "19300"
index => "logs-%{+YYYY.MM.dd}"
}
}
type => "custom_log"
тип/описание лога.
cluster => "es_logs"
название кластера указанного в cluster.name в настроечном файле Elasticsearch.
embedded => false
указывает какую базу Elasticsearch использовать внутреннюю или стороннюю.
port => "19300"
транспортный port Elasticsearch.
host => "192.168.1.1"
IP адрес Elasticsearch
index => "logs-%{+YYYY.MM.dd}"
название индекса куда будут записываться логи.
output {
email {
type => "custom_log"
from => "logstash@domain.com"
to => "admin1@domain.com"
cc => "admin2@domain.com"
subject => "Found '%{matchName}' Alert on %{@source_host}"
body => "Here is the event line %{@message}"
htmlbody => "<h2>%{matchName}</h2><br/><br/><h3>Full Event</h3><br/><br/><div align='center'>%{@message}</div>"
via => "sendmail"
options => [ "smtpIporHost", "smtp.gmail.com",
"port", "587",
"domain", "yourDomain",
"userName", "yourSMTPUsername",
"password", "PASS",
"starttls", "true",
"authenticationType", "plain",
"debug", "true"
]
match => [ "response errors", "response,501,,or,response,301",
"multiple response errors", "response,501,,and,response,301" ]
}
}
type => "custom_log"
тип/описание лога.
from => "logstash@domain.com"
to => "admin1@domain.com"
cc => "admin2@domain.com"
если у вас хватило сил дочитать до этих строк, значит вы можете сами определить смысл этих 3х настроек :)
subject => "Found '%{matchName}' Alert on %{@source_host}"
тема письма уведомления. Можно использовать переменные. Например %{matchName} — название условия совпадения из настройки «match».
body => "Here is the event line %{@message}"
htmlbody => "<h2>%{matchName}</h2><br/><br/><h3>Full Event</h3><br/><br/><div align='center'>%{@message}</div>"
тело письма.
via => "sendmail"
способ отсылки письма. Возможен один вариант из двух — smtp или sendmail.
options => ...
стандартные настройки почтовых параметров.
match => [ "response errors", "response,501,,or,response,301",
"multiple response errors", "response,501,,and,response,301" ]
«response errors» — название алерта (записывается в переменную %{matchName}). «response,501,,or,response,301» — критерии срабатывания алертов. В данном примере если поле response содержит значение 501 или 301, то алерт считается сработавшим. Во второй строке используется логика AND, т.е. оба условия должны быть выполнены.
Создаём файл habr.conf:
input {
tcp {
type => "habr"
port => "11111"
}
}
filter {
mutate {
type => "habr"
add_field => [ "habra_field", "Hello Habr" ]
}
}
output {
stdout {
type => "habr"
message => "%{habra_field}: %{@message}"
}
}
Запускаем Logstash:
java -jar logstash-1.1.9-monolithic.jar agent -f ./habr.conf
Проверяем, что Logstash запущен:
# netstat -nat |grep 11111
Если порт 11111 присутствует, значит Logstash готов принимать логи.
В новом терминальном окне пишем:
echo "Logs are cool!" | nc localhost 11111
Смотрим результат в окне где запущен Logstash. Если там появилось секретное послание, значит всё работает.
P.s.Последнюю версию Logstash можно скачать отсюда [3].
Спасибо за внимание,
Автор: timukas
Источник [4]
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/sistemnoe-administrirovanie/25336
Ссылки в тексте:
[1] Logstash: http://www.logstash.net/
[2] здесь: https://github.com/logstash/logstash/tree/v1.1.9/patterns
[3] отсюда: http://logstash.net
[4] Источник: http://habrahabr.ru/post/165059/
Нажмите здесь для печати.