Применение R для утилитарных задач

в 4:42, , рубрики: data mining, data science, R

Хороший инструмент + наличие навыков работы с ним, что достигается путем практики, позволяет легко и элегантно решать множество различных «как бы» нетипичных задач. Ниже пара подобных примеров. Уверен, что многие могут этот список расширить.

Является продолжением предыдущих публикаций.

Аналитика по логам приложений

Достаточно популярной является задача проведения аналитических расчетов на основани логов приложений. Например, провести аналитику действий пользователей и прикинуть прогнозные показатели, либо проверить гипотезы. Можно пойти по классическому варианту и поднять ELK стек или ему подобные (с недавних пор Splunk выбыл из доступных в России систем). Но можно немного подумать и быстро сделать все на R. Быстро во всех смыслах, как в реализации, так по времени процессинга.

Но бывает ряд особенностей при решении подобной задачи:

  1. Обычно лог файлы пишутся в классическом log4j формате: временная метка, важность, тип подсистемы, тело сообщения.
  2. Временная метка может содержать события с миллисекундным разрешением, которое обязательно нужно сохранять для правильности последующей аналитики. Миллисекунды могут писать не сообразуясь с ISO 8601.
  3. Тело сообщения является практически неструктурированной сущностью. Разработчики пишут туда все что считают нужным, не ограничивая себя никакими форматами представления.
  4. Иногда тело сообщения является многострочным, например, вывод стека вызовов java, или xml пакет межсистемного обмена.
  5. Ряд атриутов может быть внешним по отношению к содержанию и доставать их надо иным образом, например, id объекта может быть закодирован в имени лог-файла.
  6. Логов в виде файлов может быть как несколько мегабайт, так и сотни гигабайт.
  7. Задача очень хорошо параллелится.

Фактически, задача может быть разделена на 2 шага:

  • препроцессинг сырых данных;
  • последующая аналитика.

Содержание последнего шага определяется предметной областью и бизнес-задачами. Первый же шаг достаточно легко может быть решен на R. При этом, в зависимости от объема лог файлов, частично структурированный результат препроцессинга можно складывать как в файлы, так и в БД.

Просто пример кода:

library(readr)
library(tidyverse)
library(magrittr)
library(stringi)
library(fs)
library(glue)
library(RClickhouse)
library(DBI)
library(anytime)
library(tictoc)
library(iterators)
library(foreach)
library(doParallel)
library(futile.logger)
library(re2r)
library(data.table)
library(future)
library(doFuture)

common_logname <- "DEV_log_parser.log"
table_name <- "DEV_LOGS"

flog.appender(appender.file(common_logname))
flog.threshold(INFO)
flog.info("Start batch processing")

oneTimeProcessing <- function(f_iter, log_type = c("app", "system")) {
  log_type <- match.arg(log_type)
  checkmate::assertNames(names(f_iter), 
                         permutation.of = c("fname", "short_fname", "location", "wk", "size", "id"))
  cfg <- list(app = list(db_table = "DEV_APP_LOGS"),
              system = list(db_table = "DEV_LOGS"))

  # читаем файл  
  data <- readr::read_lines(file = f_iter$fname, progress = FALSE)

  log_df <- setDT(tibble::enframe(data, name = NULL)) %>%
    .[, log_line_start := re2r::re2_detect(value, 
                                           pattern = "^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}",
                                           parallel = F)] %>%
    .[, log_line_number := cumsum(log_line_start)] %>%
    .[, body := stri_c(value, collapse = "n"), by = log_line_number] %>%
    .[, `:=`(value = NULL, log_line_start = NULL, log_line_number = NULL)] %>%
    tibble::as_tibble() %>%
    # даже body = character(0) будет разложен на колонки с 0 строк
    # миллисекунды мы сразу засунем в POSIXct 
    tidyr::extract(col = "body",
                   into = c("timestamp", "tz", "level", "module", "class", "message"),
                   # tz может быть (системные логи DEV), а может и не быть (приложения DEV)
                   regex = "^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}:\d+([+-]\d+)?) (.*?) <(.*?)> \[(.*?)\] (?s:(.*))$",
                   case_insensitive = TRUE, ignore.case = TRUE) %>%
    # дату надо подогнать в ISO стандарт и переведем временную зону сразу на Москву (ведут ведь по ней?)
    # для ISO 8601 (https://en.wikipedia.org/wiki/ISO_8601)
    mutate_at("timestamp", re2r::re2_replace, 
              # tz может быть (системные логи DEV), а может и не быть (приложения DEV)
              pattern = "(.*) (\d{2}:\d{2}:\d{2}):(\d+([+-]\d+)?)", 
              replacement = "\1T\2.\3") %>%
    mutate_at("timestamp", lubridate::as_datetime, tz = "Europe/Moscow") %>%
    # добавим предметное описание
    mutate(location = f_iter$location, wk = f_iter$wk)

  # TRUNCATE в CH организован не так давно, поэтому в общем случае приходится удалять и создавать таблицу вручную
  # осуществляем запись в CH, ms можно получить как (timestamp %% 1)
  conn <- DBI::dbConnect(RClickhouse::clickhouse(), host = "10.0.0.1", db = "DEV_LOGS")
  # m <- DBI::dbExecute(conn, glue("ALTER TABLE {table_name}"))
  write_res <- log_df %>%
    mutate(ms = (as.numeric(timestamp) %% 1) * 1000) %>%
    select(location, wk, timestamp, ms, level, module, class, message) %>%
    # база к которой идет подключение должна быть определена в самом коннекте
    DBI::dbWriteTable(conn, cfg[[log_type]][["db_table"]], ., append = TRUE)
  DBI::dbDisconnect(conn)

  # сформируем на возврат статистику по файлу
  res <- tibble::tibble(id = f_iter$id,
                        lines = nrow(log_df), 
                        min_t = min(log_df$timestamp), 
                        max_t = max(log_df$timestamp),
                        write_res)
  rm(data, log_df)
  return(res)
}

