Типовая задача подобного рода звучит обычно примерно так: «Вот тут бухгалтерия выгрузила из клиент-банка последние поступившие оплаты, надо их быстренько вкачать на сайт и привязать к счетам».
Но когда объем этого «чего-то» начинает измеряться сотнями мегабайт, а сервис при этом должен продолжать работать с базой в режиме 24×7, возникает множество side-эффектов, которые будут портить вам жизнь.

Чтобы справиться с ними в PostgreSQL (да и не только в нем), можно использовать некоторые возможности для оптимизаций, которые позволят обработать все быстрее и с меньшим расходом ресурсов.
1. Куда грузить?
Сначала давайте определимся, куда мы можем залить данные, которые мы хотим «отпроцессить».
1.1. Временные таблицы (TEMPORARY TABLE)
В принципе, для PostgreSQL временные — это такие же таблицы, как и любые другие. Поэтому неверны суеверия типа «там все хранится только в памяти, а она может кончиться». Но есть и несколько существенных отличий.
Свой «неймспейс» для каждого подключения к БД
Если два подключения попытаются одновременно выполнить CREATE TABLE x, то кто-то обязательно получит ошибку неуникальности объектов БД.
А вот если оба попытаются выполнить CREATE TEMPORARY TABLE x, то оба нормально это сделают, и каждый получит свой экземпляр таблицы. И ничего общего между ними не будет.
«Самоуничтожение» при disconnect
При закрытии подключения все временные таблицы автоматически удаляются, поэтому «вручную» выполнять DROP TABLE x смысла нет никакого, кроме…
Если вы работаете через pgbouncer в transaction mode, то база-то продолжает считать, что это соединение все еще активно, и в нем-то эта временная таблица по-прежнему существует.
Поэтому попытка создать ее повторно, уже из другого подключения к pgbouncer, приведет к ошибке. Но это можно обойти, воспользовавшись CREATE TEMPORARY TABLE IF NOT EXISTS x.
Правда, лучше так все-таки не делать, потому что затем можно «внезапно» обнаружить там, оставшиеся от «предыдущего владельца» данные. Вместо этого гораздо лучше прочитать-таки мануал, и увидеть, что при создании таблицы есть возможность дописать ON COMMIT DROP — то есть при завершении транзакции таблица будет автоматически удалена.
Не-репликация
В силу принадлежности только определенному соединению, временные таблицы не реплицируются. Зато это избавляет от необходимости двойной записи данных в heap + WAL, поэтому INSERT/UPDATE/DELETE в нее существенно быстрее.
Но поскольку временная — это все-таки «почти обычная» таблица, то и на реплике ее создать нельзя тоже. По крайней мере, пока, хотя соответствующий патч уже давно ходит.
1.2. Нежурналируемые таблицы (UNLOGGED TABLE)
Но что делать, например, если у вас есть какой-то громоздкий ETL-процесс, который не удается реализовать в рамках одной транзакции, а у вас таки pgbouncer в transaction mode?..
Или поток данных настолько велик, что недостаточно пропускной способности одного соединения с БД (читай, одного процесса на CPU)?..
Или часть операций идут асинхронно в разных коннектах?..
Тут вариант только один — временно создавать не-временную таблицу. Каламбур, ага. То есть:
- создал «свои» таблицы с максимально-случайными именами, чтобы ни с кем не пересечься
- Extract: залил в них данные из внешнего источника
- Transform: преобразовал, заполнил ключевые связывающие поля
- Load: перелил готовые данные в целевые таблицы
- удалил «свои» таблицы
А теперь — ложка дегтя. По сути, вся запись в PostgreSQL происходит дважды — сначала в WAL, потом уже в тела таблицы/индексов. Все это сделано для поддержки ACID и корректной видимости данных между COMMIT‘нутыми и ROLLBACK‘нутыми транзакциями.
Но нам-то этого не нужно! У нас весь процесс или целиком успешно прошел, или нет. Неважно, сколько в нем будет промежуточных транзакций — нам не интересно «продолжать процесс с середины», особенно когда непонятно, где она была.
Для этого разработчики PostgreSQL еще в версии 9.1 внедрили такую штуку как нежурналируемые (UNLOGGED) таблицы:
С этим указанием таблица создаётся как нежурналируемая. Данные, записываемые в нежурналируемые таблицы, не проходят через журнал предзаписи (см. Главу 29), в результате чего такие таблицы работают гораздо быстрее обычных. Однако, они не защищены от сбоя; при сбое или аварийном отключении сервера нежурналируемая таблица автоматически усекается. Кроме того, содержимое нежурналируемой таблицы не реплицируется на ведомые серверы. Любые индексы, создаваемые для нежурналируемой таблицы, автоматически становятся нежурналируемыми.
Короче, будет сильно быстрее, но если сервер БД «упадет» — будет неприятно. Но часто ли это происходит, и умеет ли ваш ETL-процесс это корректно дорабатывать «с середины» после «оживления» БД?..
Если таки нет, и кейс выше похож на ваш — используйте UNLOGGED, но никогда не включайте этот атрибут на реальных таблицах, данные из которых вам дороги.
1.3. ON COMMIT { DELETE ROWS | DROP }
Эта конструкция позволяет при создании таблицы задать автоматическое поведение при завершении транзакции.
Про ON COMMIT DROP я уже написал выше, он генерирует DROP TABLE, а вот с ON COMMIT DELETE ROWS ситуация интереснее — тут генерируется TRUNCATE TABLE.
Поскольку вся инфраструктура хранения метаописания временной таблицы ровно такая же, как и у обычной, то постоянное создание-удаление временных таблиц приводит к сильному «разбуханию» системных таблиц pg_class, pg_attribute, pg_attrdef, pg_depend,…
Теперь представьте, что у вас есть воркер на прямом соединении с БД, который каждую секунду открывает новую транзакцию, создает, наполняет, обрабатывает и удаляет временную таблицу… Мусора в системных таблицах накопится в избытке, а это лишние тормоза при каждой операции.
В общем, не надо так! В этом случае гораздо эффективнее CREATE TEMPORARY TABLE x ... ON COMMIT DELETE ROWS вынести за цикл транзакций — тогда к началу каждой новой транзакции таблицы уже будет существовать (экономим вызов CREATE), но будет пустой, благодаря TRUNCATE (его вызов мы тоже сэкономили) при завершении предыдущей транзакции.
1.4. LIKE… INCLUDING …
Я упомянул в начале, что один из типичных use case для временных таблиц — это разного рода импорты — и разработчик устало копипастит список полей целевой таблицы в объявление своей временной…
Но лень — двигатель прогресса! Поэтому создать новую таблицу «по образцу» можно гораздо проще:
CREATE TEMPORARY TABLE import_table( LIKE target_table );
Поскольку нагенерить потом в эту таблицу можно весьма много данных, то поиски по ней станут ни разу не быстрыми. Но против этого есть традиционное решение — индексы! И, да, у временной таблицы тоже могут быть индексы.
Поскольку, зачастую, нужные индексы совпадают с индексами целевой таблицы, то можно просто написать LIKE target_table INCLUDING INDEXES.
Если вам нужны еще и DEFAULT-значения (например, для заполнения значений первичного ключа), можно воспользоваться LIKE target_table INCLUDING DEFAULTS. Ну или просто — LIKE target_table INCLUDING ALL — скопирует дефолты, индексы, констрейнты,…
Но тут уже надо понимать, что если вы создавали импорт-таблицу сразу с индексами, то заливаться данные будут дольше, чем если сначала все залить, а уже потом накатить индексы — посмотрите в качестве примера, как это делает pg_dump.
В общем, RTFM!
2. Как писать?
Скажу просто — используйте COPY-поток вместо «пачки» INSERT, ускорение в разы. Можно даже прямо из предварительно сформированного файла.
3. Как обрабатывать?
Итак, пусть наша вводная выглядит примерно так:
- у вас в базе хранится табличка с клиентскими данными на 1M записей
- каждый день клиент присылает вам новый полный «образ»
- по опыту вы знаете, что от раза к разу изменяется не более 10K записей
Классическим примером подобной ситуации является база КЛАДР — всего адресов много, но в каждой недельной выгрузке изменений (переименований населенных пунктов, объединений улиц, появлений новых домов) совсем немного даже в масштабе всей страны.
3.1. Алгоритм полной синхронизации
Для простоты допустим, что вам даже реструктурировать данные не нужно — просто привести таблицу в нужный вид, то есть:
- удалить все, чего уже нет
- обновить все, что уже было, и надо обновлять
- вставить все, чего еще не было
Почему именно в таком порядке стоит делать операции? Потому что именно так размер таблицы вырастет минимально (помни про MVCC!).
DELETE FROM dst
Нет, конечно можно обойтись всего двумя операциями:
- удалить (
DELETE) вообще все - вставить все из нового образа
Но при этом, благодаря MVCC, размер таблицы увеличится ровно в два раза! Получить +1M образов записей в таблице из-за обновления 10K — так себе избыточность…
TRUNCATE dst
Более опытный разработчик знает, что всю табличку целиком можно достаточно дешево зачистить:
- очистить (
TRUNCATE) таблицу целиком - вставить все из нового образа
Метод действенный, иногда вполне применим, но есть незадача… Вливать 1M записей мы будем до-о-олго, поэтому оставить таблицу пустой на все это время (как произойдет без оборачивания в единую транзакцию) не можем себе позволить.
А значит:
- у нас начинается длительная транзакция
TRUNCATEнакладывает AccessExclusive-блокировку- мы долго делаем вставку, а все остальные в это время не могут даже
SELECT
Че-то нехорошо получается…
ALTER TABLE… RENAME… / DROP TABLE …
Как вариант — залить все в отдельную новую таблицу, а потом просто переименовать на место старой. Пара противных мелочей:
- таки тоже AccessExclusive, хоть и существенно меньше по времени
- сбрасываются все планы запросов/статистика этой таблицы, надо гонять ANALYZE
- ломаются все внешние ключи (FK) на таблицу
Был WIP-патч от Simon Riggs, который предлагал сделать ALTER-операцию для подмены тела таблицы на файловом уровне, не трогая статистику и FK, но не собрал кворума.
DELETE, UPDATE, INSERT
Итак, останавливаемся на неблокирующем варианте из трех операций. Почти трех… Как это сделать наиболее эффективно?
-- все делаем в рамках транзакции, чтобы никто не видел "промежуточных" состояний BEGIN; -- создаем временную таблицу с импортируемыми данными CREATE TEMPORARY TABLE tmp( LIKE dst INCLUDING INDEXES -- по образу и подобию, вместе с индексами ) ON COMMIT DROP; -- за рамками транзакции она нам не нужна -- быстро-быстро вливаем новый образ через COPY COPY tmp FROM STDIN; -- ... -- \. -- удаляем отсутствующие DELETE FROM dst D USING dst X LEFT JOIN tmp Y USING(pk1, pk2) -- поля первичного ключа WHERE (D.pk1, D.pk2) = (X.pk1, X.pk2) AND Y IS NOT DISTINCT FROM NULL; -- "антиджойн" -- обновляем оставшиеся UPDATE dst D SET (f1, f2, f3) = (T.f1, T.f2, T.f3) FROM tmp T WHERE (D.pk1, D.pk2) = (T.pk1, T.pk2) AND (D.f1, D.f2, D.f3) IS DISTINCT FROM (T.f1, T.f2, T.f3); -- незачем обновлять совпадающие -- вставляем отсутствующие INSERT INTO dst SELECT T.* FROM tmp T LEFT JOIN dst D USING(pk1, pk2) WHERE D IS NOT DISTINCT FROM NULL; COMMIT;
3.2. Постобработка импорта
В том же самом КЛАДРе все изменившиеся записи необходимо дополнительно прогнать через постобработку — нормализовать, выделить ключевые слова, привести к нужным структурам. Но как узнать — что именно изменялось, не усложняя при этом код синхронизации, в идеале, вообще не трогая его?
Если доступ на запись в момент синхронизации есть только у вашего процесса, то можно воспользоваться триггером, который соберет для нас все изменения:
-- целевые таблицы CREATE TABLE kladr(...); CREATE TABLE kladr_house(...); -- таблицы с историей изменений CREATE TABLE kladr$log( ro kladr, -- тут лежат целые образы записей старой/новой rn kladr ); CREATE TABLE kladr_house$log( ro kladr_house, rn kladr_house ); -- общая функция логирования изменений CREATE OR REPLACE FUNCTION diff$log() RETURNS trigger AS $$ DECLARE dst varchar = TG_TABLE_NAME || '$log'; stmt text = ''; BEGIN -- проверяем необходимость логгирования при обновлении записи IF TG_OP = 'UPDATE' THEN IF NEW IS NOT DISTINCT FROM OLD THEN RETURN NEW; END IF; END IF; -- создаем запись лога stmt = 'INSERT INTO ' || dst::text || '(ro,rn)VALUES('; CASE TG_OP WHEN 'INSERT' THEN EXECUTE stmt || 'NULL,$1)' USING NEW; WHEN 'UPDATE' THEN EXECUTE stmt || '$1,$2)' USING OLD, NEW; WHEN 'DELETE' THEN EXECUTE stmt || '$1,NULL)' USING OLD; END CASE; RETURN NEW; END; $$ LANGUAGE plpgsql;
Теперь мы можем перед началом синхронизации триггеры наложить (или включить через ALTER TABLE ... ENABLE TRIGGER ...):
CREATE TRIGGER log AFTER INSERT OR UPDATE OR DELETE ON kladr FOR EACH ROW EXECUTE PROCEDURE diff$log(); CREATE TRIGGER log AFTER INSERT OR UPDATE OR DELETE ON kladr_house FOR EACH ROW EXECUTE PROCEDURE diff$log();
А потом спокойно из log-таблиц извлекаем все нужные нам изменения и прогоняем по дополнительным обработчикам.
3.3. Импорт связанных наборов
Выше мы рассматривали случаи, когда структуры данных источника и приемника совпадают. Но что делать, если выгрузка из внешней системы имеет формат отличный от структуры хранения у нас в базе?
Возьмем в качестве примера хранение клиентов и счетов по ним, классический вариант «многие-к-одному»:
CREATE TABLE client( client_id serial PRIMARY KEY , inn varchar UNIQUE , name varchar ); CREATE TABLE invoice( invoice_id serial PRIMARY KEY , client_id integer REFERENCES client(client_id) , number varchar , dt date , sum numeric(32,2) );
А вот выгрузка из внешнего источника приходит нам в виде «все в одном»:
CREATE TEMPORARY TABLE invoice_import( client_inn varchar , client_name varchar , invoice_number varchar , invoice_dt date , invoice_sum numeric(32,2) );
Очевидно, что данные по клиентам могут дублироваться в таком варианте, а основной записью является «счет»:
0123456789;Вася;A-01;2020-03-16;1000.00 9876543210;Петя;A-02;2020-03-16;666.00 0123456789;Вася;B-03;2020-03-16;9999.00
Для модели просто вставим наши тестовые данные, но помним — COPY эффективнее!
INSERT INTO invoice_import VALUES ('0123456789', 'Вася', 'A-01', '2020-03-16', 1000.00) , ('9876543210', 'Петя', 'A-02', '2020-03-16', 666.00) , ('0123456789', 'Вася', 'B-03', '2020-03-16', 9999.00);
Сначала выделим те «разрезы», на которые наши «факты» ссылаются. В нашем случае счета ссылаются на клиентов:
CREATE TEMPORARY TABLE client_import AS SELECT DISTINCT ON(client_inn) -- можно просто SELECT DISTINCT, если данные заведомо непротиворечивы client_inn inn , client_name "name" FROM invoice_import;
Чтобы счета правильно связать с ID клиентов, нам эти идентификаторы надо сначала узнать или сгенерировать. Добавим под них поля:
ALTER TABLE invoice_import ADD COLUMN client_id integer; ALTER TABLE client_import ADD COLUMN client_id integer;
Воспользуемся описанным выше способом синхронизации таблиц с небольшой поправкой — не будем ничего обновлять и удалять в целевой таблице, ведь импорт клиентов у нас «append-only»:
-- проставляем в таблице импорта ID уже существующих записей UPDATE client_import T SET client_id = D.client_id FROM client D WHERE T.inn = D.inn; -- unique key -- вставляем отсутствовавшие записи и проставляем их ID WITH ins AS ( INSERT INTO client( inn , name ) SELECT inn , name FROM client_import WHERE client_id IS NULL -- если ID не проставился RETURNING * ) UPDATE client_import T SET client_id = D.client_id FROM ins D WHERE T.inn = D.inn; -- unique key -- проставляем ID клиентов у записей счетов UPDATE invoice_import T SET client_id = D.client_id FROM client_import D WHERE T.client_inn = D.inn; -- прикладной ключ
Собственно, все — в invoice_import теперь у нас заполнено поле связи client_id, с которым мы и вставим счет.
ссылка на оригинал статьи https://habr.com/ru/company/tensor/blog/492464/
Добавить комментарий