Секционирование и «живые снимки» данных в PostgreSQL

от автора

Хотя тема секционирования уже поднималась ранее, я хочу к ней вернуться, чтобы рассказать о своем опыте решения этой задачи, возникшей в связи с необходимостью аналитической обработкой больших объемов данных. Помимо секционирования, я рассмотрю предельно упрощенную реализацию «снимков» агрегированных запросов, автоматически обновляемых при изменении исходных данных..

Одним из главных требований, к разрабатываемой системе, было использование бесплатного ПО, в связи с чем, выбор пал на PostgreSQL. На момент начала работы над проектом, я довольно поверхностно знал PostgreSQL, но был неплохо знаком с возможностями Oracle Database. Поскольку речь шла об аналитической обработке, мне хотелось иметь аналоги таких опций Oracle как Partitioning и Materialized Views. После ознакомления с возможностями PostgreSQL, стало понятно, что этот функционал, так или иначе, придется писать вручную.

Разумеется, речь не шла о какой либо полноценной реализации Materialized Views, предусматривающей переписывание запросов. Для моих нужд вполне хватало возможности создания автоматически обновляемых агрегированных одно-табличных выборок (поддержка соединения таблиц, скорее всего, будет добавлена в ближайшем будущем). Для секционирования, я планировал использовать многократно описанный подход с использованием наследуемых таблиц, со вставкой данных, управляемой триггером. У меня была мысль использовать для управления секционированием Rules, но я от нее отказался, поскольку, в моем случае, преобладала вставка данных одиночными записями.

Начал я, разумеется, с таблиц для хранения метаданных:

ps_tables.sql

create sequence ps_table_seq;  create table    ps_table (   id            bigint         default nextval('ps_table_seq') not null,   name          varchar(50)    not null unique,   primary key(id) );  create sequence ps_column_seq;  create table    ps_column (   id            bigint         default nextval('ps_column_seq') not null,   table_id      bigint         not null references ps_table(id),   name          varchar(50)    not null,   parent_name   varchar(50),   type_name     varchar(8)     not null check (type_name in ('date', 'key', 'nullable', 'sum', 'min', 'max', 'cnt')),   unique (table_id, name),   primary key(id) );  create table    ps_range_partition (   table_id      bigint         not null references ps_table(id),   type_name     varchar(10)    not null check (type_name in ('day', 'week', 'month', 'year')),   start_value   date           not null,   end_value     date           not null,   primary key(table_id, start_value) );  create table    ps_snapshot (   snapshot_id   bigint         not null references ps_table(id),   table_id      bigint         not null references ps_table(id),   type_name     varchar(10)    not null check (type_name in ('day', 'week', 'month', 'year')),   primary key(snapshot_id) ); 

Здесь все достаточно очевидно. Единственное, о чем стоит сказать, это типы столбцов:

Тип Описание
date Столбец, содержащий календарную дату, используемый при секционировании и агрегации данных (поддерживаются типы date и timestamp PostgreSQL)
key Ключ, используемый в фразе group by, при агрегации данных (поддерживаются все целочисленные типы PostgreSQL)
nullable Ключ, используемый при агрегации данных, возможно содержащий значение null
sum Суммирование значений
min Минимальное значение
max Максимальное значение
cnt Подсчет количества не null-значений

Основой всего решения стала функция, выполняющая перестроение функций триггеров для таблицы, содержащей исходные данные:

ps_trigger_regenerate(bigint)

create or replace function ps_trigger_regenerate(in p_table bigint) returns void as $$ declare   l_sql         text;   l_table_name  varchar(50);   l_date_column varchar(50);   l_flag        boolean;   tabs          record;   columns       record; begin   select name into l_table_name   from   ps_table where id = p_table;    l_sql :=   'create or replace function ps_' || l_table_name || '_insert_trigger() returns trigger ' ||  'as $'|| '$ ' ||  'begin ';   for tabs in     select a.snapshot_id as id,            b.name as table_name,            a.type_name as snapshot_type     from   ps_snapshot a, ps_table b     where  a.table_id = p_table     and    b.id = a.snapshot_id     loop       l_flag = FALSE;       l_sql := l_sql ||      'update ' || tabs.table_name || ' set ';       for columns in         select name, parent_name, type_name         from   ps_column         where  table_id = tabs.id         and    not type_name in ('date', 'key', 'nullable')         loop           if l_flag then              l_sql := l_sql || ', ';           end if;           l_flag := TRUE;           if columns.type_name = 'sum' then              l_sql := l_sql ||              columns.name || ' = ' || columns.name || ' + coalesce(NEW.' || columns.parent_name || ', 0) ';           end if;           if columns.type_name = 'min' then              l_sql := l_sql ||              columns.name || ' = least(coalesce(' || columns.name || ', NEW.' || columns.parent_name || '), coalesce(NEW.' || columns.parent_name || ', ' || columns.name || ')) ';           end if;           if columns.type_name = 'max' then              l_sql := l_sql ||              columns.name || ' = greatest(coalesce(' || columns.name || ', NEW.' || columns.parent_name || '), coalesce(NEW.' || columns.parent_name || ', ' || columns.name || ')) ';           end if;           if columns.type_name = 'cnt' then              l_sql := l_sql ||              columns.name || ' = ' || columns.name || ' + case when NEW.' || columns.parent_name || ' is null then 0 else 1 end ';           end if;         end loop;       l_flag = FALSE;       l_sql := l_sql || 'where ';       for columns in         select name, parent_name, type_name         from   ps_column         where  table_id = tabs.id         and    type_name in ('date', 'key', 'nullable')         loop           if l_flag then              l_sql := l_sql || 'and ';           end if;           l_flag := TRUE;           if columns.type_name = 'date' then              l_sql := l_sql ||              columns.name || ' = date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ') ';           end if;           if columns.type_name = 'key' then              l_sql := l_sql ||              columns.name || ' = NEW.' || columns.parent_name || ' ';           end if;           if columns.type_name = 'nullable' then              l_sql := l_sql ||              columns.name || ' = coalesce(NEW.' || columns.parent_name || ', 0)';           end if;         end loop;       l_sql := l_sql || '; ' ||      'if not FOUND then ' ||      'insert into ' || tabs.table_name || '(';       l_flag = FALSE;       for columns in         select name, type_name         from   ps_column         where  table_id = tabs.id         loop           if l_flag then              l_sql := l_sql || ', ';           end if;           l_flag := TRUE;           l_sql := l_sql || columns.name;         end loop;       l_sql := l_sql || ') values (';       l_flag = FALSE;       for columns in         select name, parent_name, type_name         from   ps_column         where  table_id = tabs.id         loop           if l_flag then              l_sql := l_sql || ', ';           end if;           l_flag := TRUE;           if columns.type_name = 'date' then              l_sql := l_sql || 'date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ')';           elsif columns.type_name = 'cnt' then              l_sql := l_sql || 'case when NEW.' || columns.parent_name || ' is null then 0 else 1 end';           elsif columns.type_name in ('nullable', 'sum') then              l_sql := l_sql || 'coalesce(NEW.' || columns.parent_name || ', 0)';           else              l_sql := l_sql || 'NEW.' || columns.parent_name;           end if;         end loop;       l_sql := l_sql || '); ' ||      'end if; ';     end loop;     select name into l_date_column     from   ps_column     where  table_id = p_table     and    type_name = 'date';     for tabs in       select to_char(start_value, 'YYYYMMDD') as start_value,              to_char(end_value, 'YYYYMMDD') as end_value,              type_name       from   ps_range_partition       where  table_id = p_table       order  by start_value desc       loop         l_sql := l_sql ||        'if NEW.' || l_date_column || ' >= to_date(''' || tabs.start_value || ''', ''YYYYMMDD'') and NEW.' || l_date_column || ' < to_date(''' || tabs.end_value || ''', ''YYYYMMDD'') then ' ||           'insert into ' || l_table_name || '_' || tabs.start_value || ' values (NEW.*); ' ||           'return null; ' ||        'end if; ';       end loop;   l_sql := l_sql ||  'return NEW; '||  'end; '||  '$'||'$ language plpgsql';   execute l_sql;    l_sql :=   'create or replace function ps_' || l_table_name || '_raise_trigger() returns trigger ' ||  'as $'|| '$ ' ||  'begin ' ||    'raise EXCEPTION ''Can''''t support % on MIN or MAX aggregate'', TG_OP;' ||  'end; '||  '$'||'$ language plpgsql';   execute l_sql;    l_sql :=   'create or replace function ps_' || l_table_name || '_delete_trigger() returns trigger ' ||  'as $'|| '$ ' ||  'begin ';   for tabs in     select a.snapshot_id as id,            b.name as table_name,            a.type_name as snapshot_type     from   ps_snapshot a, ps_table b     where  a.table_id = p_table     and    b.id = a.snapshot_id     loop       l_flag = FALSE;       l_sql := l_sql ||      'update ' || tabs.table_name || ' set ';       for columns in         select name, parent_name, type_name         from   ps_column         where  table_id = tabs.id         and    type_name in ('sum', 'cnt')         loop           if l_flag then              l_sql := l_sql || ', ';           end if;           l_flag := TRUE;           if columns.type_name = 'sum' then              l_sql := l_sql ||              columns.name || ' = ' || columns.name || ' - OLD.' || columns.parent_name || ' ';           end if;           if columns.type_name = 'cnt' then              l_sql := l_sql ||              columns.name || ' = ' || columns.name || ' - case when OLD.' || columns.parent_name || ' is null then 0 else 1 end ';           end if;         end loop;       l_flag = FALSE;       l_sql := l_sql || 'where ';       for columns in         select name, parent_name, type_name         from   ps_column         where  table_id = tabs.id         and    type_name in ('date', 'key', 'nullable')         loop           if l_flag  then              l_sql := l_sql || 'and ';           end if;           l_flag := TRUE;           if columns.type_name = 'date' then              l_sql := l_sql ||              columns.name || ' = date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ') ';           end if;           if columns.type_name = 'key' then              l_sql := l_sql ||              columns.name || ' = NEW.' || columns.parent_name || ' ';           end if;           if columns.type_name = 'nullable' then              l_sql := l_sql ||              columns.name || ' = coalesce(NEW.' || columns.parent_name || ', 0)';           end if;         end loop;       l_sql := l_sql || '; ';     end loop;   l_sql := l_sql ||  'return null; '||  'end; '||  '$'||'$ language plpgsql';   execute l_sql;    l_sql :=   'create or replace function ps_' || l_table_name || '_update_trigger() returns trigger ' ||  'as $'|| '$ ' ||  'begin ';   for tabs in     select a.snapshot_id as id,            b.name as table_name,            a.type_name as snapshot_type     from   ps_snapshot a, ps_table b     where  a.table_id = p_table     and    b.id = a.snapshot_id     loop       l_flag = FALSE;       l_sql := l_sql ||      'update ' || tabs.table_name || ' set ';       for columns in         select name, parent_name, type_name         from   ps_column         where  table_id = tabs.id         and    type_name in ('sum', 'cnt')         loop           if l_flag then              l_sql := l_sql || ', ';           end if;           l_flag := TRUE;           if columns.type_name = 'sum' then              l_sql := l_sql ||              columns.name || ' = ' || columns.name || ' - OLD.' || columns.parent_name || ' + NEW.' || columns.parent_name || ' ';           end if;           if columns.type_name = 'cnt' then              l_sql := l_sql ||              columns.name || ' = ' || columns.name ||              ' - case when OLD.' || columns.parent_name || ' is null then 0 else 1 end ' ||              ' + case when NEW.' || columns.parent_name || ' is null then 0 else 1 end ';           end if;         end loop;       l_flag = FALSE;       l_sql := l_sql || 'where ';       for columns in         select name, parent_name, type_name         from   ps_column         where  table_id = tabs.id         and    type_name in ('date', 'key', 'nullable')         loop           if l_flag then              l_sql := l_sql || 'and ';           end if;           l_flag := TRUE;           if columns.type_name = 'date' then              l_sql := l_sql ||              columns.name || ' = date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ') ';           end if;           if columns.type_name = 'key' then              l_sql := l_sql ||              columns.name || ' = NEW.' || columns.parent_name || ' ';           end if;           if columns.type_name = 'nullable' then              l_sql := l_sql ||              columns.name || ' = coalesce(NEW.' || columns.parent_name || ', 0)';           end if;         end loop;       l_sql := l_sql || '; ';     end loop;   l_sql := l_sql ||  'return null; '||  'end; '||  '$'||'$ language plpgsql';   execute l_sql; end; $$ language plpgsql; 