# Сам цикл загрузки
tic("Batch processing")

# инициализируем параллельную обработку
gc(full = TRUE)
nworkers <- parallel::detectCores() - 1

registerDoFuture()
# future::plan(multiprocess)
# future::plan(multisession)
future::plan(multisession, workers = nworkers)
# future::plan(sequential) # так ~ секунд

# осуществляем парсинг и загрузку в CH
# логи приложений ------------------
fnames_tbl <- here::here("raw_data") %>%
  fs::dir_ls(recurse = TRUE, glob = "*dev_app*.gz") %>% 
  enframe(name = "fname") %>% 
  # сразу выдернем короткое имя для представления в логах
  mutate(short_fname = as.character(fs::path_rel(fname, start = "./raw_data"))) %>% 
  select(-value) %>%
  mutate(size = fs::file_size(fname)) %>%
  tidyr::extract(col = "short_fname", into = c("location", "wk"),
                 regex = "^([^/]+)/wk(\d+)", remove = FALSE) %>%
  arrange(size) %>%
  mutate(id = paste(format(row_number(), justify = "r", width = 4), "/", n())) %>%
  # поделим на ~ N обработчиков
  mutate(chunk = (row_number() %% nworkers + 1)) %>%
  # теперь сортируем по чанкам, чтобы dopar правильно все разделил
  arrange(chunk)

start_time <- Sys.time()
stat_list <- 
  foreach(it = iter(fnames_tbl, by = "row"), .export = c("start_time"), 
          .verbose = TRUE, .inorder = FALSE, .errorhandling = "remove") %dopar% {
            # инициализируем логгер
            flog.appender(appender.file(common_logname))
            # flog.info(capture.output(gc(verbose = TRUE)))
            res <- oneTimeProcessing(it, log_type = "app")
            flog.info(glue("Step {it$id} finished.",
                           "Elapsed {round(difftime(Sys.time(), start_time, units = 'mins'), digits = 2)} min(s) ----------->", .sep = " "))
            return(res)
          }
flog.info("Load finished")

# терминируем параллельную обработку --------------
# закрываем все дочерние сессии, они едят память
future::plan(sequential)
gc(reset = TRUE, full = TRUE)
flog.info(capture.output(toc()))

# смотрим статистику по файлам -------------
logstat_tbl <- stat_list %>%
  dplyr::bind_rows() %>%
  # подклеиваем исходные атрибуты
  left_join(fnames_tbl, by = "id") %>%
  # дельта по времени между записями в логе в минутах
  mutate(delta_t = as.numeric(difftime(max_t, min_t, units = "mins"))) %>%
  arrange(min_t)

write_delim(logstat_tbl, here::here("output", "DEV_parse_stat.csv.gz"), delim = ";")

# проверим, а все ли результаты успешны?
if(nrow(logstat_tbl) < nrow(fnames_tbl)){
  flog.error("!!!!!!! Not all workers were executed successfully !!!!!!!!!")
}

Этот пример кода содержит в себе основные концепции, такие как параллелизация, обработку времени с учетом миллисекунд, сохранение в БД, учет многострочных записей, сводку по результату работы, использование внешних атрибутов, предварительный бенчмаркинг и выбор оптимальных функций и пакетов (re2r, например). Но он не претендует на идеальность, поскольку это всего-лишь однократное действие по препроцессингу данных. Делает быстро и корректно, ну и ок. Для другой похожей задачи подправим с учетом соотв. входных данных.

Будет ли это разительно быстрее с точки зрения времени получения готового результата на других языках? Вопрос открытый. Параллельные версии с python, perl, awk не показали разительных отличий. Возможно, что гуру в python достигнет лучших показателей, но не забываем, что это всего-лишь проходная периодическая «однократная» задача.

Наведение порядка в фотографиях

После поездки с несколькими устройствами на руках приходится собирать все фото воедино и как-то их упорядочивать перед последующей обработкой. Одним из оптимальных вариантов является именование файлов по дате съемки (YYYY-MM-DD hh_mm_ss), тем самым будет обеспечена упорядоченность фото по стреле времени. Exif атрибуты помогают решить эту задачу в один шаг.

И это также можно выполнить с помощью R в «пару строк». Пакеты fs и exifr в помощь.

  • составил список файлов;
  • вытащил атрибуты;
  • скопировал файлы с переименованием в соотв. с нужными атрибутами.

Собственно говоря, задача свелась к предыдущей, только атрибуты собираются не по имени файла, а по его exif атрибутам, а в процессинге идет просто копирование файла с переименованием. Скелет скрипта и логика работы остаются без изменений.

Почему exifr? Потому что он является оберткой к мощной кроссплатформенной утилите ExifTool.

Завершение

Подобных задач множество, многие из них могут решаться с помощью R тоже.

Предыдущая публикация — «Дети, математика и R».

Автор: Илья Шутов

Источник


* - обязательные к заполнению поля