Java: executor с уплотнением по ключам

от автора

image

Существует типичная проблема в большом классе задач, которая возникает при обработке потока сообщений:

— нельзя пропихнуть большого слона через маленькую трубу, или другими словами, обработка сообщений не успевает «проглотить» все сообщения.

При этом существуют некоторые ограничения на поток данных:

  • поток не равномерный и состоит из событий разного типа
  • количество типов событий заранее не известно, но некоторое конечное число
  • каждый тип события имеет свою актуальность во времени
  • все типы событий имеют равный приоритет

На диаграмме приведён пример разрешения проблемы: нагребатор(tm), работающий на нитке T1, в то время как разгребатор(tm) работает на нитке T2

  • за время обработки события типа A успевают прийти новые события как типа B, так и A
  • после обработки события типа B необходимо обработать наиболее актуальное событие типа A

Т.о. стоит задача о выполнении задач по ключу, так, что выполняется только самая актуальная из всех задач по данному ключу.

На суд публике представляется созданный нами ThrottlingExecutor.

Замечание терминологии: stream есть поток данных, тогда как thread есть нитка или нить выполнения. И не стоит путать потоки с нитками.

Замечание 1: проблема осложняется ещё тем, что может быть несколько нагребаторов(tm), при этом каждый нагребатор(tm) может порождать только события одного типа; с другой стороны есть потребность в нескольких (конечно же, для простоты можно выбрать N=1) разгребаторах(tm).

Замечание 2: мало того, что данный код должен работать в многопоточной (конкурентной) среде — т.е то самое множество нагребаторов(tm)разгребаторов(tm), код должен работать с максимальной производительностью и низкими latency. Резонно к этим всем качествам добавить ещё и свойство garbage less.

И почти в каждом проекте так или иначе возникает эта задача, и каждый её решает по разному, но все они либо не эффективны, либо медленны, либо и то, и другое вместе взятое.

Небольшое лирическое отступление.
На мой взгляд задача очень интересная, вполне практическая и более того — из нашей специфики работы. И именно поэтому, мы задаём нашим кандидатам на собеседовании. Однако мы не просим буквально закодить всё от и до, а построить общий дизайн решения, по возможности освещая ключевые моменты решения кусками кода. После нескольких месяцев собеседований мы таки решили воплотить идеи в виде кода на java.

Впрочем, код, пока не может быть открыт публично, поэтому идеи общие и могут быть применены и воплощены на многих других языках программирования.

Поскольку всё уже закожено, и быть может осталось навести небольшой марафет, опишу ключевые моменты решения.

Итак, оглядевшись по сторонам мы не нашлись как сделать то, что хотим высокоуровневыми структурами данных доступных в jdk, поэтому будем конструировать из самых базовых блоков.

Картина дизайна в целом:

  • т.к. нет необходимости хранить все полученные значения, а нужны только самые актуальные — то хорошо бы хранить пары ключ-значение в некотором ассоциативном массиве, перетирая старые значения новыми
  • бежать по ассоциативному массиву, забирать (помечая ячейку спец. значением, например, null) самые свежие значения и отдавать в обработчик.

Ассоциативный массив

Ключевой аспект: хранение пары ключ-значение. Можно пренебречь порядком хранения, выигрывая при этом по скорости обновления — т.о. напрашивается использование hash структуры, сложность операций которой O(1).

Негативный эффект на производительность hash структуры оказывает коллизия по hash кодам на выбранном размере. Два самых распостранённых метода разрешения коллизий:

  • цепочки — каждый элемент массива представляет из себя связанный список, который хранит элементы с одинаковыми (с точностью до модуля размера массива) hash кодами
  • открытая адресация — при возникновении коллизии происходит поиск первой свободной ячейки за (или перед) ячейкой соответствующей hash коду. Как правило в пользу производительности ограничивают количество проб свободной ячейки. Когда превышено число проб нахождения свободной ячейки для вставки нового элемента, производят расширение массива.

Метод цепочек более стабилен, т.к. не спотыкается о проблему плохого распределения hash кодов, в то время как открытая адресация явно не переживёт, если все ключи будут иметь hash код равный, например, некоторой константе. С другой стороны открытая адресация имеет существенно меньшие накладные расходы на память.

Ещё один плюс в пользу открытой адресации — cache locality — данные в массиве лежат последовательно в памяти и так же последовательно будут загружены в cpu cache, т.о. быстрая последовательная итерация в отличии от использования цепочек, где указатели на связанные списки как-то разбросаны по памяти.

Исходя из общих принципов адекватности применения, можно смело рассчитывать, что функция hash кодов не будет вырожденной и выбор ложится на открытую адресацию.

Теперь рассмотрим элемент массива:

atomic refs

