- PVSM.RU - https://www.pvsm.ru -

Существует типичная проблема в большом классе задач, которая возникает при обработке потока сообщений:
— нельзя пропихнуть большого слона через маленькую трубу, или другими словами, обработка сообщений не успевает «проглотить» все сообщения.
При этом существуют некоторые ограничения на поток данных:
На диаграмме приведён пример разрешения проблемы: нагребатор(tm), работающий на нитке T1, в то время как разгребатор(tm) работает на нитке T2
Т.о. стоит задача о выполнении задач по ключу, так, что выполняется только самая актуальная из всех задач по данному ключу.
На суд публике представляется созданный нами ThrottlingExecutor.
Замечание терминологии: stream есть поток данных, тогда как thread есть нитка или нить выполнения. И не стоит путать потоки с нитками.
Замечание 1: проблема осложняется ещё тем, что может быть несколько нагребаторов(tm), при этом каждый нагребатор(tm) может порождать только события одного типа; с другой стороны есть потребность в нескольких (конечно же, для простоты можно выбрать N=1) разгребаторах(tm).
Замечание 2: мало того, что данный код должен работать в многопоточной (конкурентной) среде — т.е то самое множество нагребаторов(tm) — разгребаторов(tm), код должен работать с максимальной производительностью и низкими latency. Резонно к этим всем качествам добавить ещё и свойство garbage less.
И почти в каждом проекте так или иначе возникает эта задача, и каждый её решает по разному, но все они либо не эффективны, либо медленны, либо и то, и другое вместе взятое.
Небольшое лирическое отступление.
На мой взгляд задача очень интересная, вполне практическая и более того — из нашей специфики работы. И именно поэтому, мы задаём нашим кандидатам на собеседовании. Однако мы не просим буквально закодить всё от и до, а построить общий дизайн решения, по возможности освещая ключевые моменты решения кусками кода. После нескольких месяцев собеседований мы таки решили воплотить идеи в виде кода на java.
Впрочем, код, пока не может быть открыт публично, поэтому идеи общие и могут быть применены и воплощены на многих других языках программирования.
Поскольку всё уже закожено, и быть может осталось навести небольшой марафет, опишу ключевые моменты решения.
Итак, оглядевшись по сторонам мы не нашлись как сделать то, что хотим высокоуровневыми структурами данных доступных в jdk, поэтому будем конструировать из самых базовых блоков.
Картина дизайна в целом:
Ключевой аспект: хранение пары ключ-значение. Можно пренебречь порядком хранения, выигрывая при этом по скорости обновления — т.о. напрашивается использование hash структуры, сложность операций которой O(1).
Негативный эффект на производительность hash структуры оказывает коллизия по hash кодам на выбранном размере. Два самых распостранённых метода разрешения коллизий:
Метод цепочек более стабилен, т.к. не спотыкается о проблему плохого распределения hash кодов, в то время как открытая адресация явно не переживёт, если все ключи будут иметь hash код равный, например, некоторой константе [3]. С другой стороны открытая адресация имеет существенно меньшие накладные расходы на память.
Ещё один плюс в пользу открытой адресации — cache locality — данные в массиве лежат последовательно в памяти и так же последовательно будут загружены в cpu cache, т.о. быстрая последовательная итерация в отличии от использования цепочек, где указатели на связанные списки как-то разбросаны по памяти.
Исходя из общих принципов адекватности применения, можно смело рассчитывать, что функция hash кодов не будет вырожденной и выбор ложится на открытую адресацию.
Теперь рассмотрим элемент массива:
Поскольку есть требование работы в многопоточной и высоконагруженной среде, то ни к чему возится с synchronized-блоками, а работать через Compare-And-Swap, поэтому каждый элемент представляет из себя расширение AtomicReference с ключём:
static class Entry<K> extends AtomicReference {
final K key;
public Entry(final K key){
this.key = key;
}
}
Другим заходом, чтобы обеспечить консистентность и атомарность памяти — использовать не массив AtomicReference-ов, а AtomicReferenceArray.
Мотивы — меньше дополнительных накладных расходов на занимаемую память, последовательность расположения в памяти. При этом существенно сложнее становится схема по расширению/сжатию массива.
Основная идея: итерироваться не по всем элементам, а только по измённым элементам (в идеале), или сегментам (состоящим из небольшого разумного числа элементов), которые содержат изменённые элементы.
Простой в реализации подход, использующий AtomicLong в качестве маски, позволяет покрывать до 64 изменённых сегментов.
Однако, при размере массива уже в 4096 элементов, сегмент состоит из 64 элементов, т.о. потенциально, можно совершить немало пустых чтений.
Следующим шагом хотелось расширить число сегментов, сохранив при этом компактность хранения в памяти и простоту обхода. При этом выбранная структура данных должна быть удобна и с точки зрения wait strategy.
С этих точек зрения очень удобна двоичная куча — нулевой элемент указывает на то были ли изменения вообще или нет (если нет, то можно и погрызть камень / заснуть = применить стратегию ожидания ) и уже последующие элементы указывают на изменённые сегменты исходного массива.
Так, при наличии второго уровня и размере массива в 4096 элементов, сегмент содержит ровно один элемент.
Т.о. simple card mark должен быть хорош при малых размерах массива, а binary heap card mark при больших размерах.
Еще один аспект и прямая отсылка к D., которую нельзя не упомянуть в связке с card mark — это применение стратегии ожидания изменённых сегментов: не стоит сразу впадать в состояние пассивного ожидания (т.е вызывать wait на мониторе) если нет изменений, вполне возможно, что удастся получить изменение активно poll'я card mark.
Например, busy spin опрашивает в цикле корневой элемент card mark-а — если есть изменения, выходим — обрабатываем элементы. Если нет — продолжаем цикл. Ограничив цикл, например, сотней попыток, впадаем в состояние пассивного ожидания по wait, и пусть нас разбудит нагребатор(tm), увидев, что card mark-а был кристально чист и пуст.
Впрочем, стратегия может меняться в каждом конкретном случае.
В финале графики распределения latency:

Гистограмма распределения latency в ThrottlingExecutor,
простой card mark, размер 4096

Гистограмма распределения latency в ThrottlingExecutor,
card mark на двоичной куче, размер 4096
В сильно разреженном массиве:

Гистограмма распределения latency в ThrottlingExecutor,
простой card mark, размер 16384

Гистограмма распределения latency в ThrottlingExecutor,
card mark на двоичной куче, размер 16384
До конца найти всех причин столь разного поведения массива атомарных ссылок и атомарного массива мне не удалось (ожидали скорее увидеть, что атомарный массив с двоичной кучей будет работать куда лучше).
Возможно, и очень надеюсь, когда получится открыть код, мы найдём ошибки или получим ответы.
P.S. Стоит отметить, что испытывая в этом benchmark-е существовавшие ранее решения ThrottlingExecutor-ов, они давали почти ровное распределение до 300 мкс.
Автор: vladimir_dolzhenko
Источник [4]
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/java/29055
Ссылки в тексте:
[1] цепочки: http://en.wikipedia.org/wiki/Hash_table#Separate_chaining_with_linked_lists
[2] открытая адресация: http://en.wikipedia.org/wiki/Hash_table#Open_addressing
[3] некоторой константе: http://en.wikipedia.org/wiki/42_(number)
[4] Источник: http://habrahabr.ru/post/172209/
Нажмите здесь для печати.