Несмотря на свой устрашающий вид, эта функция довольно проста. Ее задача — сформировать (на основе имеющихся метаданных), четыре функции, используемых при построении триггеров:

  • ps_TABLE_insert_trigger() — Функция управляющая вставкой данных
  • ps_TABLE_update_trigger() — Функция управляющая обновлением данных
  • ps_TABLE_delete_trigger() — Функция управляющая удалением данных
  • ps_TABLE_raise_trigger() — Функция запрещающая обновление и удаление данных

Здесь, вместо TABLE подставляется имя таблицы, содержащей исходные данные. Типичное определение функции ps_TABLE_insert_trigger() будет выглядеть следующим образом:

create or replace function ps_data_insert_trigger() returns trigger as $$ begin   update data_month set     sum_field = sum_field + NEW.sum_field   , min_field = least(min_field, NEW.min_field)   where date_field = date_trunc('month', NEW.date_field)   and   key_field = NEW.key_field;   if not FOUND then      insert into data_month(date_field, key_field, sum_field, min_field)      values (date_trunc('month', NEW.date_field), NEW.key_field, NEW.sum_field, NEW.min_field);   end if;   if NEW.date_field >= to_date('20130101', 'YYYYMMDD') and       NEW.date_field < to_date('20130201', 'YYYYMMDD') then      insert into data_20130101 values (NEW.*);      return null;   end if;   return NEW; end; $$ language plpgsql; 

