- PVSM.RU - https://www.pvsm.ru -
Хороший инструмент + наличие навыков работы с ним, что достигается путем практики, позволяет легко и элегантно решать множество различных «как бы» нетипичных задач. Ниже пара подобных примеров. Уверен, что многие могут этот список расширить.
Является продолжением предыдущих публикаций [1].
Достаточно популярной является задача проведения аналитических расчетов на основани логов приложений. Например, провести аналитику действий пользователей и прикинуть прогнозные показатели, либо проверить гипотезы. Можно пойти по классическому варианту и поднять ELK стек или ему подобные (с недавних пор Splunk выбыл из доступных в России систем). Но можно немного подумать и быстро сделать все на R. Быстро во всех смыслах, как в реализации, так по времени процессинга.
Но бывает ряд особенностей при решении подобной задачи:
log4j
формате: временная метка, важность, тип подсистемы, тело сообщения.Фактически, задача может быть разделена на 2 шага:
Содержание последнего шага определяется предметной областью и бизнес-задачами. Первый же шаг достаточно легко может быть решен на R. При этом, в зависимости от объема лог файлов, частично структурированный результат препроцессинга можно складывать как в файлы, так и в БД.
library(readr)
library(tidyverse)
library(magrittr)
library(stringi)
library(fs)
library(glue)
library(RClickhouse)
library(DBI)
library(anytime)
library(tictoc)
library(iterators)
library(foreach)
library(doParallel)
library(futile.logger)
library(re2r)
library(data.table)
library(future)
library(doFuture)
common_logname <- "DEV_log_parser.log"
table_name <- "DEV_LOGS"
flog.appender(appender.file(common_logname))
flog.threshold(INFO)
flog.info("Start batch processing")
oneTimeProcessing <- function(f_iter, log_type = c("app", "system")) {
log_type <- match.arg(log_type)
checkmate::assertNames(names(f_iter),
permutation.of = c("fname", "short_fname", "location", "wk", "size", "id"))
cfg <- list(app = list(db_table = "DEV_APP_LOGS"),
system = list(db_table = "DEV_LOGS"))
# читаем файл
data <- readr::read_lines(file = f_iter$fname, progress = FALSE)
log_df <- setDT(tibble::enframe(data, name = NULL)) %>%
.[, log_line_start := re2r::re2_detect(value,
pattern = "^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}",
parallel = F)] %>%
.[, log_line_number := cumsum(log_line_start)] %>%
.[, body := stri_c(value, collapse = "n"), by = log_line_number] %>%
.[, `:=`(value = NULL, log_line_start = NULL, log_line_number = NULL)] %>%
tibble::as_tibble() %>%
# даже body = character(0) будет разложен на колонки с 0 строк
# миллисекунды мы сразу засунем в POSIXct
tidyr::extract(col = "body",
into = c("timestamp", "tz", "level", "module", "class", "message"),
# tz может быть (системные логи DEV), а может и не быть (приложения DEV)
regex = "^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}:\d+([+-]\d+)?) (.*?) <(.*?)> \[(.*?)\] (?s:(.*))$",
case_insensitive = TRUE, ignore.case = TRUE) %>%
# дату надо подогнать в ISO стандарт и переведем временную зону сразу на Москву (ведут ведь по ней?)
# для ISO 8601 (https://en.wikipedia.org/wiki/ISO_8601)
mutate_at("timestamp", re2r::re2_replace,
# tz может быть (системные логи DEV), а может и не быть (приложения DEV)
pattern = "(.*) (\d{2}:\d{2}:\d{2}):(\d+([+-]\d+)?)",
replacement = "\1T\2.\3") %>%
mutate_at("timestamp", lubridate::as_datetime, tz = "Europe/Moscow") %>%
# добавим предметное описание
mutate(location = f_iter$location, wk = f_iter$wk)
# TRUNCATE в CH организован не так давно, поэтому в общем случае приходится удалять и создавать таблицу вручную
# осуществляем запись в CH, ms можно получить как (timestamp %% 1)
conn <- DBI::dbConnect(RClickhouse::clickhouse(), host = "10.0.0.1", db = "DEV_LOGS")
# m <- DBI::dbExecute(conn, glue("ALTER TABLE {table_name}"))
write_res <- log_df %>%
mutate(ms = (as.numeric(timestamp) %% 1) * 1000) %>%
select(location, wk, timestamp, ms, level, module, class, message) %>%
# база к которой идет подключение должна быть определена в самом коннекте
DBI::dbWriteTable(conn, cfg[[log_type]][["db_table"]], ., append = TRUE)
DBI::dbDisconnect(conn)
# сформируем на возврат статистику по файлу
res <- tibble::tibble(id = f_iter$id,
lines = nrow(log_df),
min_t = min(log_df$timestamp),
max_t = max(log_df$timestamp),
write_res)
rm(data, log_df)
return(res)
}
# Сам цикл загрузки
tic("Batch processing")
# инициализируем параллельную обработку
gc(full = TRUE)
nworkers <- parallel::detectCores() - 1
registerDoFuture()
# future::plan(multiprocess)
# future::plan(multisession)
future::plan(multisession, workers = nworkers)
# future::plan(sequential) # так ~ секунд
# осуществляем парсинг и загрузку в CH
# логи приложений ------------------
fnames_tbl <- here::here("raw_data") %>%
fs::dir_ls(recurse = TRUE, glob = "*dev_app*.gz") %>%
enframe(name = "fname") %>%
# сразу выдернем короткое имя для представления в логах
mutate(short_fname = as.character(fs::path_rel(fname, start = "./raw_data"))) %>%
select(-value) %>%
mutate(size = fs::file_size(fname)) %>%
tidyr::extract(col = "short_fname", into = c("location", "wk"),
regex = "^([^/]+)/wk(\d+)", remove = FALSE) %>%
arrange(size) %>%
mutate(id = paste(format(row_number(), justify = "r", width = 4), "/", n())) %>%
# поделим на ~ N обработчиков
mutate(chunk = (row_number() %% nworkers + 1)) %>%
# теперь сортируем по чанкам, чтобы dopar правильно все разделил
arrange(chunk)
start_time <- Sys.time()
stat_list <-
foreach(it = iter(fnames_tbl, by = "row"), .export = c("start_time"),
.verbose = TRUE, .inorder = FALSE, .errorhandling = "remove") %dopar% {
# инициализируем логгер
flog.appender(appender.file(common_logname))
# flog.info(capture.output(gc(verbose = TRUE)))
res <- oneTimeProcessing(it, log_type = "app")
flog.info(glue("Step {it$id} finished.",
"Elapsed {round(difftime(Sys.time(), start_time, units = 'mins'), digits = 2)} min(s) ----------->", .sep = " "))
return(res)
}
flog.info("Load finished")
# терминируем параллельную обработку --------------
# закрываем все дочерние сессии, они едят память
future::plan(sequential)
gc(reset = TRUE, full = TRUE)
flog.info(capture.output(toc()))
# смотрим статистику по файлам -------------
logstat_tbl <- stat_list %>%
dplyr::bind_rows() %>%
# подклеиваем исходные атрибуты
left_join(fnames_tbl, by = "id") %>%
# дельта по времени между записями в логе в минутах
mutate(delta_t = as.numeric(difftime(max_t, min_t, units = "mins"))) %>%
arrange(min_t)
write_delim(logstat_tbl, here::here("output", "DEV_parse_stat.csv.gz"), delim = ";")
# проверим, а все ли результаты успешны?
if(nrow(logstat_tbl) < nrow(fnames_tbl)){
flog.error("!!!!!!! Not all workers were executed successfully !!!!!!!!!")
}
Этот пример кода содержит в себе основные концепции, такие как параллелизация, обработку времени с учетом миллисекунд, сохранение в БД, учет многострочных записей, сводку по результату работы, использование внешних атрибутов, предварительный бенчмаркинг и выбор оптимальных функций и пакетов (re2r
, например). Но он не претендует на идеальность, поскольку это всего-лишь однократное действие по препроцессингу данных. Делает быстро и корректно, ну и ок. Для другой похожей задачи подправим с учетом соотв. входных данных.
Будет ли это разительно быстрее с точки зрения времени получения готового результата на других языках? Вопрос открытый. Параллельные версии с python
, perl
, awk
не показали разительных отличий. Возможно, что гуру в python
достигнет лучших показателей, но не забываем, что это всего-лишь проходная периодическая «однократная» задача.
После поездки с несколькими устройствами на руках приходится собирать все фото воедино и как-то их упорядочивать перед последующей обработкой. Одним из оптимальных вариантов является именование файлов по дате съемки (YYYY-MM-DD hh_mm_ss
), тем самым будет обеспечена упорядоченность фото по стреле времени. Exif атрибуты помогают решить эту задачу в один шаг.
И это также можно выполнить с помощью R в «пару строк». Пакеты fs
[2] и exifr
[3] в помощь.
Собственно говоря, задача свелась к предыдущей, только атрибуты собираются не по имени файла, а по его exif атрибутам, а в процессинге идет просто копирование файла с переименованием. Скелет скрипта и логика работы остаются без изменений.
Почему exifr
? Потому что он является оберткой к мощной кроссплатформенной утилите ExifTool
[4].
Подобных задач множество, многие из них могут решаться с помощью R тоже.
Предыдущая публикация — «Дети, математика и R» [5].
Автор: Илья Шутов
Источник [6]
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/data-mining/328065
Ссылки в тексте:
[1] предыдущих публикаций: https://habrahabr.ru/users/i_shutov/posts/
[2] fs
: https://fs.r-lib.org
[3] exifr
: https://github.com/paleolimbot/exifr
[4] ExifTool
: https://en.wikipedia.org/wiki/Comparison_of_digital_image_metadata_editors
[5] «Дети, математика и R»: https://habr.com/ru/post/462619/
[6] Источник: https://habr.com/ru/post/464849/?utm_source=habrahabr&utm_medium=rss&utm_campaign=464849
Нажмите здесь для печати.