Публикация очень краткая. Многие думают, что параллельные вычисления в R — это очень сложно и неприменимо к их текущим задачам.
И да и нет. Если сознательно не вдаваться в теорию, железо и всякие подробности, то можно нарисовать «3 и 1/2» почти универсальных рецепта. Приведенные примеры сознательно похожи на продуктивные задачи, а не выхолощенные пара строчек синтетики.
Является продолжением серии предыдущих публикаций.
Используемые пакеты
Загрузка пакетов
library(tidyverse) library(magrittr) library(stringi) library(glue) library(dqrng) library(iterators) library(future) library(foreach) library(doFuture) library(tictoc) library(futile.logger) library(lgr) # будем использовать его рутовый логгер `lgr` library(hrbrthemes)
Паттерны параллелизации
Паттерн 1. Параллелизация tidyverse вычислений
Ситуация. Есть скрипт, содержащий множество пайплайнов на tidyverse
.
Пример задачи. Подсчитаем среднее от суммы квадратов чисел. Для повышения эффективности параллельных вычислений важно уменьшить объемы перекачки данных между потоками. Используем пакет furrr.
`tidyverse` pipeline
registerDoFuture() # future::plan(multiprocess) workers <- parallel::detectCores() - 1 future::plan(multisession, workers = workers) num_row <- 1:10^6 ff_seq <- function(x) x^2 ff_par <- function(x) mean(x^2) tic("Считаем последовательно") lst1 <- num_row %>% purrr::map_dbl(ff_seq) %>% mean() toc() tic("Считаем параллельно, вариант 1") lst2 <- num_row %>% furrr::future_map_dbl(ff_seq) %>% mean() toc() tic("Считаем параллельно, вариант 2") lst2 <- num_row %>% split(cut(seq_along(.), workers, labels = FALSE)) %>% furrr::future_map_dbl(ff_par) %>% mean() toc()
Естественно, результат зависит от аппаратной платформы и ОС, на которой все запускается. На тестовом прогоне у меня такая раскладка:
Считаем последовательно: 7.23 sec elapsed Считаем параллельно, вариант 1: 3.43 sec elapsed Считаем параллельно, вариант 2: 0.64 sec elapsed
Windows и Linux достаточно сильно отличаются по методам параллелизации. Linux в продуктиве сильно предпочтительнее Windows.
Паттерн 2. Локальная ручная параллелизация
Ситация. В ряде случаев при работе скрипта необходимо выполнить незначительное число разовых неунифицируемых операций. Например, загрузка справочников и различных первичных данных. Есть возможное решение, функция %<-%
.
Генерация сэмплов
# создаем последовательность, матрица 20 атрибутов на 10^5 событий nn <- 10^5 tic("Generating sample data.frame") df <- 100 %>% # stri_rand_strings(length = 10, pattern = "[a-z]") %>% sample(10^4:10^5, .) %>% sample(20 * nn, replace = TRUE) %>% matrix(byrow = TRUE, ncol = 20) %>% as_tibble(.name_repair = "universal") %>% mutate(user_id = as.character(sample(1:as.integer(nn/10), n(), replace = TRUE))) %>% # сгенерируем версию объекта mutate(ver = sample(1:20, n(), replace = TRUE)) %>% select(user_id, ver, everything()) toc() # сохраним в файл для последующей демонстрации параллелизации demo_fpath <- here::here("temp", "demo_data.xlsx") openxlsx::write.xlsx(df, demo_fpath, asTable = TRUE)
Два варианта загрузки файлов
plan(multisession, workers = parallel::detectCores() - 2) # plan(sequential) # https://github.com/HenrikBengtsson/future # считаем, что воркеров у нас 2 tic("Объединяем последовательно обработанные файлы") tic("Читаем файлы последовательно") res_lst <- list() for (j in 1:6) { res_lst[[j]] <- { readxl::read_excel(demo_fpath) %>% head(5)} } toc() seq_df <- bind_rows(res_lst) toc() tic("Объединяем параллельно обработанные файлы") tic("Читаем файлы параллельно") df1 %<-% { readxl::read_excel(demo_fpath) %>% head(5)} df2 %<-% { readxl::read_excel(demo_fpath) %>% head(5)} df3 %<-% { readxl::read_excel(demo_fpath) %>% head(5)} df4 %<-% { readxl::read_excel(demo_fpath) %>% head(5)} df5 %<-% { readxl::read_excel(demo_fpath) %>% head(5)} df6 %<-% { readxl::read_excel(demo_fpath) %>% head(5)} toc() par_df <- bind_rows(df1, df2, df3, df4, df5, df6) toc() all_equal(seq_df, par_df)
Некая разница наблюдается. Пример исключительно для демонстрации принципа. На тестовом прогоне у меня такая раскладка:
Объединяем последовательно обработанные файлы: 46.23 sec elapsed Объединяем параллельно обработанные файлы: 37.82 sec elapsed
Паттерн 3. Параллелизация сложного процессинга
Ситуация. Много вычислительного данных, много кода, процессинг потенциально независим.
Пример.
Сделаем общее задание. Будем считать число сочетаний $C_n^k$
. Дополнительно добавим несколько вариантов логирования при параллельных вычислениях.
Генерация списка заданий.
Подготовка логгеров
# подготовка логгеров flog_logname <- here::here("log", "job_futile.log") lgr_logname <- here::here("log", "job_lgr.log") initLogging <- function(log_file){ lgr <- get_logger_glue("logger") lgr$set_propagate(FALSE) lgr$set_threshold("all") lgr$set_appenders(list( console = AppenderConsole$new( threshold = "info" ), file = AppenderFile$new( file = log_file, threshold = "all" ) )) lgr } invisible(flog.appender(appender.tee(flog_logname))) invisible(flog.threshold(INFO)) lgr <- initLogging(lgr_logname)
Многопоточные расчеты
"Start batch processing" %T>% flog.info() %T>% lgr$info() # инициализируем параллельную обработку # https://github.com/HenrikBengtsson/doFuture # https://cran.r-project.org/web/packages/future/vignettes/future-1-overview.html registerDoFuture() # future::plan(multiprocess) future::plan(multisession, workers = parallel::detectCores()) # future::plan(sequential) # plan(future.callr::callr) tic("Batch processing") start_time <- Sys.time() foreach(it = iter(jobs_tbl, by = "row"), .export = c("start_time"), # .packages = 'futile.logger', .verbose = FALSE, .inorder = FALSE, .errorhandling = "remove") %dopar% { start <- Sys.time() - start_time # инициализируем логгер в потоке flog.appender(appender.tee(flog_logname)) lgr <- initLogging(lgr_logname) res <- arrangements::npermutations(k = it$k, n = it$n, bigz = TRUE) # https://www.jottr.org/2020/11/06/future-1.20.1-the-future-just-got-a-bit-brighter/ message(" message from thread") glue("Step {it$idx_str} finished. RAM used {capture.output(pryr::mem_used())}.", "PID: {Sys.getpid()}", "Elapsed {round(difftime(Sys.time(), start_time, units = 'mins'), digits = 2)} min(s) ----------->", .sep = " ") %T>% flog.info() %T>% lgr$info() # вернем тайминги тоже return(list(pid = Sys.getpid(), start = start, finish = Sys.time() - start_time)) } -> output_lst flog.info("Foreach finished") checkmate::assertList(output_lst, any.missing = FALSE, null.ok = FALSE, min.len = 1) output_tbl <- dplyr::bind_rows(output_lst) # rm(output_lst) # терминируем параллельную обработку -------------- future::plan(sequential) gc(reset = TRUE, full = TRUE) flog.info(capture.output(toc()))
Для иллюстрации процесса нарисуем график запуска (точка) и завершения (крестик) задач на вычислителях. Хорошо видны первичные затраты на старт потоков windows.
Код построения графика.
# посмотрим графически на порядок запуска вычислителей output_tbl %>% mutate_at("pid", as.factor) %>% mutate_at(vars(start, finish), as.numeric) %>% ggplot(aes(start, pid, colour = pid)) + geom_point(size = 3, alpha = .7) + geom_point(aes(x=finish), shape = 4, size = 3, colour = "black") + geom_vline(aes(xintercept = start, colour = pid), lty = "dashed", alpha = 0.7) + ggthemes::scale_fill_tableau("Tableau 10") + theme_ipsum_rc() + xlim(c(0, 5))
Заключение
При параллелизации задач, для достижения максимальной эффективности вычислений следует учитывать ряд важных моментов, вытекающих из принципов работы компьютеров, ОС и теоретических пределов. Если не погружаться глубоко в детали, резюмируем в виде «проверочных пунктов»:
-
Инициализация вычислителей (worker) является достаточно дорогостоящей. Требуется породить новое окружение (поток, кластер, …), его инициализировать. Для коротких вычислений (секунды) затраты на инициализацию могут оказаться существенно выше однопоточного вычисления.
-
При выделении потоков на одной машине, рекомендуется отдавать под вычислители core — 1, или чуть меньше. Один поток выполняет роль мастера, раздающего задания и выполняющего reduce ответов, получаемых от вычислителей. Ну и самой операционке тоже могут быть нужны ресурсы.
-
Дескрипторы файлов и коннектов к БД не переходят границы потоков.
-
Накладные расходы на перегон больших объемов данных из мастер потока в вычислитель и обратно могут оказаться по времени существенно выше, чем время вычисления. Оптимально, если мастер поток дает метаинформацию по заданию, а вычислитель уже сам загружает эти данные (из БД, из файлов, из API и т.д.). Ну и результат наверх должен уходить максимально агрегированный.
-
Состав заданий надо максимально готовить в мастер потоке, а вниз спускать грубую вычислительную подзадачу. Повышается прозрачность и воспроизводимость.
-
Для ряда задач, связанных с длинными синхронными запросами внешних системы (типичные представители — REST API/Web scrapping), можно создавать вычислителей много больше чем доступных ядер. Они все равно висят большую часть времени в режиме ожидания. Можно запускать в виде отдельных процессов ОС с помощью настройки соответствующего бэкенда
registerDoFuture();
plan(future.callr::callr).
Это оставшаяся 1/2 рецепта.
Предыдущая публикация — «Нюансы эксплуатации R решений в enterprise окружении?».
ссылка на оригинал статьи https://habr.com/ru/post/543940/
Добавить комментарий