На самом деле, функция выглядит несколько сложнее, поскольку особым образом обрабатываются null-значения. Но, в качестве иллюстрации, приведенный выше пример вполне адекватен. Логика этого кода очевидна:

  • При вставке в исходную таблицу data, пытаемся обновить счетчики в агрегированном представлении data_month
  • Если это не удалось (запись в data_month не найдена), добавляем новую запись
  • Далее, проверяем попадание в интервал дат для каждой секции (в примере одна секция), и при успехе, вставляем запись в соответствующую секцию (поскольку секция наследуется от главной таблицы, можно смело использовать звездочку) и возращаем null, чтобы предотвратить вставку записи в главную таблицу
  • Если ни одна из секций не подходит, возвращаем NEW, позволяя выполнить вставку в главную таблицу

Последний пункт приводит к тому, что если подходящая секция не найдена, данные добавляются в главную таблицу. На практике это довольно удобно. Даже если мы не создадим секцию заранее или получим данные с некорректной датой, вставка данных пройдет успешно. Впоследствии можно проанализировать содержимое главной таблицы, выполнив запрос:

select * from only data 

После чего, создать недостающие секции (как будет показано ниже, данные будут автоматически перенесены из главной таблицы в созданную секцию). В подобных случаях, количество записей, не попавших в свою секцию, как правило, не велико и издержки, на перенос данных, незначительны.

Теперь осталось сделать обвязку. Начнем с функции создания новой секции:

ps_add_range_partition(varchar, varchar, varchar, date)

