Привет, Хабр!
Часто задается вопрос: как эффективно и быстро обработать огромные объемы информации? Ответом на этот вызов стала концепция MapReduce, разработанная в недрах Google.
MapReduce — это парадигма программирования, созданная для обработки и генерации больших объемов данных с использованием параллельных распределенных алгоритмов. Основная фича проста: сначала данные разбиваются на небольшие части (фаза Map), а затем результаты этих частей агрегируются в финальный результат (фаза Reduce).
Зачем?
-
Масштабируемость: MapReduce позволяет распределять задачи на множество узлов, что значительно ускоряет обработку больших данных.
-
Производительность: Параллельное выполнение задач маппинга и редьюсинга обеспечивает порой очень высокую скорость обработки.
-
Устойчивость к ошибкам: Встроенные механизмы MapReduce обеспечивают восстановление после сбоев, что плюсик к надежности.
-
Простота использования: Разработчику нужно лишь определить функции Map и Reduce, а остальное берет на себя фреймворк.
В статье рассмотрим как реализовать MapReduce на Go, какие оптимизации можно применить для улучшения производительности и приведем примеры использования.
MapReduce
Архитектура
-
Mapper
-
Маппер отвечает за обработку входных данных и преобразование их в промежуточные пары ключ-значение. На этапе маппинга входные данные разбиваются на более мелкие части, что позволяет их обрабатывать параллельно.
-
Каждый маппер получает часть входных данных, выполняет над ними определенные операции (например, разбиение текста на слова) и выдает пары ключ-значение (например, слово и количество его вхождений). Благодаря тому, что мапперы работают независимо друг от друга, этот этап легко масштабируется на большое количество узлов.
-
-
Reducer
-
Редьюсер собирает промежуточные пары ключ-значение, сгруппированные по ключам, и выполняет над ними завершающие операции, такие как суммирование или среднее арифметическое.
-
Редьюсер получает все значения, ассоциированные с каждым уникальным ключом, и производит конечные результаты обработки (например, общее количество вхождений каждого слова). Как и мапперы, редьюсеры работают параллельно, обрабатывая различные группы ключей.
-
-
Shuffler
-
Шафлер выполняет сортировку и группировку промежуточных данных, созданных мапперами, перед передачей их редьюсерам. Он гарантирует, что все данные с одинаковыми ключами будут обработаны одним редьюсером. После завершения этапа маппинга, промежуточные данные сортируются по ключам и распределяются между редьюсерами.
-
Шафлер также работает параллельно.
-
-
Master Node
-
Координирующий узел управляет всей работой системы MapReduce. Он распределяет задачи маппинга и редьюсинга между рабочими узлами, отслеживает их состояние и обрабатывает сбои.
-
Координирующий узел распределяет входные данные между мапперами, собирает промежуточные результаты, передает их шффлеру и распределяет задачи редьюсинга.
-
Этот узел также отвечает за повторное выполнение задач, если какой-либо рабочий узел выходит из строя.
-
Реализация в коде
Реализуем такой процесс:
-
Координирующий узел получает запрос на выполнение задачи и разбивает входные данные на фрагменты.
-
Рабочие узлы маппинга получают эти фрагменты и выполняют операции преобразования, генерируя промежуточные пары ключ-значение.
-
Шффлер сортирует и группирует эти промежуточные данные, распределяя их между редьюсерами.
-
Рабочие узлы редьюсинга получают сгруппированные данные и выполняют завершающие операции, генерируя конечные результаты.
-
Координирующий узел собирает результаты от всех редьюсеров и возвращает их пользователю или сохраняет в базе данных.
Приступим.
Координирующий узел управляет всем процессом, начиная с получения входных данных и их разбиения на фрагменты, и заканчивая сбором конечных результатов от редьюсеров:
package main import ( "bufio" "fmt" "log" "os" "sync" ) // структура для хранения задачи type Task struct { filename string } // Главная функция func main(г) { // файл с данными filename := "input.txt" // число мапперов и редьюсеров numMappers := 3 numReducers := 2 // создаем канал для передачи задач мапперам mapTasks := make(chan Task, numMappers) // создаем канал для передачи промежуточных данных шффлеру intermediateData := make(chan map[string]int, numMappers) // создаем канал для передачи данных редьюсерам reduceTasks := make(chan map[string]int, numReducers) var wg sync.WaitGroup // запуск мапперов for i := 0; i < numMappers; i++ { wg.Add(1) go mapper(mapTasks, intermediateData, &wg) } // запуск шафлера go shuffler(intermediateData, reduceTasks, numMappers) // запуск редьюсеров for i := 0; i < numReducers; i++ { wg.Add(1) go reducer(reduceTasks, &wg) } // разбиение файла на задачи и отправка мапперам file, err := os.Open(filename) if err != nil { log.Fatalf("Не удалось открыть файл: %s", err) } scanner := bufio.NewScanner(file) for scanner.Scan() { mapTasks <- Task{filename: scanner.Text()} } close(mapTasks) // ожидание завершения всех горутин wg.Wait() close(intermediateData) close(reduceTasks) fmt.Println("MapReduce завершен.") }
Мапперы получают фрагменты входных данных и преобразуют их в промежуточные пары ключ-значение:
// функция маппера func mapper(tasks <-chan Task, intermediateData chan<- map[string]int, wg *sync.WaitGroup) { defer wg.Done() for task := range tasks { file, err := os.Open(task.filename) if err != nil { log.Fatalf("Не удалось открыть файл: %s", err) } defer file.Close() scanner := bufio.NewScanner(file) counts := make(map[string]int) for scanner.Scan() { line := scanner.Text() words := strings.Fields(line) for _, word := range words { counts[word]++ } } intermediateData <- counts } }
Шафлер сортирует и группирует промежуточные данные, распределяя их между редьюсерами:
// функция шафлера func shuffler(intermediateData <-chan map[string]int, reduceTasks chan<- map[string]int, numMappers int) { aggregatedData := make(map[string]int) for i := 0; i < numMappers; i++ { for data := range intermediateData { for key, value := range data { aggregatedData[key] += value } } } reduceTasks <- aggregatedData }
Редьюсеры получают сгруппированные данные и выполняют завершающие операции, генерируя конечные результаты:
// функция редьюсера func reducer(reduceTasks <-chan map[string]int, wg *sync.WaitGroup) { defer wg.Done() for task := range reduceTasks { finalCounts := make(map[string]int) for key, value := range task { finalCounts[key] += value } // выводим результаты for word, count := range finalCounts { fmt.Printf("%s: %d\n", word, count) } } }
Координирующий узел собирает результаты от всех редьюсеров и возвращает их пользователю или сохраняет в БД:
// главная функция func main() { // пример файла с данными filename := "input.txt" // число мапперов и редьюсеров numMappers := 3 numReducers := 2 // создаем канал для передачи задач мапперам mapTasks := make(chan Task, numMappers) // создаем канал для передачи промежуточных данных шафлеру intermediateData := make(chan map[string]int, numMappers) // создаем канал для передачи данных редьюсерам reduceTasks := make(chan map[string]int, numReducers) var wg sync.WaitGroup // запуск мапперов for i := 0; i < numMappers; i++ { wg.Add(1) go mapper(mapTasks, intermediateData, &wg) } // запуск шафлера go shuffler(intermediateData, reduceTasks, numMappers) // запуск редьюсеров for i := 0; i < numReducers; i++ { wg.Add(1) go reducer(reduceTasks, &wg) } // разбиение файла на задачи и отправка мапперам file, err := os.Open(filename) if err != nil { log.Fatalf("Не удалось открыть файл: %s", err) } scanner := bufio.NewScanner(file) for scanner.Scan() { mapTasks <- Task{filename: scanner.Text()} } close(mapTasks) // ожидание завершения всех горутин wg.Wait() close(intermediateData) close(reduceTasks) fmt.Println("MapReduce завершен.") }
В каких кейсах MapReduce находит применение
Обработка логов
Обработка логов — это типикал задача для MapReduce, особенно там, где объемы логов могут достигать терабайтов данных ежедневно. Логи могут включать информацию о системных событиях, пользовательских действиях, ошибках и многом другом.
-
Map: На этапе маппинга каждый лог‑файл обрабатывается для извлечения ключевых данных, таких как временные метки, типы событий и идентификаторы пользователей. Каждый маппер генерирует промежуточные пары ключ‑значение, где ключом может быть, например, тип события, а значением — информация об этом событии.
-
Shuffle: На этапе шффлинга данные сортируются и группируются по ключам, что позволяет собрать все события одного типа вместе.
-
Reduce: На этапе редьюсинга агрегируются и анализируются данные. Например, подсчитывается количество каждого типа событий, определяется количество уникальных пользователей и анализируются временные метки для выявления пиков активности.
Анализ текстов
-
Map: Каждый документ разбивается на отдельные слова, которые затем преобразуются в пары ключ‑значение, где ключ — это слово, а значение — единица.
-
Shuffle: Пары ключ‑значение сортируются и группируются по ключам, что позволяет собрать все вхождения каждого слова вместе.
-
Reduce: В редьюсерах подсчитывается количество вхождений каждого слова, что позволяет получить частотный словарь.
Анализ Clickstream
Анализ clickstream данных позволяет понимать поведение пользователей на их веб‑сайтах и мобильных приложениях.
-
Map: Каждый clickstream лог обрабатывается для извлечения данных о действиях пользователя.
-
Shuffle: Данные сортируются и группируются по пользователям или сессиям, что позволяет собрать всю информацию о действиях одного пользователя вместе.
-
Reduce: В редьюсерах анализируются данные о поведении пользователей, что позволяет выявить популярные страницы, типичные пути пользователей и потенциальные узкие места в пользовательском интерфейсе.
MapReduce позволяет решать сложные задачи анализа данных, распределяя нагрузку и тем самым обеспечивая высокую производительность и масштабируемость.
В заключение напомню о ближайших открытых уроках:
-
18 июля: Дженерики в Go. На вебинаре вы узнаете механизмы обобщенного программирования с использованием дженериков. Мы рассмотрим внутренние механизмы работы дженериков в Go, а также примеры использования. Запись по ссылке
-
25 июля: Как сделать быстрорастущий сервис с помощью трейсинга? На вебинаре мы наглядно рассмотрим работу сервиса под нагрузкой и найдем запрос с помощью трейсинга. Покажем кейсы, когда уже есть логирование. Запись по ссылке
ссылка на оригинал статьи https://habr.com/ru/articles/828672/
Добавить комментарий