Централизованый сбор Windows event логов, без установки агента, с последующей визуазизацией средствами ELK

в 7:45, , рубрики: elasticsearch, kibana, logs, logstash, powershell, системное администрирование

Задача по централизованой обработке логов достаточно просто формулируется и возникает, когда требуется отслеживать работу большого количества серверов. Думаю, не стоит упоминать о том, что из логов можно получать массу информации о жизнедеятельности и самочувствии систем. О том, что писать и читать логи столь же важно как и уметь писать программы.

Соответственно для реализации такой системы перед администратором ставятся задачи: во-первых, каким образом эти логи собирать, во-вторых, каким образом с ними удобно и централизованно работать. Благодаря достаточно развитой связке ELK (Elasticsearch + Logstash + Kibana), уже не раз описанной на Хабре, у администратора имеются инструменты для удобного поиска и отображения всей присутствующей в логах информации. Поэтому ответ на вторую задачу имеется изначально, и остается лишь решить задачу по сбору логов.

Так как в моем случае требованием к системе было отсутствие клиента на серверах, и то, что логи требовалось вытаскивать с Windows-серверов, то в качестве инструмента сбора был выбран родной для Windows — powershell.
Исходя из этого была составлена следующая модель сбора и отображения информации из логов: логи удаленно собираются с серверов powershell-скриптом, после чего складываются в виде файлов на хранилище, далее средствами ELK (Elasticsearch + Logstash + Kibana) происходит их обработка и отображение.

Пример работы всей связки представлен на изображении:

Централизованый сбор Windows event логов, без установки агента, с последующей визуазизацией средствами ELK - 1

Упреждая критику, опишу, что данная система не ставит задачи сбора логов в реалтайме, целью является лишь статистика, которая собирается за определенный промежуток времени, а затем отображается на dashboard-е. Применяется она с целью прийти с утра, посмотреть как себя вели сервера ночью, и сравнить с результатами, полученными, допустим, на прошлой неделе. В предлагаемой системе логи собираются раз в час, соответственно, отставание между текущими логами и тем, что выводятся (если быть точным, отображаются и выводятся по запросу) на dashboard-е может составлять порядка часа.

В настоящий момент в powershell имеется два способа получения логов с удаленного компьютера:

  • родная команда powershell: Get-EventLog -ComputerName $computer –LogName System
  • получение логов через WMI-запрос: Get-WmiObject -Class win32_NTLogEvent -filter «logfile = 'System'» -ComputerName $computer

Но в первом случае происходит полное перекачивание файла с event-логами системы и лишь затем осуществляется их обработка на компьютере, где выполняется скрипт. Как следствие, такие запросы обрабатываются неоправданно долго. Возможность выборки логов (например, лишь за последние сутки) здесь не особо отрабатывает, так как изначально вытаскивается весь файл и лишь затем с ним ведется какая-то работа.

Во втором случае серверу отсылается WMI запрос, обработка происходит на стороне сервера и ключевой момент здесь в том, что появляется возможность ограничить промежуток интересующих нас логов уже на этапе запроса (в приведенном ниже примере промежуток выставлен в 1 час). Так как данная комманда отрабатывает намного быстрее первой, а время выполнения запроса напрямую зависит от запрашиваемого промежутка логов, то выбар пал на Get-WmiObject.

В представленном ниже скрипте имеются несколько неочевидных и сложных моментов:
Сначала описана логика по ограничению промежутка времени для выборки логов, когда требуются логи за последний час, но за час не с момента запроса, а за последний полный час, т.е. начиная с 00 мин. и заканчивая 59 мин.
Второй момент — то, что время в формате WMI отличается от привычного формата, поэтому постоянно требуется конвертация в WMI формат времени и обратно.

ServersEventLogs.ps1
Clear-Host

# импортируем список серверов из Active Directory (для этого в powershell должен быть дополнительно установлен модуль для Active Directory)
import-module activedirectory
$computers = Get-ADComputer -SearchBase "OU=Servers,DC=domain,DC=ru" -Filter * | ForEach-Object {$_.Name} | Sort-Object

# определяем директорию для логирования 
$logdir = "\storageLogsServersLog" + $(Get-Date -UFormat "%Y_%m")
# если директория отсутствует, то создаем 
if((Test-Path $logdir) -eq 0) {
	New-Item -ItemType directory $logdir -Force
}

# указываем данные пользователя под которым будут выполнятся команды
$domain = "domain"
$username = "username" 
$password = 'password'

$account = "$domain"+""+$($username)
$accountpwd = ConvertTo-SecureString $password -AsPlainText -Force
$credential = New-Object System.Management.Automation.PsCredential($account, $accountpwd)

# для того, чтобы делать выгрузку за предыдущий час, нужно ограничить время за которое лог был сформирован следующим образом: верхний предел - минус час, нижний предел - начало текущего часа.
# получается примерно следующее:
# BiginDate = 08/26/2014 12:00:00
# EndDate = 08/26/2014 13:00:00
# в результате будет выгружен лог созданый в пределах от BiginDate = 08/26/2014 12:00:00 до EndDate = 08/26/2014 13:00:00

$date = Get-Date
Write-Host "Date = $date"
$m = $date.Minute
$s = $date.Second
$begindate = (($date.AddSeconds(-$s)).AddMinutes(-$m)).addHours(-1)
Write-Host "BiginDate = $begindate"
$enddate = ($date.AddSeconds(-$s)).AddMinutes(-$m)
Write-Host "EndDate = $enddate"

# перевод времени в формат WMI
$wmibegindate=[System.Management.ManagementDateTimeConverter]::ToDMTFDateTime($begindate)
Write-Host "WMIBiginDate = $wmibegindate"
$wmienddate=[System.Management.ManagementDateTimeConverter]::ToDMTFDateTime($enddate)
Write-Host "WMIEndDate = $wmienddate"

$logjournals = "System", "Application", "Security"

foreach ($computer in $computers) {
	Write-Host "Processing computer: $computer"
	foreach ($logjournal in $logjournals) {
		Write-Host "Processing log: $logjournal"
		$systemlog = Get-WmiObject -Class win32_NTLogEvent -filter "logfile = '$logjournal' AND (TimeWritten>='$wmibegindate') AND (TimeWritten<'$wmienddate')" -computerName $computer -Credential $credential -ErrorAction SilentlyContinue
		
        foreach ($logstring in $systemlog) {
			$wmitime = $logstring.TimeGenerated
			$time = [System.Management.ManagementDateTimeconverter]::ToDateTime("$wmitime")
			#Write-Host $logtime
				
			$level = $logstring.Type
			#Write-Host "$level"
				
			$journal = $logstring.LogFile
			#Write-Host "$journal"
			
			$category = $logstring.CategoryString
			#Write-Host "$category"
			
			$source = $logstring.SourceName
			#Write-Host "$source"
			
			$message = $logstring.Message
			#Write-Host "$message"
			
			$code = $logstring.EventCode
			#Write-Host "$code"
			
            @{Server="$computer";Time="$time";Level="$level";Journal="$journal";Category="$category";Source="$source";Message="$message";Code="$code"} | ConvertTo-Json -depth 10 -Compress | Out-File "$logdir$computer-$logjournal.json" -Encoding utf8 -Append
		}
	}
}

По завершении действия скрипта на выходе получаются файлы вида: ComputerName-JournalName.json.
Формат json несколько не соответствует стандарту (отсутствуют открывающие и закрывающие скобки), но парсер Logstash его нормально переваривает и обрабатывает. Для каждого из серверов создается по три файла: ComputerName-System.json ComputerName-Application.json ComputerName-Security.json Так как файлы имеют одинаковый формат, их обработка идеентична.

Ограничить сбор логов определенным журналом можно простым редактированием строчки: $logjournals = «System», «Application», «Security»

Далее в дело вступает Logstash со следующей конфигурацией:

ServersEventLogs.conf
input {

  file {
    type => "ServersLogs"
    discover_interval => 1800
    path => [ "//storage/Logs/ServersLog/*/*.json" ]
    codec => "json"
  }

}


filter {

  date {
    type => "ServersLogs"
    match => [ "Time", "MM/dd/YYYY HH:mm:ss" ]
    locale => "en"
    target => "Logtimestamp"
  }

  mutate {
    gsub => [ "Level", "[ -]", "_" ]
    gsub => [ "Source", "[ -]", "_" ]
    gsub => [ "Server", "[ -]", "_" ]
    remove_field => ["message"]
    remove_field => ["host"] 
  }

}


output {

  elasticsearch {
    embedded => false
    host => "logserver"
    protocol => "http"
    cluster => "windowseventlogs"
    codec => "plain"
    index => "windowseventlogs-%{+YYYY.MM.dd}"
  }

}

Данные заносятся в Elasticsearch, откуда в дальнейшем отображаются с помощью Kibana.

В результате на экран выводится информация (в моем случае за последние 2 суток): о самых проблемных серверах, о самых проблемных сервисах; рисуется график, по которому сразу можно увидеть увеличение количества логов или ошибок в определенный момент времени. Всегда можно выполнить поиск по тексту ошибки или имени пользователя, либо произвести сортировку по уровню или id ошибки.

Автор: sdm

Источник

Поделиться новостью

* - обязательные к заполнению поля