create or replace function ps_add_range_partition(in p_table varchar, in p_column varchar,                  in p_type varchar, in p_start date) returns void as $$ declare   l_sql       text;   l_end       date;   l_start_str varchar(10);   l_end_str   varchar(10);   l_table     bigint;   l_flag      boolean;   columns     record; begin   perform 1   from   ps_table a, ps_column b   where  a.id = b.table_id and lower(a.name) = lower(p_table)   and    b.type_name = 'date' and lower(b.name) <> lower(p_column);   if FOUND then      raise EXCEPTION 'Conflict DATE columns';   end if;    l_end := p_start + ('1 ' || p_type)::INTERVAL;    perform 1   from   ps_table a, ps_range_partition b   where  a.id = b.table_id and lower(a.name) = lower(p_table)   and (( p_start >= b.start_value and p_start < b.end_value ) or        ( b.start_value >= p_start and b.start_value < l_end ));   if FOUND then      raise EXCEPTION 'Range intervals intersects';   end if;    perform 1   from   ps_table   where  lower(name) = lower(p_table);   if not FOUND then      insert into ps_table(name) values (lower(p_table));   end if;    select id into l_table   from   ps_table   where  lower(name) = lower(p_table);    perform 1   from   ps_column   where  table_id = l_table and type_name = 'date'   and    lower(name) = lower(p_column);   if not FOUND then      insert into ps_column(table_id, name, type_name)      values (l_table, lower(p_column), 'date');   end if;    insert into ps_range_partition(table_id, type_name, start_value, end_value)   values (l_table, p_type, p_start, l_end);    l_start_str = to_char(p_start, 'YYYYMMDD');   l_end_str = to_char(l_end, 'YYYYMMDD');    l_sql :=  'create table ' || p_table || '_' || l_start_str || '(' ||    'check (' || p_column || ' >= to_date(''' || l_start_str || ''', ''YYYYMMDD'') and ' ||                 p_column || ' < to_date(''' || l_end_str || ''', ''YYYYMMDD'')), ' ||    'primary key (';      l_flag := FALSE;     for columns in       select f.name as name       from ( select ps_array_to_set(a.conkey) as nn              from   pg_constraint a, pg_class b              where  b.oid = a.conrelid              and    a.contype = 'p'              and    b.relname = p_table ) c,             ( select d.attname as name, d.attnum as nn              from   pg_attribute d, pg_class e              where  e.oid = d.attrelid              and    e.relname = p_table ) f       where  f.nn = c.nn       order  by f.nn       loop         if l_flag then            l_sql := l_sql || ', ';         end if;         l_flag := TRUE;         l_sql := l_sql || columns.name;       end loop;    l_sql := l_sql ||  ')) inherits (' || p_table || ')';   execute l_sql;    l_sql :=   'create index ' || p_table || '_' || l_start_str || '_date on ' || p_table || '_' || l_start_str || '(' || p_column || ')';   execute l_sql;    perform ps_trigger_regenerate(l_table);    execute 'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table;   execute 'drop trigger if exists ps_' || p_table || '_after_update on '  || p_table;   execute 'drop trigger if exists ps_' || p_table || '_after_delete on '  || p_table;    l_sql :=   'insert into ' || p_table || '_' || l_start_str || ' ' ||  'select * from ' || p_table || ' where ' ||   p_column || ' >= to_date(''' || l_start_str || ''', ''YYYYMMDD'') and ' ||   p_column || ' < to_date(''' || l_end_str || ''', ''YYYYMMDD'')';   execute l_sql;    l_sql :=   'delete from only ' || p_table || ' where ' ||   p_column || ' >= to_date(''' || l_start_str || ''', ''YYYYMMDD'') and ' ||   p_column || ' < to_date(''' || l_end_str || ''', ''YYYYMMDD'')';   execute l_sql;    l_sql :=   'create trigger ps_' || p_table || '_before_insert ' ||  'before insert on ' || p_table || ' for each row ' ||  'execute procedure ps_' || p_table || '_insert_trigger()';   execute l_sql;   perform 1   from   ps_snapshot a, ps_column b   where  b.table_id = a.snapshot_id and a.table_id = l_table   and    b.type_name in ('min', 'max');   if FOUND then      l_sql :=      'create trigger ps_' || p_table || '_after_update ' ||     'after update on ' || p_table || ' for each row ' ||     'execute procedure ps_' || p_table || '_raise_trigger()';      execute l_sql;      l_sql :=      'create trigger ps_' || p_table || '_after_delete ' ||     'after delete on ' || p_table || ' for each row ' ||     'execute procedure ps_' || p_table || '_raise_trigger()';      execute l_sql;      l_sql :=      'create trigger ps_' || p_table || '_' || l_start_str || '_after_update ' ||     'after update on ' || p_table || '_' || l_start_str || ' for each row ' ||     'execute procedure ps_' || p_table || '_raise_trigger()';      execute l_sql;      l_sql :=      'create trigger ps_' || p_table || '_' || l_start_str || '_after_delete ' ||     'after delete on ' || p_table || '_' || l_start_str || ' for each row ' ||     'execute procedure ps_' || p_table || '_raise_trigger()';      execute l_sql;   else      l_sql :=      'create trigger ps_' || p_table || '_after_update ' ||     'after update on ' || p_table || ' for each row ' ||     'execute procedure ps_' || p_table || '_update_trigger()';      execute l_sql;      l_sql :=      'create trigger ps_' || p_table || '_after_delete ' ||     'after delete on ' || p_table || ' for each row ' ||     'execute procedure ps_' || p_table || '_delete_trigger()';      execute l_sql;      l_sql :=      'create trigger ps_' || p_table || '_' || l_start_str || '_after_update ' ||     'after update on ' || p_table || '_' || l_start_str || ' for each row ' ||     'execute procedure ps_' || p_table || '_update_trigger()';      execute l_sql;      l_sql :=      'create trigger ps_' || p_table || '_' || l_start_str || '_after_delete ' ||     'after delete on ' || p_table || '_' || l_start_str || ' for each row ' ||     'execute procedure ps_' || p_table || '_delete_trigger()';      execute l_sql;   end if; end; $$ language plpgsql; 

Здесь, после проверки корректности входных данных, мы добавляем необходимые метаданные, после чего, создаем унаследованную таблицу. Затем, мы пересоздаем функции триггеров вызовом ps_trigger_regenerate, после чего переносим данные, подпадающие под условие секционирования в созданную секцию динамическим запросом и пересоздаем сами триггеры.

Сложности возникли с двумя моментами.

  1. Пришлось немного помучиться с прибавлением к стартовой дате месяца, дня или года (в зависимости от входного параметра p_type:
    l_end := p_start + ('1 ' || p_type)::INTERVAL; 

  2. Поскольку первичный ключ не наследуется, пришлось сочинять запрос к System Catalogs, для получения списка колонок первичного ключа исходной таблицы (хранить в своих метаданных еще и описание первичного ключа я счел нецелесообразным):
          select f.name as name       from ( select ps_array_to_set(a.conkey) as nn              from   pg_constraint a, pg_class b              where  b.oid = a.conrelid              and    a.contype = 'p'              and    b.relname = p_table ) c,             ( select d.attname as name, d.attnum as nn              from   pg_attribute d, pg_class e              where  e.oid = d.attrelid              and    e.relname = p_table ) f       where  f.nn = c.nn       order  by f.nn 

Также, следует отметить, что перед созданием индекса, на ключ секционирования (для созданной секции), стоило бы предварительно проверить, не является ли он лидирующим столбцом первичного ключа (чтобы не создавать дублирующий индекс).

Функция удаления секции существенно проще и в особых комментариях не нуждается:

ps_del_range_partition(varchar, date)

create or replace function ps_del_range_partition(in p_table varchar, in p_start date)        returns void as $$ declare   l_sql       text;   l_start_str varchar(10);   l_table     bigint; begin   select id into l_table   from   ps_table   where  lower(name) = lower(p_table);    l_start_str = to_char(p_start, 'YYYYMMDD');    delete from ps_range_partition    where  table_id = l_table   and    start_value = p_start;    perform ps_trigger_regenerate(l_table);    l_sql :=   'insert into ' || p_table || ' ' ||  'select * from ' || p_table || '_' || l_start_str;   execute l_sql;    perform 1   from ( select 1          from   ps_range_partition          where  table_id = l_table          union  all          select 1          from   ps_snapshot          where  table_id = l_table ) a;   if not FOUND then      execute 'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table;      execute 'drop trigger if exists ps_' || p_table || '_after_update on '  || p_table;      execute 'drop trigger if exists ps_' || p_table || '_after_delete on '  || p_table;       execute 'drop function ps_' || p_table || '_insert_trigger() cascade';      execute 'drop function ps_' || p_table || '_raise_trigger()  cascade';      execute 'drop function ps_' || p_table || '_update_trigger() cascade';      execute 'drop function ps_' || p_table || '_delete_trigger() cascade';       delete from ps_column where table_id = l_table;      delete from ps_table where id = l_table;   end if;    perform 1   from   ps_range_partition   where  table_id = l_table;   if not FOUND then      delete from ps_column       where  table_id = l_table      and    type_name = 'date';   end if;    execute 'drop table ' || p_table || '_' || l_start_str; end; $$ language plpgsql; 

При удалении секции, данные, естественно, не теряются, а переносятся в главную таблицу (предварительно удаляются триггеры, поскольку, как выяснилось, ключевое слово only не работает в операторе insert).

Осталось добавить функции управления «живыми» снимками данных:

ps_add_snapshot_column(varchar, varchar, varchar, varchar)

create or replace function ps_add_snapshot_column(in p_snapshot varchar,       in p_column varchar, in p_parent varchar, in p_type varchar) returns void as $$ declare   l_table bigint; begin   perform 1   from   ps_table   where  lower(name) = lower(p_snapshot);   if not FOUND then      insert into ps_table(name) values (lower(p_snapshot));   end if;    select id into l_table   from   ps_table   where  lower(name) = lower(p_snapshot);    insert into ps_column(table_id, name, parent_name, type_name)   values (l_table, lower(p_column), lower(p_parent), p_type); end; $$ language plpgsql; 

ps_add_snapshot(varchar, varchar, varchar)

create or replace function ps_add_snapshot(in p_table varchar, in p_snapshot varchar,       in p_type varchar) returns void as $$ declare   l_sql      text;   l_table    bigint;   l_snapshot bigint;   l_flag     boolean;   columns    record; begin   select id into l_snapshot   from   ps_table   where  lower(name) = lower(p_snapshot);    perform 1   from   ps_column   where  table_id = l_snapshot   and    type_name in ('date', 'key');   if not FOUND then      raise EXCEPTION 'Key columns not found';   end if;    perform 1   from   ps_column   where  table_id = l_snapshot   and    not type_name in ('date', 'key', 'nullable');   if not FOUND then      raise EXCEPTION 'Aggregate columns not found';   end if;    perform 1   from   ps_table   where  lower(name) = lower(p_table);   if not FOUND then      insert into ps_table(name) values (lower(p_table));   end if;    select id into l_table   from   ps_table   where  lower(name) = lower(p_table);    insert into ps_snapshot(table_id, snapshot_id, type_name)   values (l_table, l_snapshot, p_type);    perform ps_trigger_regenerate(l_table);    l_sql := 'create table ' || p_snapshot || ' (';   l_flag := FALSE;   for columns in     select name, type_name     from   ps_column     where  table_id = l_snapshot     loop       if l_flag then          l_sql := l_sql || ', ';       end if;       l_flag := TRUE;       if columns.type_name = 'date' then          l_sql := l_sql || columns.name || ' date not null';       else          l_sql := l_sql || columns.name || ' bigint not null';       end if;     end loop;   l_sql := l_sql || ', primary key (';   l_flag := FALSE;   for columns in     select name     from   ps_column     where  table_id = l_snapshot     and    type_name in ('date', 'key', 'nullable')     loop       if l_flag then          l_sql := l_sql || ', ';       end if;       l_flag := TRUE;       l_sql := l_sql || columns.name;     end loop;   l_sql := l_sql || '))';   execute l_sql;    execute 'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table;   execute 'drop trigger if exists ps_' || p_table || '_after_update on '  || p_table;   execute 'drop trigger if exists ps_' || p_table || '_after_delete on '  || p_table;    l_sql :=   'create trigger ps_' || p_table || '_before_insert ' ||  'before insert on ' || p_table || ' for each row ' ||  'execute procedure ps_' || p_table || '_insert_trigger()';   execute l_sql;    perform 1   from   ps_snapshot a, ps_column b   where  b.table_id = a.snapshot_id and a.table_id = l_table   and    b.type_name in ('min', 'max');   if FOUND then      l_sql :=      'create trigger ps_' || p_table || '_after_update ' ||     'after update on ' || p_table || ' for each row ' ||     'execute procedure ps_' || p_table || '_raise_trigger()';      execute l_sql;      l_sql :=      'create trigger ps_' || p_table || '_after_delete ' ||     'after delete on ' || p_table || ' for each row ' ||     'execute procedure ps_' || p_table || '_raise_trigger()';      execute l_sql;   else      l_sql :=      'create trigger ps_' || p_table || '_after_update ' ||     'after update on ' || p_table || ' for each row ' ||     'execute procedure ps_' || p_table || '_update_trigger()';      execute l_sql;      l_sql :=      'create trigger ps_' || p_table || '_after_delete ' ||     'after delete on ' || p_table || ' for each row ' ||     'execute procedure ps_' || p_table || '_delete_trigger()';      execute l_sql;   end if;    l_sql := 'insert into ' || p_snapshot || '(';   l_flag := FALSE;   for columns in     select name     from   ps_column     where  table_id = l_snapshot     loop       if l_flag then          l_sql := l_sql || ', ';       end if;       l_flag := TRUE;       l_sql := l_sql || columns.name;     end loop;   l_sql := l_sql || ') select ';   l_flag := FALSE;   for columns in     select parent_name as name, type_name     from   ps_column     where  table_id = l_snapshot     loop       if l_flag then          l_sql := l_sql || ', ';       end if;       l_flag := TRUE;       if columns.type_name = 'date' then          l_sql := l_sql || 'date_trunc(lower(''' || p_type || '''), ' || columns.name || ')';       end if;       if columns.type_name = 'key' then          l_sql := l_sql || columns.name;       end if;       if columns.type_name = 'nullable' then          l_sql := l_sql || 'coalesce(' || columns.name || ', 0)';       end if;       if columns.type_name = 'sum' then          l_sql := l_sql || 'sum(' || columns.name || ')';       end if;       if columns.type_name = 'min' then          l_sql := l_sql || 'min(' || columns.name || ')';       end if;       if columns.type_name = 'max' then          l_sql := l_sql || 'max(' || columns.name || ')';       end if;       if columns.type_name = 'cnt' then          l_sql := l_sql || 'count(' || columns.name || ')';       end if;     end loop;   l_sql := l_sql || 'from ' || p_table || ' group by ';   l_flag := FALSE;   for columns in     select parent_name as name, type_name     from   ps_column     where  table_id = l_snapshot     and    type_name in ('date', 'key', 'nullable')     loop       if l_flag then          l_sql := l_sql || ', ';       end if;       l_flag := TRUE;       if columns.type_name = 'date' then          l_sql := l_sql || 'date_trunc(lower(''' || p_type || '''), ' || columns.name || ')';       else          l_sql := l_sql || columns.name;       end if;     end loop;   execute l_sql; end; $$ language plpgsql; 

ps_del_snapshot(varchar)

create or replace function ps_del_snapshot(in p_snapshot varchar) returns void as $$ declare   l_sql      text;   p_table    varchar(50);   l_table    bigint;   l_snapshot bigint; begin   select a.table_id, c.name into l_table, p_table   from   ps_snapshot a, ps_table b, ps_table c   where  b.id = a.snapshot_id and c.id = a.table_id   and    lower(b.name) = lower(p_snapshot);    select id into l_snapshot   from   ps_table   where  lower(name) = lower(p_snapshot);    delete from ps_snapshot where snapshot_id = l_snapshot;   delete from ps_column where table_id = l_snapshot;   delete from ps_table where id = l_snapshot;    execute 'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table;   execute 'drop trigger if exists ps_' || p_table || '_after_update  on ' || p_table;   execute 'drop trigger if exists ps_' || p_table || '_after_delete  on ' || p_table;      perform 1   from ( select 1          from   ps_range_partition          where  table_id = l_table          union  all          select 1          from   ps_snapshot          where  table_id = l_table ) a;   if not FOUND then      execute 'drop function if exists ps_' || p_table || '_insert_trigger() cascade';      execute 'drop function if exists ps_' || p_table || '_raise_trigger()  cascade';      execute 'drop function if exists ps_' || p_table || '_update_trigger() cascade';      execute 'drop function if exists ps_' || p_table || '_delete_trigger() cascade';   else      perform ps_trigger_regenerate(l_table);       l_sql :=      'create trigger ps_' || p_table || '_before_insert ' ||     'before insert on ' || p_table || ' for each row ' ||     'execute procedure ps_' || p_table || '_insert_trigger()';      execute l_sql;       perform 1      from   ps_snapshot a, ps_column b      where  b.table_id = a.snapshot_id and a.table_id = l_table      and    b.type_name in ('min', 'max');      if FOUND then         l_sql :=         'create trigger ps_' || p_table || '_after_update ' ||        'after update on ' || p_table || ' for each row ' ||        'execute procedure ps_' || p_table || '_raise_trigger()';         execute l_sql;         l_sql :=         'create trigger ps_' || p_table || '_after_delete ' ||        'after delete on ' || p_table || ' for each row ' ||        'execute procedure ps_' || p_table || '_raise_trigger()';         execute l_sql;      else         l_sql :=         'create trigger ps_' || p_table || '_after_update ' ||        'after update on ' || p_table || ' for each row ' ||        'execute procedure ps_' || p_table || '_update_trigger()';         execute l_sql;         l_sql :=         'create trigger ps_' || p_table || '_after_delete ' ||        'after delete on ' || p_table || ' for each row ' ||        'execute procedure ps_' || p_table || '_delete_trigger()';         execute l_sql;      end if;   end if;    execute 'drop table if exists ' || p_snapshot; end; $$ language plpgsql; 

Здесь тоже нет ничего принципиально нового и единственное, о чем хотелось бы заметить, это то, что, в случае использования агрегатов ‘min’ или ‘max’, при создании триггеров, используется функция ps_TABLE_raise_trigger(), запрещающая удаления и изменения в таблице, по которой построен snapshot. Это сделано потому, что я не смог придумать адекватную по производительности реализацию обновления этих агрегатов при выполнении операторов update и delete в исходной таблице.

Посмотрим, как все это работает. Создадим тестовую таблицу:

create sequence test_seq;  create table test (   id            bigint         default nextval('test_seq') not null,   event_time    timestamp      not null,   customer_id   bigint         not null,   value         bigint         not null,   primary key(id) ); 

Теперь, для добавления секции, достаточно выполнить следующий запрос:

select ps_add_range_partition('test', 'event_time', 'month', to_date('20130501', 'YYYYMMDD')) 

В результате, будет создана унаследованная таблица test_20130501, в которую будут автоматически попадать все записи за май месяц.

Для удаления секции, можно выполнить следующий запрос:

select ps_del_range_partition('test', to_date('20130501', 'YYYYMMDD')) 

Создание snapshot несколько сложнее, поскольку предварительно требуется определить интересующие нас столбцы:

select ps_add_snapshot_column('test_month', 'customer_id', 'key') select ps_add_snapshot_column('test_month', 'event_time', 'date') select ps_add_snapshot_column('test_month', 'value_sum', 'value', 'sum') select ps_add_snapshot_column('test_month', 'value_cnt', 'value', 'cnt') select ps_add_snapshot_column('test_month', 'value_max', 'value', 'max') select ps_add_snapshot('test', 'test_month', 'month') 

В результате, будет создана автоматически обновляемая таблица, на основании следующего запроса:

select customer_id, date_trunc('month', event_time),        sum(value) as value_sum,        count(value) as value_cnt,        max(value) as value_max from   test group by customer_id, date_trunc('month', event_time) 

Удалить snapshot, можно выполнив следующий запрос:

select ps_del_snapshot('test_month') 

На этом, на сегодня, все. Скрипты можно забрать на GitHub.

ссылка на оригинал статьи http://habrahabr.ru/post/177957/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *