Очередь задач в PostgreSQL

от автора

Очередь слонов - pixabay.com

Для организации обработки потока задач используются очереди. Они нужны для накопления и распределения задач по исполнителям. Также очереди могут обеспечивать дополнительные требования к обработке задач: гарантия доставки, гарантия однократного исполнения, приоритезация и т. д.

Как правило, используются готовые системы очередей сообщений (MQ — message queue), но иногда нужно организовать ad hoc очередь или какую-нибудь специализированную (например, очередь с приоритетом и отложенным перезапуском не обработанных из-за исключений задач). О создании таких очередей и пойдёт речь ниже.

Ограничения применимости

Предлагаемые решения предназначены для обработки потока однотипных задач. Они не подходят для организации pub/sub или обмена сообщениями между слабо связанными системами и компонентами.

Очередь поверх реляционной БД хорошо работает при малых и средних нагрузках (сотни тысяч задач в сутки, десятки-сотни исполнителей), но для больших потоков лучше использовать специализированное решение.

Суть метода в пяти словах

select ... for update skip locked

Базовая очередь

Для простоты здесь и далее в таблице будут храниться только уникальные идентификаторы задач. Добавление какой-нибудь полезной нагрузки не должно составить труда.

Таблица для простейшей очереди содержит саму задачу и её статус:

create table task (     id          bigint not null primary key,     status      integer not null default 0      -- 0 - новая, 1 - в работе, 2 - выполнена );  create index task__status__idx on task (status);

Добавление задачи:

insert into task (id) values ($1) on conflict (id) do nothing;

Получение следующей задачи:

with next_task as (     select id from task     where status = 0     limit 1     for update skip locked ) update task set     status = 1 from next_task where task.id = next_task.id returning task.id;

Завершение задачи:

update task set     status = 2 where id = $1;

Очередь с приоритетами

В простом случае id задачи является её приоритетом. Меняется только запрос на получение следующей задачи — добавляется условие сортировки order by id с требуемым порядком обработки задач. Также нужно создать составной индекс по (status, id).

Либо для приоритета добавляется отдельный столбец:

create table task (     id          bigint not null primary key,     priority    integer not null,     status      integer not null default 0      -- 0 - новая, 1 - в работе, 2 - выполнена );  create index task__status__priority__idx on task (status, priority);

Добавление задачи:

insert into task (id, priority) values ($1, $2) on conflict (id) do nothing;

Получение следующей задачи:

with next_task as (     select id from task     where status = 0     order by priority     limit 1     for update skip locked ) update task set     status = 1 from next_task where task.id = next_task.id returning task.id;

Выделенный столбец позволяет менять приоритет задачи «на лету».

Очередь с повтором «упавших» задач

В процессе выполнения задачи может произойти ошибка или исключительная ситуация. В таких случаях задачу нужно поставить в очередь повторно. Иногда ещё требуется отложить и время её повторного исполнения на некоторое время, например, если исключение связано с временной недоступностью стороннего сервиса.

create table task (     id          bigint not null primary key,     status      integer not null default 0,     -- 0 - новая, 1 - в работе, 2 - выполнена, 3 - ошибка, 4 - фатальная ошибка (повтора не будет)     attempt     integer not null default 0,     delayed_to  timestamp null,     error_text  text null );  create index task__status__delayed_to__idx on task (status, delayed_to);

Как видно, расширился список статусов и добавились новые столбцы:

  • attempt — номер попытки; нужен для принятия решения о необходимости повтора (ограничение количества попыток) и для выбора задержки перед повтором (например, каждая следующая попытка откладывается на 10 * attempt минут);
  • delayed_to — время следующей попытки выполнения задачи;
  • error_text — текст ошибки.

Текст ошибки нужен для группировки по типам ошибки.

Пример. Система мониторинга сообщает, что в очереди скопились тысячи задач со статусом «ошибка». Выполняем запрос:

select error_text, count(*) from task where status = 3 group by 1 order by 2 desc;

За подробностями идём в логи исполнителей. Исправляем ситуацию, вызвавшую ошибки (если это возможно). При необходимости ускоряем перезапуск задач установкой статуса в 0 или сдвигом времени следующей попытки.

Получение следующей новой задачи:

with next_task as (     select id from task     where status = 0     limit 1     for update skip locked ) update task set     status = 1,     attempt = attempt + 1,     delayed_to = null,     error_text = null from next_task where task.id = next_task.id returning task.id;

Получение следующей отложенной из-за ошибки задачи:

with next_task as (     select id from task     where status = 3       and delayed_to < localtimestamp     limit 1     for update skip locked ) update task set     status = 1,     attempt = attempt + 1,     delayed_to = null,     error_text = null from next_task where task.id = next_task.id returning task.id;

Успешное завершение задачи:

update task set     status = 2,     delayed_to = null,     error_text = null where id = $1;

Задача завершилась с ошибкой, будет повтор через (5 * количество попыток) минут:

update task set     status = 3,     delayed_to = localtimestamp + make_interval(mins => 5 * attempt),     error_text = $2 where id = $1;

Задача завершилась с фатальной ошибкой, повтора не будет:

update task set     status = 4,     delayed_to = null,     error_text = $2 where id = $1;

Запрос получения следующей задачи разделён на два, чтобы СУБД могла построить эффективный план запроса для очереди с приоритетом. Условие отбора с or может очень плохо сочетаться с сортировкой order by.

Сбор метрик

Добавляем такие атрибуты:

  • время создания задачи;
  • время изменения задачи;
  • время начала и завершения выполнения задачи.

create table task (     id          bigint not null primary key,     status      integer not null default 0,     -- 0 - новая, 1 - в работе, 2 - выполнена, 3 - ошибка, 4 - фатальная ошибка (повтора не будет)     attempt     integer not null default 0,     begin_time  timestamp null,     end_time    timestamp null,     delayed_to  timestamp null,     error_text  text null,     created     timestamp not null default localtimestamp,     updated     timestamp not null default localtimestamp );  create index task__status__delayed_to__idx on task (status, delayed_to); create index task__updated__idx on task (updated);

Учитываем добавленные столбцы во всех запросах.

Получение следующей новой задачи:

with next_task as (     select id from task     where status = 0     limit 1     for update skip locked ) update task set     status = 1,     attempt = attempt + 1,     begin_time = localtimestamp,     end_time = null,     delayed_to = null,     error_text = null,     updated = localtimestamp from next_task where task.id = next_task.id returning task.id;

Получение следующей отложенной из-за ошибки задачи:

with next_task as (     select id from task     where status = 3       and delayed_to < localtimestamp     limit 1     for update skip locked ) update task set     status = 1,     attempt = attempt + 1,     begin_time = localtimestamp,     end_time = null,     delayed_to = null,     error_text = null,     updated = localtimestamp from next_task where task.id = next_task.id returning task.id;

Успешное завершение задачи:

update task set     status = 2,     end_time = localtimestamp,     delayed_to = null,     error_text = null,     updated = localtimestamp where id = $1;

Задача завершилась с ошибкой, будет повтор через (5 * количество попыток) минут:

update task set     status = 3,     end_time = localtimestamp,     delayed_to = localtimestamp + make_interval(mins => 5 * attempt),     error_text = $2,     updated = localtimestamp where id = $1;

Задача завершилась с фатальной ошибкой, повтора не будет:

update task set     status = 4,     end_time = localtimestamp,     delayed_to = null,     error_text = $2,     updated = localtimestamp where id = $1;

Примеры, для чего это может быть нужно

Поиск и перезапуск повисших задач:

update task set     status = 3,     end_time = localtimestamp,     delayed_to = localtimestamp,     error_text = 'hanged',     updated = localtimestamp where status = 1   and updated < localtimestamp - interval '1 hour';

Удаление старых задач:

delete from task where updated < localtimestamp - interval '30 days';

Статистика по выполнению задач:

select     date_trunc('hour', end_time),     count(*),     sum(end_time - begin_time),     avg(end_time - begin_time) from task where status = 2   and end_time >= '2019-12-16' group by 1 order by 1;

Повторный запуск ранее выполненных задач

Например, обновился документ, нужно его переиндексировать для полнотекстового поиска.

create table task (     id              bigint not null primary key,     task_updated_at timestamp not null default localtimstamp,     status          integer not null default 0,     -- 0 - новая, 1 - в работе, 2 - выполнена, 3 - ошибка, 4 - фатальная ошибка (повтора не будет)     begin_time      timestamp null,     end_time        timestamp null,     delayed_to      timestamp null,     error_text      text null,     created         timestamp not null default localtimestamp,     updated         timestamp not null default localtimestamp );

Здесь для времени обновления задачи добавлен столбец task_updated_at, но можно было бы использовать поле created.

Добавление или обновление (перезапуск) задачи:

insert into task (id, task_updated_at) values ($1, $2) on conflict (id) do update set     task_updated_at = excluded.task_updated_at,     status = case when status = 1 then 1 else 0 end,     delayed_to = null,     error_text = null,     updated = localtimestamp where task_updated_at < excluded.task_updated_at;

Что здесь происходит. Задача становится «новой», если она сейчас не исполняется.

В запросе завершения задачи также будет проверка, была ли она изменена во время исполнения.

Запросы на получение следующей задачи такие же, как в очереди со сбором метрик.

Успешное завершение задачи:

update task set     status = case when begin_time >= updated then 2 else 0 end,     end_time = localtimestamp,     delayed_to = null,     error_text = null,     updated = localtimestamp where id = $1;

Завершение задачи с ошибкой: в зависимости от задачи. Можно сделать безусловное откладывание перезапуска, можно при обновлении ставить статус «новая».

Pipeline

Задача проходит несколько стадий. Можно для каждой стадии сделать отдельную очередь. А можно в таблицу добавить соответствующий столбец.

Пример на основе базовой очереди, чтобы не загромождать код. Все ранее описанные модификации без проблем применимы и к этой очереди.

create table task (     id      bigint not null primary key,     stage   integer not null default 0,     status  integer not null default 0 );  create index task__stage__status__idx on task (stage, status);

Получение следующей задачи на заданной стадии:

with next_task as (     select id from task     where stage = $1       and status = 0     limit 1     for update skip locked ) update task set     status = 1 from next_task where task.id = next_task.id returning task.id;

Завершение задачи с переходом на указанную стадию:

update task set     stage = $2,     status = 2 where id = $1;

Или переход на следующую по порядку стадию:

update task set     stage = stage + 1,     status = 2 where id = $1;

Задачи по расписанию

Это вариация очереди с повтором.

У каждой задачи может быть своё расписание (в простейшем варианте — периодичность запуска).

create table task (     id              bigint not null primary key,     period          integer not null,               -- периодичность запуска в секундах     status          integer not null default 0,     -- 0 - новая, 1 - в работе     next_run_time   timestamp not null default localtimestamp );  create index task__status__next_run_time__idx on task (status, next_run_time);

Добавление задачи:

insert into task (id, period, next_run_time) values ($1, $2, $3);

Получение следующей задачи:

with next_task as (     select id from task     where status = 0       and next_run_time <= localtimestamp     limit 1     for update skip locked ) update task set     status = 1 from next_task where task.id = next_task.id returning task.id;

Завершение задачи и планирование следующего запуска:

update task set     status = 0,     next_run_time = next_run_time + make_interval(secs => period) where id = $1

Вместо заключения

В создании специализированной очереди задач средствами РСУБД нет ничего сложного.

«Самопальная» очередь будет отвечать даже самым диким практически любым требованиям бизнеса/предметной области.

Ну и не следует забывать, что как и любая другая БД, очередь требует вдумчивого тюнинга сервера, запросов и индексов.


ссылка на оригинал статьи https://habr.com/ru/post/481556/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *