R и большие данные: использование Replyr

в 12:37, , рубрики: big data, dplyr, R, replyr, spark, sparklyr, Большие данные, метки:

replyr — сокращение от REmote PLYing of big data for R (удаленная обработка больших данных в R).

Почему стоит попробовать replyr? Потому что он позволяет применять стандартные рабочие подходы к удаленным данным (базы данных или Spark).

Можно работать так же, как и с локальным data.frame. replyr предоставляет такие возможности:

  • Обобщение данных: replyr_summary().
  • Объединение таблиц: replyr_union_all().
  • Связывание таблиц по строкам: replyr_bind_rows().
  • Использование функций разделения, объединения, комбинирования (dplyr::do()): replyr_split(), replyr::gapply().
  • Аггрегирование/распределение: replyr_moveValuesToRows() / replyr_moveValuesToColumns().
  • Отслеживание промежуточных результатов.
  • Контроллер объединений.

Скорее всего, вы всё это делаете с данными локально, поэтому такие возможности сделают работу со Spark и sparklyr гораздо легче.

replyr — продукт коллективного опыта использования R в прикладных решениях для многих клиентов, сбора обратной связи и исправления недостатков.

Примеры ниже.

Все сейчас быстро меняется, поэтому воспользуемся для примеров версиями пакетов в разработке.

base::date()

## [1] "Thu Jul  6 15:56:28 2017"

# devtools::install_github('rstudio/sparklyr')
# devtools::install_github('tidyverse/dplyr')
# devtools::install_github('tidyverse/dbplyr')
# install.packages("replyr")
suppressPackageStartupMessages(library("dplyr"))
packageVersion("dplyr")

## [1] '0.7.1.9000'

packageVersion("dbplyr")

## [1] '1.1.0.9000'

library("tidyr")
packageVersion("tidyr")

## [1] '0.6.3'

library("replyr")
packageVersion("replyr")

## [1] '0.4.2'

suppressPackageStartupMessages(library("sparklyr"))
packageVersion("sparklyr")

## [1] '0.5.6.9012'

# больше памяти, чем предполагается в https://github.com/rstudio/sparklyr/issues/783
config <- spark_config()
config[["sparklyr.shell.driver-memory"]] <- "8G"
sc <- sparklyr::spark_connect(version='2.1.0', 
                              hadoop_version = '2.7',
                              master = "local",
                              config = config)

Summary

Стандартные summary() и glance(), которые нельзя выполнить на Spark.

mtcars_spark <- copy_to(sc, mtcars)

# резюме обработки, а не данных
summary(mtcars_spark)

##     Length Class          Mode
## src 1      src_spark      list
## ops 2      op_base_remote list

packageVersion("broom")

## [1] '0.4.2'

broom::glance(mtcars_spark)

## Error: glance doesn't know how to deal with data of class tbl_sparktbl_sqltbl_lazytbl

replyr_summary работает.

replyr_summary(mtcars_spark) %>%
  select(-lexmin, -lexmax, -nunique, -index)

##    column   class nrows nna    min     max       mean          sd
## 1     mpg numeric    32   0 10.400  33.900  20.090625   6.0269481
## 2     cyl numeric    32   0  4.000   8.000   6.187500   1.7859216
## 3    disp numeric    32   0 71.100 472.000 230.721875 123.9386938
## 4      hp numeric    32   0 52.000 335.000 146.687500  68.5628685
## 5    drat numeric    32   0  2.760   4.930   3.596563   0.5346787
## 6      wt numeric    32   0  1.513   5.424   3.217250   0.9784574
## 7    qsec numeric    32   0 14.500  22.900  17.848750   1.7869432
## 8      vs numeric    32   0  0.000   1.000   0.437500   0.5040161
## 9      am numeric    32   0  0.000   1.000   0.406250   0.4989909
## 10   gear numeric    32   0  3.000   5.000   3.687500   0.7378041
## 11   carb numeric    32   0  1.000   8.000   2.812500   1.6152000

Аггрегирование/распределение

tidyr работает в основном с локальными данными.

mtcars2 <- mtcars %>%
  mutate(car = row.names(mtcars)) %>%
  copy_to(sc, ., 'mtcars2')

# ошибки
mtcars2 %>% 
  tidyr::gather('fact', 'value')

## Error in UseMethod("gather_"): no applicable method for 'gather_' applied to an object of class "c('tbl_spark', 'tbl_sql', 'tbl_lazy', 'tbl')"

mtcars2 %>%
  replyr_moveValuesToRows(nameForNewKeyColumn= 'fact', 
                          nameForNewValueColumn= 'value', 
                          columnsToTakeFrom= colnames(mtcars),
                          nameForNewClassColumn= 'class') %>%
  arrange(car, fact)

## # Source:     lazy query [?? x 4]
## # Database:   spark_connection
## # Ordered by: car, fact
##            car  fact  value   class
##                
##  1 AMC Javelin    am   0.00 numeric
##  2 AMC Javelin  carb   2.00 numeric
##  3 AMC Javelin   cyl   8.00 numeric
##  4 AMC Javelin  disp 304.00 numeric
##  5 AMC Javelin  drat   3.15 numeric
##  6 AMC Javelin  gear   3.00 numeric
##  7 AMC Javelin    hp 150.00 numeric
##  8 AMC Javelin   mpg  15.20 numeric
##  9 AMC Javelin  qsec  17.30 numeric
## 10 AMC Javelin    vs   0.00 numeric
## # ... with 342 more rows

Связывание по строкам

dplyr bind_rows, union и union_all сейчас неприменимы в Spark. replyr::replyr_union_all() и replyr::replyr_bind_rows() — работоспособная альтернатива.

bind_rows()

db1 <- copy_to(sc, 
               data.frame(x=1:2, y=c('a','b'), 
                          stringsAsFactors=FALSE),
               name='db1')
db2 <- copy_to(sc, 
               data.frame(y=c('c','d'), x=3:4, 
                          stringsAsFactors=FALSE),
               name='db2')

# Ошибки из-за попытки осуществить операцию над обработчиком, а не данными
bind_rows(list(db1, db2))

## Error in bind_rows_(x, .id): Argument 1 must be a data frame or a named atomic vector, not a tbl_spark/tbl_sql/tbl_lazy/tbl

union_all

# игнорирует названия столбцов и приводит все данные к строкам
union_all(db1, db2)

## # Source:   lazy query [?? x 2]
## # Database: spark_connection
##       x     y
##    
## 1     1     a
## 2     2     b
## 3     3     c
## 4     4     d

union

# игнорирует названия столбцов и приводит все данные к строкам
# скорее всего, также потеряет дублирующиеся строки
union(db1, db2)

## # Source:   lazy query [?? x 2]
## # Database: spark_connection
##       x     y
##    
## 1     4     d
## 2     1     a
## 3     3     c
## 4     2     b

replyr_bind_rows

replyr::replyr_bind_rows может связывать вместе несколько data.frame-ов.

replyr_bind_rows(list(db1, db2))

## # Source:   table [?? x 2]
## # Database: spark_connection
##       x     y
##    
## 1     1     a
## 2     2     b
## 3     3     c
## 4     4     d

dplyr::do

В нашем примере просто возьмем по нескольку строк из каждой группы аггрегированного набора данных. Обратите внимание: поскольку мы не задаем порядок в явном виде с помощью arrange, нельзя всегда ожидать совпадения результатов в разных источниках данных (БД или Spark).

dplyr::do на локальных данных

Из help('do', package='dplyr'):

by_cyl <- group_by(mtcars, cyl)
do(by_cyl, head(., 2))

## # A tibble: 6 x 11
## # Groups:   cyl [3]
##     mpg   cyl  disp    hp  drat    wt  qsec    vs    am  gear  carb
##             
## 1  22.8     4 108.0    93  3.85 2.320 18.61     1     1     4     1
## 2  24.4     4 146.7    62  3.69 3.190 20.00     1     0     4     2
## 3  21.0     6 160.0   110  3.90 2.620 16.46     0     1     4     4
## 4  21.0     6 160.0   110  3.90 2.875 17.02     0     1     4     4
## 5  18.7     8 360.0   175  3.15 3.440 17.02     0     0     3     2
## 6  14.3     8 360.0   245  3.21 3.570 15.84     0     0     3     4

dplyr::do на Spark

by_cyl <- group_by(mtcars_spark, cyl)
do(by_cyl, head(., 2))

## # A tibble: 3 x 2
##     cyl     V2
##    
## 1     6 
## 2     4 
## 3     8 

Получаем не совсем то, что можно использовать.

replyr разделение/объединение

mtcars_spark %>%
  replyr_split('cyl', 
               partitionMethod = 'extract') %>%
  lapply(function(di) head(di, 2)) %>%
  replyr_bind_rows()

## # Source:   table [?? x 11]
## # Database: spark_connection
##     mpg   cyl  disp    hp  drat    wt  qsec    vs    am  gear  carb
##             
## 1  21.0     6 160.0   110  3.90 2.620 16.46     0     1     4     4
## 2  21.0     6 160.0   110  3.90 2.875 17.02     0     1     4     4
## 3  22.8     4 108.0    93  3.85 2.320 18.61     1     1     4     1
## 4  24.4     4 146.7    62  3.69 3.190 20.00     1     0     4     2
## 5  18.7     8 360.0   175  3.15 3.440 17.02     0     0     3     2
## 6  14.3     8 360.0   245  3.21 3.570 15.84     0     0     3     4

replyr gapply

mtcars_spark %>%
  gapply('cyl',
         partitionMethod = 'extract',
         function(di) head(di, 2))

## # Source:   table [?? x 11]
## # Database: spark_connection
##     mpg   cyl  disp    hp  drat    wt  qsec    vs    am  gear  carb
##             
## 1  21.0     6 160.0   110  3.90 2.620 16.46     0     1     4     4
## 2  21.0     6 160.0   110  3.90 2.875 17.02     0     1     4     4
## 3  22.8     4 108.0    93  3.85 2.320 18.61     1     1     4     1
## 4  24.4     4 146.7    62  3.69 3.190 20.00     1     0     4     2
## 5  18.7     8 360.0   175  3.15 3.440 17.02     0     0     3     2
## 6  14.3     8 360.0   245  3.21 3.570 15.84     0     0     3     4

replyr::replyr_apply_f_mapped

Что хотелось бы получить: данные с именами, соответствующими коду (т.е. изменить данные, а не код).

По некоторому размышлению этого можно достигнуть, если связать изменение данных с окружением функции, а не с данными. То есть данные изменены до тех пор, пока соответствующая контролирующая функция выполняется. В нашем случае эта функция — replyr::replyr_apply_f_mapped(), и работает она следующим образом.

Предположим, что операция, которую мы хотим использовать — функция понижения порядка, полученная из некоторого неподконтрольного нам источника (например, пакета). Это может быть простая функция (как ниже), но предположим, мы хотим использовать ее без изменений (исключая даже совсем небольшие, как введение wrapr::let()).

# внешняя функция с заданными в явном виде названиями столбцов
DecreaseRankColumnByOne <- function(d) {
  d$RankColumn <- d$RankColumn - 1
  d
}

Чтобы применить эту функцию к d (в котором не такие, как ожидается, названия столбцов!), мы используем replyr::replyr_apply_f_mapped() для создания нового параметризированного адаптера:

# наши данные
d <- data.frame(Sepal_Length = c(5.8,5.7),
                Sepal_Width = c(4.0,4.4),
                Species = 'setosa',
                rank = c(1,2))

# обработчик для ввода параметров
DecreaseRankColumnByOneNamed <- function(d, ColName) {
  replyr::replyr_apply_f_mapped(d, 
                                f = DecreaseRankColumnByOne, 
                                nmap = c(RankColumn = ColName),
                                restrictMapIn = FALSE, 
                                restrictMapOut = FALSE)
}

# использование
dF <- DecreaseRankColumnByOneNamed(d, 'rank')
print(dF)

##   Sepal_Length Sepal_Width Species rank
## 1          5.8         4.0  setosa    0
## 2          5.7         4.4  setosa    1

replyr::replyr_apply_f_mapped() переименовывает столбцы так, как ожидается в DecreaseRankColumnByOne (соответствие задано в nmap), применяет DecreaseRankColumnByOne и возвращает имена к исходным перед тем, как вернуть результат.

Отслеживание промежуточных результатов

Многие задачи в Sparklyr связаны с созданием промежуточных или временных таблиц. Это можно делать с помощью dplyr::copy_to() и dplyr::compute(). Эти способы могут быть ресурсоёмкими.

