- в документации сказано, что Amazon старается сохранять порядок сообщений, на сколько хорошо он сохраняется?
- как быстро происходит получение сообщения при использовании Long Polling?
- насколько ускоряет процесс пакетная обработка?
Постановка задачи
Самая поддерживаемая библиотека для AWS на erlang это erlcloud [1], для инициализации библиотеки достаточно вызвать методы start и configure, как это и указано на gihub. Мои сообщения будут содержать набор случайных символов, генерируемые следующей функцией:
random_string(0) -> []; random_string(Length) -> [random_char() | random_string(Length-1)]. random_char() -> random:uniform(95) + 31 .
для замеров скорости воспользуемся известной функцией, использующей timer:tc, но с некоторыми изменениями:
test_avg(M, F, A, R, N) when N > 0 -> {Ret, L} = test_loop(M, F, A, R, N, []), Length = length(L), Min = lists:min(L), Max = lists:max(L), Med = lists:nth(round((Length / 2)), lists:sort(L)), Avg = round(lists:foldl(fun(X, Sum) -> X + Sum end, 0, L) / Length), io:format("Range: ~b - ~b mics~n" "Median: ~b mics~n" "Average: ~b mics~n", [Min, Max, Med, Avg]), Ret. test_loop(_M, _F, _A, R, 0, List) -> {R, List}; test_loop(M, F, A, R, N, List) -> {T, Result} = timer:tc(M, F, [R|A]), test_loop(M, F, A, Result, N - 1, [T|List]).
изменения касаются вызова тестируемой функции, в этом варианте я добавил аргумент R, который позволяет использовать значение, возвращаемое на предыдущем запуске, это нужно для того, чтобы генерировать номера сообщений и собирать дополнительную информацию относительно перемешивания при получении сообщения. Таким образом функция отправки сообщения с номером будет выглядеть следующим образом:
send_random(N, Queue) -> erlcloud_sqs:send_message(Queue, [N + 1 | random_string(6000 + random:uniform(6000))]), N + 1 .
А её вызов со сбором статистики:
test_avg(?MODULE, send_random, [QueueName], 31, 20)
здесь 31 — номер первого сообщения, число это выбрано не случайно, дело в том, что erlang не слишком хорошо различает последовательности чисел и строки и в сообщении это будет символ номер 31, меньшие номера можно передавать в SQS, но непрерывные диапазоны в этом случае получаются небольшие (#x9 | #xA | #xD | [#x20 to #xD7FF] | [#xE000 to #xFFFD] | [#x10000 to #x10FFFF], подробней [2]) и при выходе из допустимого диапазона вы получите исключение. Таким образом функция send_random генерирует и отправляет сообщение в очередь с именем Queue, в начале которого находится число, определяющее его номер, функция возвращает номер следующего числа, которое используется далее следующей функцией генерации. Функция test_avg принимает QueueName, которое становится вторым аргументом функции send_random, первый аргумент — номер и количество повторений.
Функция, которая будет получать сообщения и проверять их порядок будет выглядеть следующим образом:
checkorder(N, []) -> N; checkorder(N, [H | T]) -> [{body, [M | _]}|_] = H, K = if M > N -> M; true -> io:format("Wrong ~b less than ~b~n", [M, N]), N end, checkorder(K, T). receive_checkorder(LastN, Queue) -> [{messages, List} | _] = erlcloud_sqs:receive_message(Queue), remove_list(Queue, List), checkorder(LastN, List).
Удаление сообщений:
remove_msg(_, []) -> wrong; remove_msg(Q, [{receipt_handle, Handle} | _]) -> erlcloud_sqs:delete_message(Q, Handle); remove_msg(Q, [_ | T]) -> remove_msg(Q, T). remove_list(_, []) -> ok; remove_list(Q, [H | T]) -> remove_msg(Q, H), remove_list(Q, T).
в списке, передаваемом на удаление содержится много лишней информации (тело сообщения и т.д.), функция удаления находит receipt_handle, который требуется для формирования запроса или возвращает wrong в случае если receipt_handle не найден
Перемешивание сообщений
Забегая вперёд могу сказать, что даже на небольшом количестве сообщений перемешивание оказалось довольно существенным и возникла дополнительная задача: нужно оценить степень перемешивания. К сожалению хороших критериев обнаружить не удалось и решено было выводить максимальное и среднее расхождение с правильной позицией. Зная размер такого окна можно восстанавливать порядо сообщений при получении, при этом конечно, ухудшается скорость обработки.
Для вычисления такой разницы достаточно поменять одну лишь функцию проверки порядка сообщений:
checkorder(N, []) -> N; checkorder({N, Cnt, Sum, Max}, [H | T]) -> [{body, [M | _]}|_] = H, {N1, Cnt1, Sum1, Max1} = if M < N -> {N, Cnt + 1, Sum + N - M, if Max < N - M -> N - M; true -> Max end }; true -> {M, Cnt, Sum, Max} end, checkorder({N1, Cnt1, Sum1, Max1}, T).
вызов функции выполнения серии будет выглядеть следующим образом:
{_, Cnt, Sum, Max} = test_avg(?MODULE, receive_checkorder, [QueueName], {0, 0, 0, 0}, Size)
я получаю количество элементов, которые пришли позже, чем нужно, сумму их расстояний от наибольшего из полученных элементов и максимальное смещение. Самое интересное для меня здесь это максимальное смещение, остальные характеристики можно назвать спорными и они возможно не слишком удачно вычисляются (к примеру, если один элемент считывается раньше, то все элементы, которые должны идти до него будут считаться переставленными в этом случае). К результатам:
Размер (шт) | 20 | 50 | 100 | 150 | 200 | 250 | 300 | 400 | 500 | 600 | 700 | 800 | 900 | 1000 |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Максимальное смещение (шт) | 11 | 32 | 66 | 93 | 65 | 139 | 184 | 155 | 251 | 241 | 218 | 249 | 359 | 227 |
Среднее смещение (шт) | 5.3 | 10.5 | 23.9 | 43 | 25.6 | 45.9 | 48.4 | 65.6 | 74.2 | 74.2 | 78.3 | 72.3 | 110.8 | 82.8 |
Первая строка — количество сообщений в очереди, второе — максимальное смещение, третье — среднее смещение.
Результаты меня удивили, сообщения не просто перемешиваются, этому просто нет границ, то есть с увеличением количества сообщений нужно увеличивать размер просматриваемого окна. То же самое в виде графика:
Long Polling
Как я уже писал, Amazon SQS не поддерживает подписки, для этого можно использовать Amazon SNS, но если требуются быстрые очереди с несколькими обработчиками это не подходит, для того, чтобы не дёргать метод получения сообщений Amazon реализовал Long Polling, который позволяет висеть, дожидаясь сообщения до двадцати секунд, а так как SQS тарифицируется по количеству вызываемых методов это должно существенно сократить затраты на очереди, но вот какая есть проблема: для небольшого количества сообщений (согласно официальной документации) очередь может не вернуть ничего. Такое поведение критично для очередей, в которых требуется быстро реагировать на событие и вообще говоря, если такое происходит часто то и Long Polling не имеет особого смысла, поскольку становится эквивалентен периодическим опросам со временем реакции SQS.
Для проверки создадим два процесса, один из которых будет в случайные моменты времени отправлять сообщения, а второй — постоянно находиться в Long Polling, при этом моменты отправки и получения сообщений будут сохраняться для последующего сравнения. Для того, чтобы включить этот режим, установим Receive Message Wait Time = 20 seconds в параметрах очереди.
send_sleep(L, Queue) -> timer:sleep(random:uniform(10000)), Call = erlang:now(), erlcloud_sqs:send_message(Queue, random_string(6000 + random:uniform(6000))), [Call | L].
эта функция засыпает на случайное количество миллисекунд, после чего запоминает момент и отправляет сообщение
remember_moment(L, []) -> L; remember_moment(L, [_ | _]) -> [erlang:now() | L]. receive_polling(L, Queue) -> [{messages, List} | _] = erlcloud_sqs:receive_message(Queue), remove_list(Queue, List), remember_moment(L, List).
эти две функции позволяют получать сообщения и запоминать моменты, в которые это произошло. После одновременного исполнения этих функций при помощи spawn я получаю два списка, разница между которыми и показывает время реакции на сообщение. Здесь не учитывается то, что сообщения могут перемешаться, в целом это просто увеличит дополнительно время реакции.
Посмотрим, что получилось:
Интервал засыпания | 10000 | 7500 | 5000 | 2500 |
---|---|---|---|---|
Минимальное время (сек) | 0.27 | 0.28 | 0.27 | 0.66 |
Максимальное время (сек) | 10.25 | 7.8 | 5.36 | 5.53 |
Среднее время (сек) | 1.87 | 1.87 | 1.84 | 1.88 |
первая строка — значение, выставленное в качестве максимальной задержки отправляющего процесса. То есть: 10 секунд, 7.5 секунд… Остальные строки — минимальное, максимальное и среднее время ожидания получения сообщения.
То же самое в виде графика:
Среднее время получилось во всех случаях одинаковое, можно сказать, что в среднем между отправкой таких одиночных сообщений до их получения проходит две секунды. Достаточно долго. В этом тесте выборка была довольно маленькой, 20 сообщений, по этому минимальные-максимальные значения скорее вопрос удачи, нежели какая-то зависимость.
Пакетная отправка
Для начала проверим на сколько важен эффект “разогрева” очереди при отправке сообщений:
Количество записей | 20 | 50 | 100 | 150 | 200 | 250 | 300 | 400 | 500 | 600 | 700 | 800 | 900 | 1000 |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Минимальное время (сек) | 0.1 | 0.1 | 0.1 | 0.09 | 0.09 | 0.09 | 0.09 | 0.1 | 0.09 | 0.1 | 0.1 | 0.09 | 0.09 | 0.09 |
Максимальное время (сек) | 0.19 | 0.37 | 0.41 | 0.41 | 0.37 | 0.38 | 0.37 | 0.43 | 0.39 | 0.66 | 0.74 | 0.48 | 0.53 | 0.77 |
Среднее время (сек) | 0.12 | 0.12 | 0.12 | 0.12 | 0.12 | 0.12 | 0.12 | 0.12 | 0.12 | 0.12 | 0.12 | 0.12 | 0.12 | 0.12 |
То же в виде графика:
можно сказать, что никакого разогрева не наблюдается, то есть очередь ведёт себя примерно одинаково на этих объёмах данных, только максимальное почему-то повышается, но среднее и минимальное остаются на своих местах.
То же самое для чтения с удалением
Количество записей | 20 | 50 | 100 | 150 | 200 | 250 | 300 | 400 | 500 | 600 | 700 | 800 | 900 | 1000 |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Минимальное время (сек) | 0.001 | 0.14 | 0 | 0.135 | 0 | 0.135 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
Максимальное время (сек) | 0.72 | 0.47 | 0.65 | 0.65 | 0.69 | 0.51 | 0.75 | 0.75 | 0.76 | 0.73 | 0.82 | 0.79 | 0.74 | 0.91 |
Среднее время (сек) | 0.23 | 0.21 | 0.21 | 0.21 | 0.21 | 0.21 | 0.21 | 0.21 | 0.21 | 0.2 | 0.2 | 0.2 | 0.2 | 0.21 |
Здесь также никакого насыщения, среднее в районе 200мс. Иногда чтение происходило мгновенно (быстрей, чем 1 мс), но это означает, что сообщение не было получено, согласно документации, сервера SQS могут так поступать, нужно просто запросить сообщение повторно.
Перейдём непосредственно к блочному и многопоточному тестированию
К сожалению, библиотека erlcloud не содержит функции для пакетной отправки сообщений, но такие функции не сложно реализовать на базе существующих, в функции отправки сообщения нужно поменять запрос на следующий:
Doc = sqs_xml_request(Config, QueueName, "SendMessageBatch", encode_message_list(Messages, 1)),
и дописать функцию формирования запроса:
encode_message_list([], _) -> []; encode_message_list([H | T], N) -> MesssageId = string:concat("SendMessageBatchRequestEntry.", integer_to_list(N)), [{string:concat(MesssageId, ".Id"), integer_to_list(N)}, {string:concat(MesssageId, ".MessageBody"), H} | encode_message_list(T, N + 1)].
в библиотеке следует также исправить версию API к примеру на 2011-10-01, иначе Amazon будет возвращать Bad request в ответ на ваши запросы.
функции тестирования аналогичны используемым в других тестах:
gen_messages(0) -> []; gen_messages(N) -> [random_string(5000 + random:uniform(1000)) | gen_messages(N - 1)]. send_batch(N, Queue) -> erlang:display(erlcloud_sqs:send_message_batch(Queue, gen_messages(10))), N + 1 .
Здесь только пришлось поменять длину сообщений с тем, чтобы весь пакет укладывался в 64кб, иначе генерируется исключение.
Были получены следующие данные для записи:
Количество потоков | 0 | 1 | 2 | 4 | 5 | 10 | 20 | 50 | 100 |
---|---|---|---|---|---|---|---|---|---|
Максимальная задержка (сек) | 0.452 | 0.761 | 0.858 | 1.464 | 1.698 | 3.14 | 5.272 | 11.793 | 20.215 |
Средняя задержка (сек) | 0.118 | 0.48 | 0.436 | 0.652 | 0.784 | 1.524 | 3.178 | 9.1 | 19.889 |
Время на сообщение (сек) | 0.118 | 0.048 | 0.022 | 0.017 | 0.016 | 0.016 | 0.017 | 0.019 | 0.02 |
здесь 0 означает чтение по одному в 1 поток, далее — 1 чтение по 10 в 1 поток, по 10 в 2 потока, по 10 в 4 потока и так далее
Для чтения:
Количество потоков | 0 | 1 | 2 | 4 | 5 | 10 | 20 | 50 | 100 |
---|---|---|---|---|---|---|---|---|---|
Максимальная задержка (сек) | 0.762 | 2.998 | 2.511 | 2.4 | 2.606 | 2.751 | 4.944 | 11.653 | 18.517 |
Средняя задержка (сек) | 0.205 | 1.256 | 1.528 | 1.566 | 1.532 | 1.87 | 3.377 | 7.823 | 17.786 |
Время на сообщение (сек) | 0.205 | 0.126 | 0.077 | 0.04 | 0.031 | 0.02 | 0.019 | 0.017 | 0.019 |
график, отражающий пропускную способность для чтения и записи (сообщений в секунду):
Синий цвет — запись, красный — чтение.
Из этих данных можно сделать вывод, что максимальная пропускная способность достигается для записи в районе 10 потоков, а для чтения — около 50, с дальнейшим увеличением числа потоков количество отдаваемых сообщений в единицу времени не повышается.
Выводы
Получается, что Amazon SQS существенным образом меняет порядок сообщений, имеет не слишком хорошее время реакции и пропускную способность, противопоставить этому можно только надёжность и небольшую (в случае небольшого количества сообщений) плату. То есть если вам не критична скорость, не важно, что сообщения перемешаются и вам не хочется администрировать или нанимать администратора сервера очередей — это ваш выбор.
Ссылки
- Erlcloud on github github.com/gleber/erlcloud
- www.w3.org/TR/REC-xml/#charsets
ссылка на оригинал статьи http://habrahabr.ru/post/207326/
Добавить комментарий