Введение
В традиционных решениях для обмена сообщениями на основе очередей используется хранилище под названием очередь сообщений. Это репозиторий данных, полученных или переданных участниками с помощью механизма асинхронного обмена.
Обмен данными на основе очередей является надежным фундаментом создания отказоустойчивой масштабируемой архитектуры обмена сообщениями с поддержкой широкого спектра универсальных сценариев в среде распределенных вычислений. Вне зависимости от поставленной задачи (диспетчеризация больших объемов работ или надежный обмен сообщениями), технология очередей сообщений предоставляет первоклассные возможности обмена информацией в соответствии с требованиями асинхронной передачи данных.
В этом документе описаны функциональные возможности платформы Windows Azure, а также способы применения шаблонов проектирования для создания оптимизированных и недорогих систем обмена сообщениями на основе очередей. Документ содержит подробный обзор основных методов программной реализации взаимодействия на основе очередей в современных решениях для платформы Windows Azure, а также рекомендации по повышению производительности, увеличению масштабируемости и сокращению эксплуатационных расходов.
Сценарий
Для большей наглядности мы обобщим реальный сценарий следующим образом. Поставщик SaaS-решений вводит в эксплуатацию новую биллинговую систему, реализованную в виде приложения Windows Azure и обслуживающую потребности предприятия в обработке большого количества транзакций клиентов. Решение основано на переносе рабочих нагрузок в облачную среду и использовании эластичности инфраструктуры Windows Azure для выполнения сложных вычислений.
Локальный элемент комплексной инфраструктуры обеспечивает регулярную ежедневную консолидацию и диспетчеризацию больших объемов транзакций для их последующей обработки службой, размещенной в облачной среде Windows Azure. Объем передаваемых транзакций варьируется от нескольких тысяч до сотен тысяч в рамках одного пакета, а совокупный суточный объем может достигать нескольких миллионов транзакций. Это решение должно отвечать требованиям соглашения об уровне обслуживания (SLA) в части обеспечения гарантированной максимальной задержки при обработке данных.
Архитектура решения основана на шаблоне распределенных систем под названием map-reduce и состоит из множества экземпляров рабочих ролей облачного уровня, использующих хранилище очередей Windows Azure для диспетчеризации работы. Пакеты транзакций принимаются ролью Инициатор процесса (Process Initiator Worker). Затем они разбиваются на более мелкие рабочие задачи, которые передаются очередям Windows Azure для распределения рабочей нагрузки.
Рабочая нагрузка обрабатывается большим количеством экземпляров роли Обработчик рабочей нагрузки (Processing Worker), которые извлекают рабочие операции из очередей, и выполняет вычислительные процедуры. В этих экземплярах обработчиков применяются многопотоковые слушатели очередей для параллельной обработки данных, обеспечивающей максимальную производительность.
Обработанные рабочие элементы перенаправляются в выделенную очередь, из которой извлекаются экземпляром роли Контроллер процесса (Process Controller Worker) для агрегирования и долгосрочного хранения в хранилище с целью интеллектуального анализа данных и создания отчетности.
Архитектура решения выглядит следующим образом:
Приведенная выше схема иллюстрирует пример типичной архитектуры, применяемой для масштабирования крупных или сложных вычислительных нагрузок. Шаблон обмена сообщениями на основе очередей, реализованный в этой архитектуре, является типовым для многих приложений и служб Windows Azure, которым необходимо обмениваться друг с другом данными при помощи очередей. Это позволяет использовать канонический подход к изучению основных компонентов, применяемых для обмена сообщениями на основе очередей.
Основы обмена сообщениями с помощью очередей
Стандартное решение для обмена сообщениями, поддерживающее обмен данными между распределенными компонентами, включает в себя издателей, которые помещают сообщения в очереди, а также одного или нескольких подписчиков, получающих эти сообщения. В большинстве случаев подписчики, которых иногда называют слушателями очередей, реализуются в виде одно- или многопотоковых приложений, которые выполняются непрерывно или запускаются по требованию, согласно расписанию пользователя.
На более высоком уровне применяются два основных механизма диспетчеризации, которые позволяют слушателю очереди получать хранящиеся в ней сообщения:
- Опрос (модель на основе запросов — polling): Слушатель отслеживает очередь, проверяя наличие в ней новых сообщений через регулярные интервалы времени. Если очередь пуста, слушатель продолжает опрашивать очередь, периодически переходя в состояние сна.
- Переключение (модель на основе принудительной отправки данных — pushing): Слушатель подписывается на событие, с которым связан триггер (самим издателем или менеджером по обслуживанию очереди), срабатывающий при передаче сообщения в очередь. Слушатель может инициировать обработку сообщения, поэтому ему не придется заново опрашивать очередь для определения наличия новых данных.
Необходимо подчеркнуть, что практическая реализация каждого из механизмов имеет свои особенности. Например, опрос может быть как блокирующим, так и неблокирующим. Блокировка ставит запрос на удержание до тех пор, пока в очереди не появится новое сообщение (или не истечет время ожидания), в то время как неблокирующие запросы выполняются незамедлительно, если очередь пуста. Применение модели переключения позволяет принудительно передавать уведомления слушателям очереди всякий раз, когда самое первое сообщение приходит в пустую очередь или когда глубина очереди достигает определенной величины.
Примечание. Операция удаления из очереди, поддерживаемая API Windows Azure Queue Service, является неблокирующей. Это означает, что методы API GetMessage или GetMessages немедленно завершат свою работу, если очередь сообщений пуста. В отличие от них буферы Durable Message Buffers (DMB), предоставляемые шиной интеграции Windows Azure, используют блокирующие операции получения сообщений, при которых вызывающий поток блокируется до тех пор, пока в очередь DMB не приходит сообщение или не истекает заданный период ожидания.
Можно выделить следующие наиболее распространенные подходы к программной реализации слушателей очередей Windows Azure:
- Слушатель реализуется в виде компонента приложения, экземпляр которого создается и выполняется в составе экземпляра рабочей роли.
- Жизненный цикл компонента слушателя роли часто привязан к времени выполнения экземпляра размещенной роли.
- Основная логика обработки представляет собой петлю, с помощью которой сообщения изымаются из очередей и направляются на обработку.
- Если очередь полученных сообщений пуста, слушатель переходит в режим сна, продолжительность которого определяется алгоритмами остановки системы и зависит от приложения.
- Выполняется программный цикл получения сообщений; очередь опрашивается до тех пор, пока слушатель не получит уведомление для выхода из цикла и завершения своей работы.
Следующая блок-схема иллюстрирует стандартную логику реализации слушателя очереди со встроенным механизмом опроса в среде Windows Azure.
Примечание. Обсуждение более сложных шаблонов решений, например, требующих применения центрального менеджера очереди (брокера), выходит за рамки этого документа.
Использование классического слушателя очереди совместно с механизмом опроса — не самый оптимальный выбор. Модель ценообразования Windows Azure основана на подсчете числа транзакций внутри хранилища с учетом количества запросов приложения к очереди, независимо от того, заполнена ли очередь. Следующие разделы посвящены обсуждению методов максимального повышения производительности и минимизации затрат на внедрение систем обмена сообщениями на основе очередей Windows Azure.
Рекомендации по снижению стоимости решения, обеспечению производительности и масштабируемости
В этом разделе описаны способы проектирования, позволяющие обеспечить повышенную производительность и улучшенную масштабируемость, а также снизить стоимость готового решения.
Шаблон реализации системы можно назвать более эффективным решением только в том случае, если он обеспечивает достижение следующих целей:
- Сокращение операционных затрат за счет отказа от транзакций по обмену данными с хранилищем, не выполняющих полезной работы.
- Устранение излишних задержек, связанных с использованием интервала опроса при проверке наличия новых сообщений в очереди.
- Динамическое масштабирование (расширение и уменьшение) за счет адаптации вычислительных мощностей к меняющимся объемам работы.
Шаблон внедрения должен выполнять эти задачи, не усложняя систему, иначе преимущества его внедрения будут сведены на нет.
Рекомендации по оптимизации затрат на транзакции при обмене данными с хранилищем
При оценке показателей совокупной стоимости владения (total cost of ownership, TCO) и рентабельности инвестиций (return on investment, ROI) решения, развернутого на платформе Windows Azure, одной из важнейших переменных формулы TCO является объем транзакций при обмене данными с хранилищем. Сокращение числа транзакций по обмену данными с очередями Windows Azure позволяет уменьшить эксплуатационные расходы, связанные с использованием решений Windows Azure.
При внедрении решения для обмена сообщениями на основе очередей разработчики могут сократить количество транзакций обмена данными с хранилищем.
- При отправке сообщений в очередь можно группировать связанные друг с другом сообщения в один, более крупный пакет, сжимать и хранить сжатый образ в хранилище BLOB-объектов, а затем использовать очередь для хранения ссылки на BLOB-объект с этими данными.
- При получении сообщений из очереди можно объединять несколько сообщений в один пакет для проведения транзакций по обмену данными с хранилищем. Метод GetMessages, реализованный в API службы очередей, обеспечивает удаление из очереди указанного количества сообщений в рамках одной транзакции (см. примечание ниже).
- При проверке наличия рабочих элементов в очереди избегайте использования агрессивных интервалов опроса и установите временную задержку, увеличивающую интервал опроса очереди, если запросы к ней не возвращают данных.
- Сократите количество слушателей очереди — при использовании модели на основе запросов применяйте только один слушатель для каждого из экземпляров роли, когда очередь пуста. Чтобы свести к нулю количество слушателей для каждого из экземпляров роли, используйте механизм уведомлений для создания экземпляров слушателей при получении очередью рабочих элементов.
- Если рабочие очереди остаются пустыми большую часть времени, создайте механизм автоматического сокращения числа экземпляров ролей, отслеживающий системные показатели, чтобы определить момент, когда приложению необходимо увеличить количество экземпляров ролей для обработки возросшей рабочей нагрузки.
Приведенные выше рекомендации могут быть реализованы в виде общего механизма, предназначенного для обработки пакетов сообщений и инкапсуляции большей части базовых операций взаимодействия с очередями, хранилищем BLOB-объектов и управления потоками. Далее будут рассмотрены способы реализации такого механизма.
Важная информация. При получении сообщений с помощью метода GetMessages максимальный размер пакета API службы Queue Service для операции удаления из очереди равен 32. Превышение этого значения приведет к появлению исключения среды выполнения.
Расходы на выполнение транзакций в очередях Windows Azure возрастают линейно при увеличении числа клиентов службы очереди, например, при масштабировании количества экземпляров роли или при увеличении числа потоков удаления из очереди. Для демонстрации возможного роста затрат при реализации решения без учета приведенных выше рекомендаций приведем пример с конкретными цифрами.
Влияние неэффективной архитектуры на расходы
Создание архитектуры описанной выше биллинговой системы, не содержащей механизмов оптимизации, приведет к повышению эксплуатационных затрат после развертывания решения на платформе Windows Azure. В этом разделе описаны причины возможных дополнительных расходов.
В соответствии с определением сценария программное решение получает данные бизнес-транзакций через регулярные отрезки времени. Предположим, что это решение занято обработкой рабочей нагрузки только 25 % времени стандартного восьмичасового рабочего дня. В результате 6 часов (8 часов * 0,75) приходится на «время неактивности», когда система не занимается обработкой транзакций. Более того, решение вообще не получает данных в течение 16 часов каждые сутки во внерабочее время.
В течение периода неактивности, который в совокупности составляет 22 часа, решение пытается удалить рабочую информацию из очереди, не получая уведомлений о добавлении в нее новых данных. За это время каждый отдельный поток удаления из очереди выполняет до 79 200 транзакций (22 часа * 60 минут * 60 транзакций в минуту), связанных с входной очередью при интервале опроса по умолчанию, равного одной секунде.
Как указывалось выше, модель ценообразования услуг платформы Windows Azure использует в качестве базовой единицы отдельные «транзакции хранилища». Транзакцией хранилища называется запрос пользовательского приложения на добавление, чтение, обновление или удаление данных хранилища. На момент написания этого технического документа стоимость транзакций хранилища составляла 0,01 долл. США за 10 000 транзакций. UPDATE: на момент публикации перевода: 0,01 долл. США за 100 000 транзакций.
Важная информация. При вычислении количества транзакций, связанных с очередями, необходимо помнить о том, что размещение одного сообщения в очередь является одной транзакцией, в то время как получение сообщения часто является двухэтапным процессом, в который входит получение сообщения и запрос на удаление сообщения из очереди. В результате успешная операция по удалению сообщения из очереди потребует двух транзакций хранилища. Обратите внимание: даже если запрос на удаление сообщения из очереди не связан с получением данных, он все равно считается платной транзакцией.
Транзакции хранилища, создаваемые одним потоком удаления данных из очереди в описанном выше сценарии, добавят к ежемесячному счету за услуги приблизительно 2,38 долл. США (79 200/10 000 * 0,01 долл. США * 30 дней). 200 потоков удаления сообщений из очереди (или один поток в 200 экземплярах рабочей роли) увеличит ваши ежемесячные расходы на 457,2 долл. США (UPDATE: если производить расчеты на момент публикации перевода статьи, то это 45,7 долл. США). Эти затраты возникают, когда система не выполняет никаких вычислений, а только лишь проверяет наличие рабочих элементов в очереди. Приведенный выше пример является абстрактным, поскольку никто не будет реализовывать службу подобным образом. Необходимо использовать приведенные ниже приемы оптимизации.
Рекомендации по устранению излишних задержек
Для оптимизации производительности систем передачи сообщений на основе очередей Windows Azure можно использовать уровень обработки сообщений издателей и подписчиков, предоставляемой шиной интеграции Windows Azure, как описано ниже.
В этом случае разработчикам необходимо объединить механизмы опроса и принудительной отправки уведомлений в реальном времени, чтобы дать возможность слушателям подписываться на событие-уведомление (триггер), возникающее при определенных условиях и указывающее на размещение в очереди новой рабочей нагрузки. Такой подход позволяет создать стандартный цикл опроса очередей на уровне издателей и подписчиков для диспетчеризации уведомлений.
В сложных распределенных системах этот подход требует использования «шины сообщений» или «промежуточного ПО обработки сообщений» для надежной передачи уведомлений одному или нескольким подписчикам. Шина интеграции Windows Azure является оптимальным решением для обмена сообщениями между слабосвязанными службами распределенных приложений, развернутыми не только в среде Windows Azure, но и локально. Она идеально подходит для реализации архитектуры «шины сообщений», которая обеспечивает обмен уведомлениями между процессами передачи данных с помощью очередей.
Процедуры создания системы обмена сообщениями с помощью очередей могут использовать следующий шаблон:
Принципы, применяемые для обмена данными между экземплярами ролей Windows Azure при взаимодействии между издателями и подписчиками службы очередей, соответствуют большинству требований к обмену уведомлениями на основе принудительной отправки данных. Базовые понятия этого процесса рассмотрены в одной из наших предыдущих публикаций.
Важная информация. Использование шины интеграции Windows Azure регулируется схемой тарификации, учитывающей две важные составляющие этого процесса. Во-первых, взимается плата за получение и отправку данных при обмене с ЦОД. Во-вторых, плата взимается за количество соединений, установленных между приложением и инфраструктурой шины интеграции.
В связи с этим важно анализировать расходы и преимущества для оценки всех положительных и отрицательных сторон применения шины интеграции для реализации конкретной архитектуры. Следует оценить, приведет ли реализация уровня диспетчеризации уведомлений на базе шины интеграции к фактическому сокращению затрат, которое сможет оправдать инвестиции в этот проект и дополнительные трудозатраты разработчиков.
Негативное воздействие задержек можно достаточно легко минимизировать за счет создания дополнительного уровня обмена сообщениями между издателями и подписчиками. Дополнительное сокращение расходов обеспечивается с помощью динамического (эластичного) масштабирования, реализация которого описана в следующем разделе.
Рекомендации по динамическому масштабированию
Платформа Windows Azure поддерживает возможность простого и быстрого масштабирования решений заказчиков как в сторону увеличения, так и в сторону уменьшения. Возможность адаптации к колебаниям рабочей нагрузки и трафика является одним из основных преимуществ этой платформы облачных вычислений. Это означает, что понятие «масштабируемость» перестало быть термином в словаре ИТ-специалистов, а поддержка масштабируемости теперь не требует чрезмерных затрат. Программная реализация этой функции доступна в любом облачном решении с грамотно разработанной архитектурой.
Динамическое масштабирование — это техническая особенность конкретного решения, позволяющая адаптироваться к переменной рабочей нагрузке с помощью увеличения и уменьшения пространства хранения и вычислительных мощностей, доступных среде выполнения. Платформа Windows Azure содержит встроенную поддержку динамического масштабирования с помощью распределенной вычислительной инфраструктуры, позволяющей выделять потребителям необходимые мощности за установленную плату.
Важно различать два типа динамического масштабирования, поддерживаемых платформой Windows Azure:
- Масштабирование экземпляров ролей представляет собой добавление и удаление дополнительных экземпляров рабочих ролей или веб-ролей для обработки текущей рабочей нагрузки. Это часто требует изменения количества экземпляров в конфигурации службы. Среда выполнения Windows Azure реагирует на увеличение количества экземпляров, создавая новые экземпляры, а уменьшение количества экземпляров приводит к завершению работы некоторых из существующих экземпляров.
- Масштабирование процессов (потоков) представляет собой поддержание достаточного уровня мощностей (количества потоков обработки) для конкретного экземпляра роли за счет увеличения или уменьшения количества потоков в зависимости от текущей рабочей нагрузки.
Реализация динамического масштабирования в решении для обмена сообщениями на основе ролей требует учета следующих рекомендаций:
- Отслеживайте ключевые показатели производительности, в том числе использование центрального процессора, глубину очереди, время отклика и задержку при обработке сообщений.
- Динамически увеличивайте или уменьшайте количество экземпляров рабочих ролей для обработки пиковой рабочей нагрузки, как ожидаемой, так и непрогнозируемой.
- Программным путем увеличивайте и уменьшайте количество потоков обработки для адаптации системы к различным показателям рабочей нагрузки.
- Выполняйте разбиение рабочей нагрузки и параллельную обработку мелких фрагментов с помощью библиотеки Task Parallel Library платформы .NET Framework 4.
- Обеспечивайте наличие вычислительных мощностей при управлении решениями с непостоянным уровнем рабочей нагрузки; это позволит справляться с внезапным ростом нагрузки, не прилагая дополнительных усилий к созданию дополнительных экземпляров.
Интерфейсы API службы Service Management позволяют службам, размещенным на платформе Windows Azure, увеличивать или уменьшать количество запущенных экземпляров ролей за счет изменения конфигурации развертывания в среде выполнения.
Примечание. По умолчанию для стандартной подписки доступно не более 20 экземпляров вычислительных операций Windows Azure. Это позволяет оградить пользователей платформы Windows Azure от повышения стоимости обслуживания при случайной попытке создания очень большого количества экземпляров ролей. Это так называемое «мягкое» ограничение. Любые заявки на увеличение этой квоты необходимо подавать группе технической поддержки Windows Azure Support.
Динамическое масштабирование количества экземпляров ролей — не всегда лучший способ обработки резко возросшей рабочей нагрузки. Например, новому экземпляру виртуальной машины требуется несколько секунд для подготовки к работе, а в настоящее время в соглашениях об уровне обслуживания не предусмотрены показатели, связанные с длительностью этого процесса. Вместо этого можно пойти по более простому пути — увеличить количество рабочих потоков, чтобы справиться с временным возрастанием рабочей нагрузки. При обработке рабочей нагрузки происходит отслеживание ее показателей для выявления ситуаций, требующих динамического увеличения или уменьшения количества рабочих процессов.
Важная информация. В настоящее время целевое значение показателя масштабируемости для одной очереди Windows Azure ограничено 500 транзакциями в секунду. Если приложение пытается превысить это пороговое значение, например, выполняя операции над очередью с помощью нескольких экземпляров роли, для каждого из которых запущены сотни потоков удаления объектов из очереди, то служба хранилища может вернуть HTTP-ошибку 503 «Сервер занят». На случай появления этой ошибки в приложении должен быть реализован механизм повтора транзакций с помощью алгоритма с экспоненциально возрастающим временем задержки. Однако если ошибки HTTP 503 происходят регулярно, рекомендуется использовать несколько очередей и применять стратегию сегментирования, которая позволит использовать эти очереди для масштабирования рабочей нагрузки.
В большинстве случаев автоматизированное масштабирование рабочих процессов выполняется отдельным экземпляром роли. Масштабирование экземпляров роли часто требует разработки центрального элемента архитектуры решения, отслеживающего показатели производительности и принимающего меры по масштабированию системы. Приведенная ниже схема содержит описание компонента службы под названием Агент динамического масштабирования (Dynamic Scaling Agent), который собирает и анализирует данные, связанные с показателями рабочей нагрузки, для определения необходимости создания новых экземпляров или вывода из эксплуатации неактивных.
Службу агента масштабирования можно развернуть в качестве рабочей роли на платформе Windows Azure или в качестве локальной службы. Вне зависимости от применяемой топологии развертывания эта служба будет иметь доступ к очередям Windows Azure.
Обсудив влияние времени задержки на масштабирование, расходы на транзакции при обмене данными с хранилищем и требования к динамическому масштабированию, перейдем к рассмотрению практической реализации данных рекомендаций.
Техническая реализация
В предыдущих разделах были описаны основные отличительные особенности грамотно спроектированной архитектуры обмена сообщениями, реализованной с помощью служб хранения очередей Windows Azure. Мы рассмотрели три ключевых аспекта масштабирования: сокращение задержки при обработке данных, оптимизацию расходов на транзакции хранения данных и улучшение реагирования на нестабильность рабочей нагрузки.
Этот раздел предназначен для разработчиков приложений Windows Azure и содержит описание программной реализации шаблонов.
Примечание. В этом разделе содержатся сведения о создании слушателя очереди с поддержкой автоматического масштабирования, а также моделей на основе запросов и принудительной отправки данных. Для получения сведений о современных методах динамического масштабирования на уровне экземпляра роли обратитесь к проектам, реализованным сообществом пользователей и опубликованным на веб-сайте MSDN Code Gallery.
Создание стандартного слушателя очереди
Сначала создадим контракт, реализуемый компонентом слушателя очереди, который размещен в рабочей роли и ожидает передачи данных для очереди Windows Azure.
/// Создание контракта в расширении, отвечающем за прослушивание очереди Windows Azure. public interface ICloudQueueServiceWorkerRoleExtension { /// Запуск многопотокового слушателя очереди, который использует указанное количество потоков удаления сообщений из очереди. void StartListener(int threadCount); /// Возврат текущего состояния слушателя очереди для определения характеристик нагрузки в данный момент. CloudQueueListenerInfo QueryState(); /// Получение или указание размера пакета при выполнении операции удаления сообщений из очереди Windows Azure. int DequeueBatchSize { get; set; } /// Получение или указание интервала по умолчанию, от которого зависит время ожидания слушателя очереди между операциями опроса очереди. TimeSpan DequeueInterval { get; set; } /// Определение делегата обратного вызова, вызываемого, если очередь пуста. event WorkCompletedDelegate QueueEmpty; }
Событие QueueEmpty предназначено для использования узлом. Оно содержит механизм, позволяющий узлу управлять режимом работы слушателя очереди, когда очередь пуста. Делегат события определяется следующим образом:
/// <summary> /// Определение делегата обратного вызова, вызываемого при завершении обработки рабочего элемента и /// запросе обработчиком дополнительных инструкций относительно дальнейших шагов. /// </summary> /// <param name="sender">Источник события.</param> /// <param name="idleCount">Количество случаев, когда процесс выполнения неактивен.</param> /// <param name="delay">Время, в течение которого процесс находится в режиме сна перед обработкой следующего рабочего элемента.</param> /// <returns>Флаг, указывающий на необходимость остановить обработку рабочих элементов процессом выполнения и завершить работу самого процесса.</returns> public delegate bool WorkCompletedDelegate(object sender, int idleCount, out TimeSpan delay);
Обработку элементов очереди можно упростить, если создать слушателей с поддержкой универсальных шаблонов вместо использования встроенных в SDK классов, таких как CloudQueueMessage. Создадим новый интерфейс слушателя очереди с поддержкой универсальных шаблонов доступа к очередям:
/// <summary> /// Создание контракта для поддержки расширения, реализующего слушатель очереди на основе универсальных шаблонов. /// </summary> /// <typeparam name="T">Тип данных объектов очереди, который будет обрабатываться слушателем очереди.</typeparam> public interface ICloudQueueListenerExtension<T> : ICloudQueueServiceWorkerRoleExtension, IObservable<T> { }
Обратите внимание: мы также разрешили слушателю с поддержкой универсальных шаблонов передавать элементы очереди одному или нескольким подписчикам, реализовав шаблон проектирования «Наблюдатель» (Observer) с помощью интерфейса IObservable, доступного в среде .NET Framework 4.
Мы намерены сохранить один экземпляр компонента, в котором реализован интерфейс ICloudQueueListenerExtension. Однако нам необходима возможность одновременного запуска нескольких потоков (задач) по удалению сообщений из очереди. Поэтому мы добавляем в компонент слушателя поддержку многопотоковой логики удаления сообщений из очереди. Для решения этой задачи используется библиотека функций параллельной обработки данных Task Parallel Library (TPL). Метод StartListener позволит нам создать нужное количество потоков для удаления сообщений из очереди:
/// <summary> /// Запуск нужного количества задач удаления сообщений из очереди. /// </summary> /// <param name="threadCount">Количество задач удаления сообщений из очереди.</param> public void StartListener(int threadCount) { Guard.ArgumentNotZeroOrNegativeValue(threadCount, "threadCount"); // При каждом вызове этого метода необходимо сбрасывать коллекцию задач удаления сообщений из очереди. if (this.dequeueTasks.IsAddingCompleted) { this.dequeueTasks = new BlockingCollection<Task>(this.dequeueTaskList); } for (int i = 0; i < threadCount; i++) { CancellationToken cancellationToken = this.cancellationSignal.Token; CloudQueueListenerDequeueTaskState<T> workerState = new CloudQueueListenerDequeueTaskState<T>(Subscriptions, cancellationToken, this.queueLocation, this.queueStorage); // Запуск новой задачи удаления сообщений из очереди и регистрация ее в коллекции задач, внутреннее управление которой производится этим компонентом. this.dequeueTasks.Add(Task.Factory.StartNew(DequeueTaskMain, workerState, cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default)); } // Установка на коллекцию метки, запрещающей добавление дополнительных элементов. this.dequeueTasks.CompleteAdding(); }
Метод DequeueTaskMain реализует функции потока, который удаляет сообщения из очереди. Он поддерживает следующие основные операции:
/// <summary> /// Реализация задачи по удалению сообщений из очереди Windows Azure. /// </summary> /// <param name="state">Объект, содержащий данные для задачи.</param> private void DequeueTaskMain(object state) { CloudQueueListenerDequeueTaskState<T> workerState = (CloudQueueListenerDequeueTaskState<T>)state; int idleStateCount = 0; TimeSpan sleepInterval = DequeueInterval; try { // Выполнение задачи по удалению сообщений из очереди до тех пор, пока не будет получена команда завершения работы или не произойдет выход из цикла. while (workerState.CanRun) { try { var queueMessages = from msg in workerState.QueueStorage.Get<T>(workerState.QueueLocation.QueueName, DequeueBatchSize, workerState.QueueLocation.VisibilityTimeout).AsParallel() where msg != null select msg; int messageCount = 0; // Параллельная обработка удаленных из очереди сообщений с помощью указанного выше запроса PLINQ. queueMessages.ForAll((message) => { // Сброс счетчика итераций неактивного состояния. idleStateCount = 0; // Уведомление всех подписчиков о появлении нового сообщения, которое необходимо обработать. workerState.OnNext(message); // Если обработка завершена успешно, удалить обработанное сообщение из очереди. workerState.QueueStorage.Delete<T>(message); // Увеличить счетчик количества обработанных сообщений на единицу. messageCount++; }); // Проверить, выполнялась ли работа в ходе этой итерации. if (0 == messageCount) { // Увеличить на единицу счетчик итераций, в ходе которых полезная работа не выполнялась (например, сообщения не удалялись из очереди). idleStateCount++; // Вызов определенного пользователем делегата, который сообщает об отсутствии рабочей нагрузки. if (QueueEmpty != null) { // Проверить, затребовал ли определенный пользователем делегат прерывание дальнейшей обработки рабочей нагрузки. if (QueueEmpty(this, idleStateCount, out sleepInterval)) { // Прерывание цикла удаления сообщений из очереди, если этого потребовал определенный пользователем делегат. break; } } // Переход в состояние простоя на указанный период времени. Thread.Sleep(sleepInterval); } } catch (Exception ex) { if (ex is OperationCanceledException) { throw; } else { // Передача внешнему объекту функций обработки ошибок и создания отчетов об ошибках. workerState.OnError(ex); // Переход в режим сна на указанный интервал времени, позволяющий избежать создания чрезмерно большого количества сообщений об ошибках. Thread.Sleep(sleepInterval); } } } } finally { workerState.OnCompleted(); } }
Необходимо сделать несколько пояснений, связанных с особенностями реализации метода DequeueTaskMain.
Во-первых, для диспетчеризации сообщений с целью последующей обработки используется метод Parallel LINQ (PLINQ).
Основным преимуществом применения PLINQ для решения этой задачи является ускорение обработки сообщений благодаря параллельному использованию делегата в отдельных рабочих потоках на различных процессорах всякий раз, когда это возможно.
Примечание. Внутреннее управление параллелизацией запросов обеспечивает PLINQ. Невозможно гарантировать, что система PLINQ будет использовать более одного ядра для поддержки параллелизации. Если система PLINQ обнаруживает вероятность замедленного выполнения запроса из-за дополнительных затрат вычислительных мощностей на параллелизацию, запрос может быть выполнен последовательно. Для реализации всех преимуществ PLINQ совокупная рабочая нагрузка при выполнении запроса должна быть достаточно большой, чтобы оправдать использование дополнительных вычислительных мощностей на управление пулом потоков.
Во-вторых, мы не создаем отдельные запросы для получения каждого конкретного сообщения. Вместо этого используются API-интерфейсы службы Queue Service API для получения указанного количества сообщений из очереди. Количество получаемых сообщений определяется параметром DequeueBatchSize, который передается методу Get. При обращении к уровню абстракции хранилища данных этот параметр передается API службы очередей. Кроме того, проводится проверка безопасности, гарантирующая, что размер пакета не превышает максимально допустимый для API. Ниже приведена программная реализация этого подхода:
/// Этот класс содержит надежные средства доступа к хранилищу очередей Windows Azure с поддержкой универсальных шаблонов. public sealed class ReliableCloudQueueStorage : ICloudQueueStorage { /// Максимальный размер пакета, поддерживаемый методами API службы Queue Service при выполнении одной операции Get. private const int MaxDequeueMessageCount = 32; /// Получение коллекции сообщений из указанной очереди и установка времени ожидания между отправкой запросов. public IEnumerable<T> Get<T>(string queueName, int count, TimeSpan visibilityTimeout) { Guard.ArgumentNotNullOrEmptyString(queueName, "queueName"); Guard.ArgumentNotZeroOrNegativeValue(count, "count"); try { var queue = this.queueStorage.GetQueueReference(CloudUtility.GetSafeContainerName(queueName)); IEnumerable<CloudQueueMessage> queueMessages = this.retryPolicy.ExecuteAction<IEnumerable<CloudQueueMessage>>(() => { return queue.GetMessages(Math.Min(count, MaxDequeueMessageCount), visibilityTimeout); }); // ... Далее идет дополнительный код ...
В заключение отметим, что мы не намерены выполнять задание по удалению сообщений из очереди бесконечно. Мы создали явно заданную точку проверки, реализованную в виде события QueueEmpty, которое вызывается каждый раз, когда очередь становится пустой. В этом случае вызывается обработчик события QueueEmpty, который определяет, можно ли завершить выполняемую задачу по удалению сообщений из очереди. Правильная реализация обработчика событий QueueEmpty обеспечивает поддержку «автоматического сокращения», которое рассматривается в следующем разделе.
Автоматическое сокращение объема задач по удалению сообщений из очереди
Обработчик событий QueueEmpty позволяет решать две категории задач. Во-первых, он обеспечивает отправку сообщений исходному заданию по удалению сообщений из очереди, передавая команду перейти в режим сна на заданный интервал времени (указанный в выходном параметре delay делегата события). Во-вторых, при помощи передаваемого логического параметра он сообщает заданию по удалению сообщений из очереди о необходимости завершения работы.
Следующая реализация обработчика событий QueueEmpty позволяет решить обе задачи, рассмотренные выше. Обработчик вычисляет интервал между запросами и сообщает заданию по удалению сообщений из очереди о необходимости экспоненциального увеличения задержки между двумя последовательными опросами. Обратите внимание: в этом решении задержка между запросами не будет превышать одной секунды; если автоматическое масштабирование реализовано правильно, отпадает необходимость в большой задержке между операциями опроса. Кроме того, обработчик событий запрашивает статус слушателя очереди для определения количества активных задач по удалению сообщений из очереди. Если это количество превышает единицу, обработчик событий рекомендует исходному заданию по удалению сообщений из очереди завершить цикл опроса при достижении максимального значения интервала ожидания между запросами. В противном случае задание по удалению сообщений из очереди не будет прервано, что позволит оставить только один поток запроса, единовременно запущенный для каждого экземпляра слушателя очереди. Этот подход помогает сократить количество транзакций хранилища и уменьшить расходы на оплату транзакций, о чем говорилось выше.
private bool HandleQueueEmptyEvent(object sender, int idleCount, out TimeSpan delay) { // Отправитель запроса является экземпляром ICloudQueueServiceWorkerRoleExtension, можно безопасно выполнить преобразование типов. ICloudQueueServiceWorkerRoleExtension queueService = sender as ICloudQueueServiceWorkerRoleExtension; // Сбор сведений о том, какое расширение отвечает за получение параметров конфигурации рабочей роли. IWorkItemProcessorConfigurationExtension config = Extensions.Find<IWorkItemProcessorConfigurationExtension>(); // Получение текущего состояния слушателя очереди для определения характеристик рабочей нагрузки в текущий момент времени. CloudQueueListenerInfo queueServiceState = queueService.QueryState(); // Настройка внутренних параметров, чтение настроек конфигурации. int deltaBackoffMs = 100; int minimumIdleIntervalMs = Convert.ToInt32(config.Settings.MinimumIdleInterval.TotalMilliseconds); int maximumIdleIntervalMs = Convert.ToInt32(config.Settings.MaximumIdleInterval.TotalMilliseconds); // Вычисление нового интервала ожидания в режиме сна после экспоненциального увеличения задержки между двумя последовательными запросами. int delta = (int)((Math.Pow(2.0, (double)idleCount) - 1.0) * (new Random()).Next((int)(deltaBackoffMs * 0.8), (int)(deltaBackoffMs * 1.2))); int interval = Math.Min(minimumIdleIntervalMs + delta, maximumIdleIntervalMs); // Передача вычисленного интервала заданию удаления сообщений из очереди для перехода в режим сна на указанный период времени. delay = TimeSpan.FromMilliseconds((double)interval); // При достижении максимального значения интервала сообщить заданию удаления сообщений из очереди о необходимости завершения работы, // если это задание удаления сообщений из очереди не является последним. Если оно является последним, мы завершим его работу и продолжим опрашивать очередь. return delay.TotalMilliseconds >= maximumIdleIntervalMs && queueServiceState.ActiveDequeueTasks > 1; }
Механизм автоматизированного масштабирования можно описать следующим образом:
- Как только в очереди появляется сообщение, задачи по удалению сообщений из очереди сразу же обрабатывают рабочую нагрузку. Запросы на удаление пакетов сообщений из очереди передаются без задержек.
- После того как исходная очередь опустеет, каждая из задач удаления сообщений из очереди вызывает событие QueueEmpty.
- Обработчик события QueueEmpty вычисляет случайную экспоненциальную задержку между отправкой запросов и передает задаче удаления сообщений из очереди команду на приостановку операций в течение заданного интервала.
- Задачи удаления сообщений из очереди продолжают опрашивать исходную очередь через заданные случайные интервалы до тех пор, пока совокупное время неактивности не превысит допустимый максимум.
- Если максимальная продолжительность интервала неактивности достигнута, а исходная очередь все еще пуста, все активные задачи удаления сообщений из очереди завершают работу. Это происходит не одновременно, поскольку алгоритм вычисления интервалов между отправкой сообщений срабатывает в разное время.
- Наступает момент, когда активной остается только одна задача удаления сообщений из очереди. В результате прекращается обработка транзакций, связанных с опросом пустой очереди, кроме тех, которые порождаются этой оставшейся задачей.
Для подробного изучения механизма сбора информации о текущем уровне рабочей нагрузки перейдем к рассмотрению соответствующих артефактов исходного кода. Во-первых, существует структура для хранения показателей, отвечающих за измерение результирующей рабочей нагрузки на систему. В качестве простого примера приведем небольшое подмножество показателей для использования в образцах программного кода.
/// Реализация структуры, содержащей характеристики текущей рабочей нагрузки на слушателя очереди. public struct CloudQueueListenerInfo { /// Возвращает приблизительное количество элементов в очереди Windows Azure. public int CurrentQueueDepth { get; internal set; } /// Возвращает количество задач удаления элементов из очереди, которые активно выполняют работу или ожидают ее. public int ActiveDequeueTasks { get; internal set; } /// Возвращает максимальное количество задач по удалению элементов из очереди, выполнявшихся одновременно. public int TotalDequeueTasks { get; internal set; } }
Во-вторых, слушатель очереди использует метод, возвращающий его показатели нагрузки (см. пример ниже).
/// Возврат текущего состояния слушателя очереди для определения характеристик нагрузки в данный момент. public CloudQueueListenerInfo QueryState() { return new CloudQueueListenerInfo() { CurrentQueueDepth = this.queueStorage.GetCount(this.queueLocation.QueueName), ActiveDequeueTasks = (from task in this.dequeueTasks where task.Status != TaskStatus.Canceled && task.Status != TaskStatus.Faulted && task.Status != TaskStatus.RanToCompletion select task).Count(), TotalDequeueTasks = this.dequeueTasks.Count }; }
Автоматическое масштабирование задач удаления из очереди
В предыдущем разделе была описана функция сокращения количества активных задач удаления из очереди до одного экземпляра с целью уменьшить влияние транзакций в режиме неактивности на рост расходов, связанных с операциями обмена данными с хранилищем. В этом разделе приведен пример реализации функции автоматического масштабирования для наращивания вычислительной мощности.
Создадим делегата для отслеживания состояний перехода от пустой очереди к непустой с целью запуска событий, например автоматического масштабирования:
/// <summary> /// Создание делегата, вызываемого при поступлении нового рабочего задания в очередь, когда слушатель очереди неактивен. /// </summary> /// <param name="sender">Источник события.</param> public delegate void WorkDetectedDelegate(object sender); Расширим первоначальное определение интерфейса ICloudQueueServiceWorkerRoleExtension, чтобы включить в него новое событие, вызываемое каждый раз, когда слушатель очереди обнаруживает новые рабочие элементы (в этом случае глубина очереди принимает положительное значение): public interface ICloudQueueServiceWorkerRoleExtension { // ... Прочие элементы интерфейса пропущены, чтобы сократить текст. См. предыдущие фрагменты кода для получения дополнительной информации ... /// Создание делегата, вызываемого при поступлении нового рабочего задания в очередь, когда слушатель очереди неактивен. event WorkDetectedDelegate QueueWorkDetected; }
Определим, в какой строке программного кода слушателя очереди должно вызываться событие. В нашем случае событие QueueWorkDetected будет вызвано из цикла удаления из очереди, реализованного с помощью метода DequeueTaskMain, который должен быть расширен следующим образом:
public class CloudQueueListenerExtension<T> : ICloudQueueListenerExtension<T> { // Экземпляр делегата, вызываемого при поступлении нового рабочего задания в очередь, когда слушатель очереди неактивен. public event WorkDetectedDelegate QueueWorkDetected; private void DequeueTaskMain(object state) { CloudQueueListenerDequeueTaskState<T> workerState = (CloudQueueListenerDequeueTaskState<T>)state; int idleStateCount = 0; TimeSpan sleepInterval = DequeueInterval; try { // Выполнение задачи по удалению сообщений из очереди до тех пор, пока не будет получена команда завершения работы или не произойдет выход из цикла. while (workerState.CanRun) { try { var queueMessages = from msg in workerState.QueueStorage.Get<T>(workerState.QueueLocation.QueueName, DequeueBatchSize, workerState.QueueLocation.VisibilityTimeout).AsParallel() where msg != null select msg; int messageCount = 0; // Проверка поступления рабочих элементов в очередь, когда слушатель неактивен. if (idleStateCount > 0 && queueMessages.Count() > 0) { if (QueueWorkDetected != null) { QueueWorkDetected(this); } } // ... Прочий программный код пропущен, чтобы сократить текст. См. предыдущие фрагменты кода для получения дополнительной информации ...
На последнем этапе создадим обработчик события QueueWorkDetected. Реализация этого обработчика события будет содержаться в компоненте, создающем экземпляр слушателя очереди. В нашем случае этим компонентом является рабочая роль. Программный код, отвечающий за реализацию обработчика события и создание его экземпляра, состоит из следующих частей:
public class WorkItemProcessorWorkerRole : RoleEntryPoint { // Вызов Windows Azure для инициализации экземпляра роли. public override sealed bool OnStart() { // ... Перед этим вставлен дополнительный код ... // Создание экземпляра слушателя роли для входной очереди. var inputQueueListener = new CloudQueueListenerExtension<XDocument>(inputQueueLocation); // Настройка слушателя входной очереди. inputQueueListener.QueueEmpty += HandleQueueEmptyEvent; inputQueueListener.QueueWorkDetected += HandleQueueWorkDetectedEvent; inputQueueListener.DequeueBatchSize = configSettingsExtension.Settings.DequeueBatchSize; inputQueueListener.DequeueInterval = configSettingsExtension.Settings.MinimumIdleInterval; // ... Далее идет дополнительный код ... } /// Создание делегата, вызываемого при поступлении нового рабочего задания в очередь, когда слушатель очереди неактивен. private void HandleQueueWorkDetectedEvent(object sender) { // Отправитель запроса является экземпляром ICloudQueueServiceWorkerRoleExtension, можно безопасно выполнить преобразование типов. ICloudQueueServiceWorkerRoleExtension queueService = sender as ICloudQueueServiceWorkerRoleExtension; // Получение текущего состояния слушателя очереди для определения характеристик рабочей нагрузки в текущий момент времени. CloudQueueListenerInfo queueServiceState = queueService.QueryState(); // Определение количества задач для обработки рабочей нагрузки в очереди, с учетом ее текущей глубины. int dequeueTaskCount = GetOptimalDequeueTaskCount(queueServiceState.CurrentQueueDepth); // Если количество задач удаления из очереди меньше вычисленного с помощью приведенного выше кода, то выполнить запуск необходимого количества задач удаления из очереди. if (queueServiceState.ActiveDequeueTasks < dequeueTaskCount) { // Запустить необходимое количество задач удаления из очереди. queueService.StartListener(dequeueTaskCount - queueServiceState.ActiveDequeueTasks); } } // ... Далее идет дополнительный код ...
В свете приведенного выше примера необходимо пояснить использование метода GetOptimalDequeueTaskCount. Этот метод отвечает за вычисление количества задач удаления из очереди для обработки рабочей нагрузки. При вызове необходимо определить (с помощью любых подходящих механизмов принятия решений), какой объем вычислительных мощностей требуется слушателю очереди для обработки ожидаемой рабочей нагрузки.
Например, разработчик может пойти по наиболее простому пути и реализовать набор статичных правил прямо в методе GetOptimalDequeueTaskCount. Опираясь на известные параметры пропускной способности и масштабируемости инфраструктуры обработки очередей, среднее время задержки, объем обрабатываемых данных и прочие сведения, этот набор правил может дать оптимистическую оценку необходимого количества задач удаления из очереди и принять решение об их увеличении.
В следующем примере используется упрощенный способ определения оптимального количества задач удаления из очереди:
/// <summary> /// Возвращает количество задач для обработки рабочей нагрузки в очереди, с учетом ее текущей глубины. /// </summary> /// <param name="currentDepth">Приблизительное количество элементов в очереди.</param> /// <returns>Оптимальное количество задач удаления из очереди.</returns> private int GetOptimalDequeueTaskCount(int currentDepth) { if (currentDepth < 100) return 10; if (currentDepth >= 100 && currentDepth < 1000) return 50; if (currentDepth >= 1000) return 100; // Возврат минимально приемлемого количества. return 1; }
Приведенный выше пример программного кода не может служить универсальным решением. Более близкое к идеалу решение — вызывать правило, поддерживающее настройку и управление извне и способное выполнить все необходимые вычисления.
На текущий момент существует рабочий прототип слушателя очереди, способный автоматически масштабироваться в соответствии с колебаниями рабочей нагрузки (увеличиваться или уменьшаться). Возможно, его функции следует расширить для адаптации к колебаниям рабочей нагрузки в ходе ее обработки. Эту функцию можно добавить, применив тот же шаблон, который использовался для добавления поддержки события QueueWorkDetected.
Перейдем к рассмотрению еще одного важного способа оптимизации для сокращения времени задержки при работе слушателей очередей.
Реализация уровня издателя и подписчика для удаления из очереди с нулевой задержкой
В этом разделе мы дополним приведенную выше реализацию слушателя очереди механизмом принудительной отправки уведомлений, созданным на основе однонаправленной многоадресной рассылки для шины интеграции. Этот механизм уведомлений обрабатывает событие-триггер, которое является сигналом для слушателя очереди о необходимости начать работу по удалению элементов из очереди. Такой подход позволяет отказаться от опроса очереди при проверке наличия новых сообщений и устраняет действие фактора задержки.
Создадим событие-триггер, получаемое слушателем очереди при появлении в ней новой рабочей нагрузки:
/// Реализация события-триггера, которое указывает на размещение новой рабочей нагрузки в очереди. [DataContract(Namespace = WellKnownNamespace.DataContracts.Infrastructure)] public class CloudQueueWorkDetectedTriggerEvent { /// Возвращает имя учетной записи хранилища, в которой размещена очередь. [DataMember] public string StorageAccount { get; private set; } /// Возвращает имя очереди, в которой размещена рабочая нагрузка. [DataMember] public string QueueName { get; private set; } /// Возвращает объем рабочей нагрузки очереди (например, размер сообщения или количество сообщений в пакете). [DataMember] public long PayloadSize { get; private set; } // ... Конструктор пропущен для краткости изложения ... }
Разрешим реализациям слушателя очереди выступать в роли подписчиков, получающих событие-триггер. Первый шаг — назначение слушателя очереди в качестве наблюдателя события CloudQueueWorkDetectedTriggerEvent:
/// Создание контракта в расширении, отвечающем за прослушивание очереди Windows Azure. public interface ICloudQueueServiceWorkerRoleExtension : IObserver<CloudQueueWorkDetectedTriggerEvent> { // ... Текст пропущен, т. к. уже приведен в предыдущих примерах ... }
Вторым этапом будет реализация метода OnNext, определенного в интерфейсе IObserver. Этот метод вызывается поставщиком с целью уведомления наблюдателя о новом событии:
public class CloudQueueListenerExtension<T> : ICloudQueueListenerExtension<T> { // ... Перед этим вставлен дополнительный код ... /// <summary> /// Вызывается поставщиком для уведомления слушателя очереди о новом событии /// </summary> /// <param name="e">Событие-триггер, указывающее на размещение новой рабочей нагрузки в очереди.</param> public void OnNext(CloudQueueWorkDetectedTriggerEvent e) { Guard.ArgumentNotNull(e, "e"); // Проверить, что событие-триггер относится к очереди, управляемой слушателем; игнорировать события, привязанные к другой очереди. if (this.queueLocation.StorageAccount == e.StorageAccount && this.queueLocation.QueueName == e.QueueName) { if (QueueWorkDetected != null) { QueueWorkDetected(this); } } } // ... Далее идет дополнительный код ... }
Как видно из предыдущего примера, мы намеренно применяем один и тот же делегат, использованный на предыдущих этапах. Обработчик событий QueueWorkDetected обеспечивает логику приложения для создания оптимального количества экземпляров задач удаления из очереди. Поэтому мы повторно используем этот же обработчик событий при обработке уведомления CloudQueueWorkDetectedTriggerEvent.
Как отмечалось в предыдущих разделах, при использовании принудительной отправки уведомлений необязательно поддерживать постоянно выполняемую задачу удаления из очереди. Это позволяет свести к нулю количество задач по обработке очереди, которые приходятся на каждый экземпляр слушателя очереди, а также использовать механизм уведомлений для создания экземпляров задач удаления из очереди, когда очередь получает рабочие элементы. Чтобы убедиться в отсутствии неактивных задач удаления из очереди, достаточно внести простое изменение в обработчик событий QueueEmpty:
private bool HandleQueueEmptyEvent(object sender, int idleCount, out TimeSpan delay) { // ... Перед этим вставлен дополнительный код ... // При достижении максимального значения интервала сообщить заданию удаления сообщений из очереди о необходимости завершения работы. return delay.TotalMilliseconds >= maximumIdleIntervalMs; }
Таким образом, мы больше не определяем, существует ли одна активная задача удаления из очереди. Модифицированный обработчик событий QueueEmpty учитывает только факт превышения максимального интервала неактивности, по истечении которого будут завершены все активные задачи удаления из очереди.
Для получения уведомлений CloudQueueWorkDetectedTriggerEvent используется модель издателей и подписчиков, которая реализована в виде слабосвязанной модели обмена сообщениями между экземплярами ролей Windows Azure. В сущности, мы задействуем уровень обмена данными между ролями и обрабатываем входящие события следующим образом:
public class InterRoleEventSubscriberExtension : IInterRoleEventSubscriberExtension { // ... Часть кода пропущена для краткости изложения. Для получения справочной информации см. руководство, опубликованное в блоге команды Windows Azure Customer Advisory Team ... public void OnNext(InterRoleCommunicationEvent e) { if (this.owner != null && e.Payload != null) { // ... Перед этим вставлен дополнительный код ... if (e.Payload is CloudQueueWorkDetectedTriggerEvent) { HandleQueueWorkDetectedTriggerEvent(e.Payload as CloudQueueWorkDetectedTriggerEvent); return; } // ... Далее идет дополнительный код ... } } private void HandleQueueWorkDetectedTriggerEvent(CloudQueueWorkDetectedTriggerEvent e) { Guard.ArgumentNotNull(e, "e"); // Перечисление зарегистрированных слушателей очереди и передача им события-триггера. foreach (var queueService in this.owner.Extensions.FindAll<ICloudQueueServiceWorkerRoleExtension>()) { // Передача события-триггера прослушивателю очереди. queueService.OnNext(e); } } }
Многоадресная рассылка события-триггера, определенная в классе CloudQueueWorkDetectedTriggerEvent, является обязанностью издателя, а именно компонента, который размещает рабочие элементы в очереди. Это событие может запускаться перед отправкой в очередь самого первого рабочего элемента или после размещения в очереди последнего рабочего элемента. В следующем примере мы публикуем событие-триггер после того, как завершили размещение всех рабочих элементов во входную очередь:
public class ProcessInitiatorWorkerRole : RoleEntryPoint { // Экземпляр расширения роли, предоставляющий интерфейс службы обмена данными между ролями. private volatile IInterRoleCommunicationExtension interRoleCommunicator; // ... Часть кода пропущена для краткости изложения. Для получения справочной информации см. руководство, опубликованное в блоге команды Windows Azure Customer Advisory Team ... private void HandleWorkload() { // Шаг 1. Получение рабочей нагрузки, требующей использования больших вычислительных мощностей. // ... (код пропущен для краткости изложения) ... // Шаг 2. Размещение рабочих элементов во входной очереди. // ... (код пропущен для краткости изложения) ... // Шаг 3. Уведомление слушателей очереди об ожидаемом получении работы. // Создание события-триггера, указывающего на очередь, в которой мы только что разместили рабочие элементы. var trigger = new CloudQueueWorkDetectedTriggerEvent("MyStorageAccount", "InputQueue"); //Упаковка триггера в событие обмена данными между ролями. var interRoleEvent = new InterRoleCommunicationEvent(CloudEnvironment.CurrentRoleInstanceId, trigger); // Публикация события обмена данными между ролями с помощью однонаправленной многоадресной рассылки шины интеграции. interRoleCommunicator.Publish(interRoleEvent); } }
Итак, мы создали слушатель очереди, который поддерживает многопотоковую обработку, автоматическое масштабирование и принудительную отправку уведомлений. Пришло время объединить все рекомендации, относящиеся к проектированию решений по обмену сообщениями на основе очередей для платформы Windows Azure.
Заключение
Для максимального повышения эффективности и соотношения «цена — качество» решений по обмену сообщениями на основе очередей Windows Azure, архитекторы и разработчики должны следовать приведенным ниже рекомендациям.
Архитекторы решений должны учитывать следующее:
- При создании архитектуры обмена сообщениями на основе очередей необходимо использовать службу хранения очередей Windows Azure для асинхронного и масштабируемого обмена сообщениями между уровнями и службами в облачных и гибридных решениях.
- Рекомендуется использовать сегментированную архитектуру обмена сообщениями, способную обеспечить масштабирование для поддержки более 500 транзакций в секунду.
- Необходимо понимать базовые принципы ценообразования услуг платформы Windows Azure и оптимизировать решения с целью снижения расходов на оплату транзакций; для этого можно использовать рекомендации и шаблоны проектирования.
- Если к архитектуре предъявляется требование динамического масштабирования, при ее разработке необходимо предусмотреть возможность адаптации к изменению рабочей нагрузки.
- Следует использовать правильные приемы и методы поддержки автоматического масштабирования для эластичного увеличения и сокращения вычислительных мощностей с целью сокращения эксплуатационных расходов.
- Прежде чем использовать шину интеграции для подготовки к отправке уведомлений в реальном времени, необходимо оценить преимущества и затраты.
Разработчики должны учитывать следующее:
- Проектирование решения для обмена сообщениями требует реализации пакетной обработки при написании процедур хранения и извлечения данных из очередей Windows Azure.
- Реализация эффективной службы прослушивания очередей требует создания механизма, опрашивающего пустые очереди с помощью одного потока для удаления элементов из очередей.
- Используйте динамическое масштабирование для сокращения числа экземпляров рабочих ролей, если очереди остаются пустыми длительное время.
- Реализуйте случайную экспоненциальную величину задержки между отправкой запросов, чтобы снизить расходы на оплату транзакций при опросе неактивных очередей.
- Применяете методы, препятствующие выходу показателей масштабируемости за указанные пределы при реализации издателей и потребителей очередей, создающих многочисленные экземпляры и использующих многопотоковую обработку.
- При размещении данных в очереди Windows Azure и получении данных из очередей задействуйте отказоустойчивую платформу для повторной отправки сообщений, способную работать в неустойчивом состоянии.
- Используйте функцию однонаправленной передачи событий шины интеграции Windows Azure для поддержки принудительной отправки уведомлений, позволяющих уменьшить время задержки и повысить производительность решения для обмена сообщениями на основе очередей.
- Пользуйтесь новыми возможностями платформы .NET Framework 4, такими как TPL, PLINQ и шаблон проектирования «Наблюдатель», чтобы повысить степень параллельности обработки, улучшить одновременность обработки потоков и упростить проектирование многопотоковых служб.
ссылка на оригинал статьи http://habrahabr.ru/post/155961/
Добавить комментарий