В replyr есть функции, позволяющие держать процесс под контролем: генераторы временных имен, не изменяющие собственно данные (они также используются внутри самого пакета).

Сама по себе функция довольно проста:

print(replyr::makeTempNameGenerator)

## function (prefix, suffix = NULL) 
## {
##     force(prefix)
##     if ((length(prefix) != 1) || (!is.character(prefix))) {
##         stop("repyr::makeTempNameGenerator prefix must be a string")
##     }
##     if (is.null(suffix)) {
##         alphabet <- c(letters, toupper(letters), as.character(0:9))
##         suffix <- paste(base::sample(alphabet, size = 20, replace = TRUE), 
##             collapse = "")
##     }
##     count <- 0
##     nameList <- list()
##     function(..., peek = FALSE, dumpList = FALSE, remove = NULL) {
##         if (length(list(...)) > 0) {
##             stop("replyr::makeTempNameGenerator tempname generate unexpected argument")
##         }
##         if (peek) {
##             return(names(nameList))
##         }
##         if (dumpList) {
##             v <- names(nameList)
##             nameList <<- list()
##             return(v)
##         }
##         if (!is.null(remove)) {
##             victims <- intersect(remove, names(nameList))
##             nameList[victims] <<- NULL
##             return(victims)
##         }
##         nm <- paste(prefix, suffix, sprintf("%010d", count), 
##             sep = "_")
##         nameList[[nm]] <<- 1
##         count <<- count + 1
##         nm
##     }
## }
## 
## 

Например, чтобы объединить несколько таблиц, хорошее решение для некоторых источников данных — вызывать compute после каждого объединения (иначе полученный SQL может стать длинным и трудным для понимания и в поддержке). Код выглядит примерно так:

# создание данных для примера
names <- paste('table', 1:5, sep='_')
tables <- lapply(names, 
                 function(ni) {
                   di <- data.frame(key= 1:3)
                   di[[paste('val',ni,sep='_')]] <- runif(nrow(di))
                   copy_to(sc, di, ni)
                 })

# собственный генератор временных имён
tmpNamGen <- replyr::makeTempNameGenerator('JOINTMP')

# объединение слева таблиц в последовательности
joined <- tables[[1]]
for(i in seq(2,length(tables))) {
  ti <- tables[[i]]
  if(i<length(tables)) {
    joined <- compute(left_join(joined, ti, by='key'),
                    name= tmpNamGen())
  } else {
    # использование постоянного имени
    joined <- compute(left_join(joined, ti, by='key'),
                    name= 'joinres')
  }
}

# удаление временных значений
temps <- tmpNamGen(dumpList = TRUE)
print(temps)

## [1] "JOINTMP_1dr7xHI9CkSZJwXfKA1B_0000000000"
## [2] "JOINTMP_1dr7xHI9CkSZJwXfKA1B_0000000001"
## [3] "JOINTMP_1dr7xHI9CkSZJwXfKA1B_0000000002"

for(ti in temps) {
  db_drop_table(sc, ti)
}

# результаты
print(joined)

## # Source:   table [?? x 6]
## # Database: spark_connection
##     key val_table_1 val_table_2 val_table_3 val_table_4 val_table_5
##                                      
## 1     1   0.8045418   0.5006293   0.8656174   0.5248073   0.8611796
## 2     2   0.1593121   0.5802938   0.9722113   0.4532369   0.7429018
## 3     3   0.4853835   0.5313043   0.6224256   0.1843134   0.1125551

Аккуратное введение и управление временными данными может сохранить ресурсы (и время, и место) и значительно улучшить результаты. Нам кажется хорошей практикой задавать в явном виде генератор временных имён, передавать его во все преобразования Sparklyr, а затем очищать временные значения все вместе, когда результаты от них больше не зависят.

Заключение

Если вы хотите тщательно контролировать обработку данных в Spark или БД с помощью R, стоит рассмотреть replyr в дополнение к dplyr и sparklyr.

sparklyr::spark_disconnect(sc)
rm(list=ls())
gc()

##           used (Mb) gc trigger (Mb) max used (Mb)
## Ncells  821292 43.9    1442291 77.1  1168576 62.5
## Vcells 1364897 10.5    2552219 19.5  1694265 13.0

Автор: Анна Каплун

Источник

Поделиться

* - обязательные к заполнению поля