
Для организации обработки потока задач используются очереди. Они нужны для накопления и распределения задач по исполнителям. Также очереди могут обеспечивать дополнительные требования к обработке задач: гарантия доставки, гарантия однократного исполнения, приоритезация и т. д.
Как правило, используются готовые системы очередей сообщений (MQ — message queue), но иногда нужно организовать ad hoc очередь или какую-нибудь специализированную (например, очередь с приоритетом и отложенным перезапуском не обработанных из-за исключений задач). О создании таких очередей и пойдёт речь ниже.
Ограничения применимости
Предлагаемые решения предназначены для обработки потока однотипных задач. Они не подходят для организации pub/sub или обмена сообщениями между слабо связанными системами и компонентами.
Очередь поверх реляционной БД хорошо работает при малых и средних нагрузках (сотни тысяч задач в сутки, десятки-сотни исполнителей), но для больших потоков лучше использовать специализированное решение.
Суть метода в пяти словах
select ... for update skip locked
Базовая очередь
Для простоты здесь и далее в таблице будут храниться только уникальные идентификаторы задач. Добавление какой-нибудь полезной нагрузки не должно составить труда.
Таблица для простейшей очереди содержит саму задачу и её статус:
create table task ( id bigint not null primary key, status integer not null default 0 -- 0 - новая, 1 - в работе, 2 - выполнена ); create index task__status__idx on task (status);
Добавление задачи:
insert into task (id) values ($1) on conflict (id) do nothing;
Получение следующей задачи:
with next_task as ( select id from task where status = 0 limit 1 for update skip locked ) update task set status = 1 from next_task where task.id = next_task.id returning task.id;
Завершение задачи:
update task set status = 2 where id = $1;
Очередь с приоритетами
В простом случае id задачи является её приоритетом. Меняется только запрос на получение следующей задачи — добавляется условие сортировки order by id с требуемым порядком обработки задач. Также нужно создать составной индекс по (status, id).
Либо для приоритета добавляется отдельный столбец:
create table task ( id bigint not null primary key, priority integer not null, status integer not null default 0 -- 0 - новая, 1 - в работе, 2 - выполнена ); create index task__status__priority__idx on task (status, priority);
insert into task (id, priority) values ($1, $2) on conflict (id) do nothing;
with next_task as ( select id from task where status = 0 order by priority limit 1 for update skip locked ) update task set status = 1 from next_task where task.id = next_task.id returning task.id;
Выделенный столбец позволяет менять приоритет задачи «на лету».
Очередь с повтором «упавших» задач
В процессе выполнения задачи может произойти ошибка или исключительная ситуация. В таких случаях задачу нужно поставить в очередь повторно. Иногда ещё требуется отложить и время её повторного исполнения на некоторое время, например, если исключение связано с временной недоступностью стороннего сервиса.
create table task ( id bigint not null primary key, status integer not null default 0, -- 0 - новая, 1 - в работе, 2 - выполнена, 3 - ошибка, 4 - фатальная ошибка (повтора не будет) attempt integer not null default 0, delayed_to timestamp null, error_text text null ); create index task__status__delayed_to__idx on task (status, delayed_to);
Как видно, расширился список статусов и добавились новые столбцы:
attempt— номер попытки; нужен для принятия решения о необходимости повтора (ограничение количества попыток) и для выбора задержки перед повтором (например, каждая следующая попытка откладывается на10 * attemptминут);delayed_to— время следующей попытки выполнения задачи;error_text— текст ошибки.
Текст ошибки нужен для группировки по типам ошибки.
Пример. Система мониторинга сообщает, что в очереди скопились тысячи задач со статусом «ошибка». Выполняем запрос:
select error_text, count(*) from task where status = 3 group by 1 order by 2 desc;
За подробностями идём в логи исполнителей. Исправляем ситуацию, вызвавшую ошибки (если это возможно). При необходимости ускоряем перезапуск задач установкой статуса в 0 или сдвигом времени следующей попытки.
with next_task as ( select id from task where status = 0 limit 1 for update skip locked ) update task set status = 1, attempt = attempt + 1, delayed_to = null, error_text = null from next_task where task.id = next_task.id returning task.id;
with next_task as ( select id from task where status = 3 and delayed_to < localtimestamp limit 1 for update skip locked ) update task set status = 1, attempt = attempt + 1, delayed_to = null, error_text = null from next_task where task.id = next_task.id returning task.id;
update task set status = 2, delayed_to = null, error_text = null where id = $1;
update task set status = 3, delayed_to = localtimestamp + make_interval(mins => 5 * attempt), error_text = $2 where id = $1;
update task set status = 4, delayed_to = null, error_text = $2 where id = $1;
Запрос получения следующей задачи разделён на два, чтобы СУБД могла построить эффективный план запроса для очереди с приоритетом. Условие отбора с or может очень плохо сочетаться с сортировкой order by.
Сбор метрик
Добавляем такие атрибуты:
- время создания задачи;
- время изменения задачи;
- время начала и завершения выполнения задачи.
create table task ( id bigint not null primary key, status integer not null default 0, -- 0 - новая, 1 - в работе, 2 - выполнена, 3 - ошибка, 4 - фатальная ошибка (повтора не будет) attempt integer not null default 0, begin_time timestamp null, end_time timestamp null, delayed_to timestamp null, error_text text null, created timestamp not null default localtimestamp, updated timestamp not null default localtimestamp ); create index task__status__delayed_to__idx on task (status, delayed_to); create index task__updated__idx on task (updated);
Учитываем добавленные столбцы во всех запросах.
with next_task as ( select id from task where status = 0 limit 1 for update skip locked ) update task set status = 1, attempt = attempt + 1, begin_time = localtimestamp, end_time = null, delayed_to = null, error_text = null, updated = localtimestamp from next_task where task.id = next_task.id returning task.id;
with next_task as ( select id from task where status = 3 and delayed_to < localtimestamp limit 1 for update skip locked ) update task set status = 1, attempt = attempt + 1, begin_time = localtimestamp, end_time = null, delayed_to = null, error_text = null, updated = localtimestamp from next_task where task.id = next_task.id returning task.id;
update task set status = 2, end_time = localtimestamp, delayed_to = null, error_text = null, updated = localtimestamp where id = $1;
update task set status = 3, end_time = localtimestamp, delayed_to = localtimestamp + make_interval(mins => 5 * attempt), error_text = $2, updated = localtimestamp where id = $1;
update task set status = 4, end_time = localtimestamp, delayed_to = null, error_text = $2, updated = localtimestamp where id = $1;
Примеры, для чего это может быть нужно
Поиск и перезапуск повисших задач:
update task set status = 3, end_time = localtimestamp, delayed_to = localtimestamp, error_text = 'hanged', updated = localtimestamp where status = 1 and updated < localtimestamp - interval '1 hour';
Удаление старых задач:
delete from task where updated < localtimestamp - interval '30 days';
Статистика по выполнению задач:
select date_trunc('hour', end_time), count(*), sum(end_time - begin_time), avg(end_time - begin_time) from task where status = 2 and end_time >= '2019-12-16' group by 1 order by 1;
Повторный запуск ранее выполненных задач
Например, обновился документ, нужно его переиндексировать для полнотекстового поиска.
create table task ( id bigint not null primary key, task_updated_at timestamp not null default localtimstamp, status integer not null default 0, -- 0 - новая, 1 - в работе, 2 - выполнена, 3 - ошибка, 4 - фатальная ошибка (повтора не будет) begin_time timestamp null, end_time timestamp null, delayed_to timestamp null, error_text text null, created timestamp not null default localtimestamp, updated timestamp not null default localtimestamp );
Здесь для времени обновления задачи добавлен столбец task_updated_at, но можно было бы использовать поле created.
Добавление или обновление (перезапуск) задачи:
insert into task (id, task_updated_at) values ($1, $2) on conflict (id) do update set task_updated_at = excluded.task_updated_at, status = case when status = 1 then 1 else 0 end, delayed_to = null, error_text = null, updated = localtimestamp where task_updated_at < excluded.task_updated_at;
Что здесь происходит. Задача становится «новой», если она сейчас не исполняется.
В запросе завершения задачи также будет проверка, была ли она изменена во время исполнения.
Запросы на получение следующей задачи такие же, как в очереди со сбором метрик.
Успешное завершение задачи:
update task set status = case when begin_time >= updated then 2 else 0 end, end_time = localtimestamp, delayed_to = null, error_text = null, updated = localtimestamp where id = $1;
Завершение задачи с ошибкой: в зависимости от задачи. Можно сделать безусловное откладывание перезапуска, можно при обновлении ставить статус «новая».
Pipeline
Задача проходит несколько стадий. Можно для каждой стадии сделать отдельную очередь. А можно в таблицу добавить соответствующий столбец.
Пример на основе базовой очереди, чтобы не загромождать код. Все ранее описанные модификации без проблем применимы и к этой очереди.
create table task ( id bigint not null primary key, stage integer not null default 0, status integer not null default 0 ); create index task__stage__status__idx on task (stage, status);
Получение следующей задачи на заданной стадии:
with next_task as ( select id from task where stage = $1 and status = 0 limit 1 for update skip locked ) update task set status = 1 from next_task where task.id = next_task.id returning task.id;
Завершение задачи с переходом на указанную стадию:
update task set stage = $2, status = 2 where id = $1;
Или переход на следующую по порядку стадию:
update task set stage = stage + 1, status = 2 where id = $1;
Задачи по расписанию
Это вариация очереди с повтором.
У каждой задачи может быть своё расписание (в простейшем варианте — периодичность запуска).
create table task ( id bigint not null primary key, period integer not null, -- периодичность запуска в секундах status integer not null default 0, -- 0 - новая, 1 - в работе next_run_time timestamp not null default localtimestamp ); create index task__status__next_run_time__idx on task (status, next_run_time);
Добавление задачи:
insert into task (id, period, next_run_time) values ($1, $2, $3);
Получение следующей задачи:
with next_task as ( select id from task where status = 0 and next_run_time <= localtimestamp limit 1 for update skip locked ) update task set status = 1 from next_task where task.id = next_task.id returning task.id;
Завершение задачи и планирование следующего запуска:
update task set status = 0, next_run_time = next_run_time + make_interval(secs => period) where id = $1
Вместо заключения
В создании специализированной очереди задач средствами РСУБД нет ничего сложного.
«Самопальная» очередь будет отвечать даже самым диким практически любым требованиям бизнеса/предметной области.
Ну и не следует забывать, что как и любая другая БД, очередь требует вдумчивого тюнинга сервера, запросов и индексов.
ссылка на оригинал статьи https://habr.com/ru/post/481556/
Добавить комментарий