Поскольку есть требование работы в многопоточной и высоконагруженной среде, то ни к чему возится с synchronized-блоками, а работать через Compare-And-Swap, поэтому каждый элемент представляет из себя расширение AtomicReference с ключём:

static class Entry<K> extends AtomicReference {      final K key;      public Entry(final K key){        this.key =  key;     } } 

atomic array ref

Другим заходом, чтобы обеспечить консистентность и атомарность памяти — использовать не массив AtomicReference-ов, а AtomicReferenceArray.

Мотивы — меньше дополнительных накладных расходов на занимаемую память, последовательность расположения в памяти. При этом существенно сложнее становится схема по расширению/сжатию массива.

Card mark

Основная идея: итерироваться не по всем элементам, а только по измённым элементам (в идеале), или сегментам (состоящим из небольшого разумного числа элементов), которые содержат изменённые элементы.

Simple card mark

Простой в реализации подход, использующий AtomicLong в качестве маски, позволяет покрывать до 64 изменённых сегментов.

Однако, при размере массива уже в 4096 элементов, сегмент состоит из 64 элементов, т.о. потенциально, можно совершить немало пустых чтений.

Binary heap card mark

Следующим шагом хотелось расширить число сегментов, сохранив при этом компактность хранения в памяти и простоту обхода. При этом выбранная структура данных должна быть удобна и с точки зрения wait strategy.

С этих точек зрения очень удобна двоичная куча — нулевой элемент указывает на то были ли изменения вообще или нет (если нет, то можно и погрызть камень / заснуть = применить стратегию ожидания ) и уже последующие элементы указывают на изменённые сегменты исходного массива.

Так, при наличии второго уровня и размере массива в 4096 элементов, сегмент содержит ровно один элемент.

Т.о. simple card mark должен быть хорош при малых размерах массива, а binary heap card mark при больших размерах.

Wait Strategy

Еще один аспект и прямая отсылка к D., которую нельзя не упомянуть в связке с card mark — это применение стратегии ожидания изменённых сегментов: не стоит сразу впадать в состояние пассивного ожидания (т.е вызывать wait на мониторе) если нет изменений, вполне возможно, что удастся получить изменение активно poll’я card mark.

Например, busy spin опрашивает в цикле корневой элемент card mark-а — если есть изменения, выходим — обрабатываем элементы. Если нет — продолжаем цикл. Ограничив цикл, например, сотней попыток, впадаем в состояние пассивного ожидания по wait, и пусть нас разбудит нагребатор(tm), увидев, что card mark-а был кристально чист и пуст.

Впрочем, стратегия может меняться в каждом конкретном случае.

уловки и ухищрения

  • поддерживая размеры массива и binary heap как 2K можно избежать использования операций деления, умножения и mod и использовать более дешёвые битовые аналоги: сдвиг вправо K, сдвиг влево K и применение битовых маск (& ((1 << K) — 1)) соответственно
  • основная, и самая частая операция в стабильной фазе — операция замещения/replace происходит только за счёт CAS для найденного элемента массива без захвата какого-либо монитора
  • вычитывая значение из ячейки или card mark вместо безусловного get-and-set(0) иногда (при высокой разряжённости массива) разумней сделать test-n-get-n-set, дабы избежать излишнего (и более дорогого) volatile write
  • расширение/сужение массива (в контексте корректности JMM) относится к начальной фазе
    • volatile ссылка на массив — быстрый volatile read
    • обновление volatile ссылки при помощи AtomicReferenceFieldUpdater магии
    • учитывая, что количество ключей ограничено, можно масштабирование массива производить под монитором

В финале графики распределения latency:

  • 1 нагребатор(tm) — 1 разгребатор(tm)
  • 700 уникальных ключей, hash коллизии < 1%
  • разогрев jvm: 20000 первых итераций игнорируются при измерении latency
  • задача, переданная в executor: зафиксировать время (в мкс) прохождения через executor — т.е зафиксировать latency


Гистограмма распределения latency в ThrottlingExecutor,
простой card mark, размер 4096


Гистограмма распределения latency в ThrottlingExecutor,
card mark на двоичной куче, размер 4096

В сильно разреженном массиве:

Гистограмма распределения latency в ThrottlingExecutor,
простой card mark, размер 16384


Гистограмма распределения latency в ThrottlingExecutor,
card mark на двоичной куче, размер 16384

До конца найти всех причин столь разного поведения массива атомарных ссылок и атомарного массива мне не удалось (ожидали скорее увидеть, что атомарный массив с двоичной кучей будет работать куда лучше).

Возможно, и очень надеюсь, когда получится открыть код, мы найдём ошибки или получим ответы.

P.S. Стоит отметить, что испытывая в этом benchmark-е существовавшие ранее решения ThrottlingExecutor-ов, они давали почти ровное распределение до 300 мкс.

ссылка на оригинал статьи http://habrahabr.ru/post/172209/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *