Боремся с дубликатами

от автора

Продолжая тему использования динамического SQL, я хочу рассказать об одном полезном инструменте, реализованном мной в рамках одного из текущих проектов. Речь пойдет о дубликатах в справочниках. Под дубликатами, в этой статье, я понимаю записи, внесенные в справочники повторно, например в результате орфографической ошибки при вводе наименования.

Суть предлагаемого мной подхода в том, чтобы дать возможность объявить любую запись справочника дубликатом уже существующей. В результате, дублирующая запись будет удалена, а все ссылки на нее будут исправлены таким образом, чтобы они стали ссылаться на правильную запись. Также, очень важно, предоставить возможность отката таких изменений, на случай, если они сделаны по ошибке.

Начнем с таблиц для хранения служебных данных:

Служебные таблицы

create table    mg_table (   table_name    varchar(100)   not null,   pk_name       varchar(100)   not null,   primary key(name) );  create sequence mg_action_seq;  create table    mg_action (   id            bigint         default nextval('mg_action_seq') not null,   table_name    varchar(100)   not null references mg_table(name),   old_id        varchar(50)    not null,   new_id        varchar(50)    not null,   action_time   timestamp      default now() not null,   primary key(id) );  create sequence mg_action_detail_seq;  create table    mg_action_detail (   id            bigint         default nextval('mg_action_detail_seq') not null,   action_id     bigint         not null references mg_action(id),   table_name    varchar(100)   not null,   pk_name       varchar(100)   not null,   column_name   varchar(100)   not null,   obj_id        varchar(50)    not null,   primary key(id) ); 

Здесь, таблица mg_table содержит данные о таблицах, для которых поддерживается слияние дубликатов. Требование к таким таблицам единственное — первичный ключ должен состоять из одного числового или строкового столбца. Нам не придется беспокоиться об этой таблице, поскольку она будет заполняться автоматически. Таблицы mg_action и mg_action_detail будут содержать данные, необходимые для отката изменений.

Определим пару вспомогательных функций:

Вспомогательные функции

create or replace function mg_get_pk_column(in p_table varchar) returns varchar as $$ declare   l_pk  text;   l_cn  int; begin   select max(f.name), count(*) as name into l_pk, l_cn   from ( select ps_array_to_set(a.conkey) as nn          from   pg_constraint a, pg_class b          where  b.oid = a.conrelid          and    a.contype = 'p'          and    b.relname = lower(p_table) ) c,         ( select d.attname as name, d.attnum as nn          from   pg_attribute d, pg_class e          where  e.oid = d.attrelid          and    e.relname = lower(p_table) ) f   where  f.nn = c.nn;    if l_cn <> 1 then      raise EXCEPTION 'Can''t support composite PK';   end if;    return l_pk; end; $$ language plpgsql;  create or replace function mg_add_dict(in p_table varchar) returns void as $$ declare   l_pk  text;   l_sql text; begin   l_pk := mg_get_pk_column(p_table);    perform 1   from mg_table where table_name = lower(p_table);   if not FOUND then       l_sql :=      'create table mg_' || lower(p_table) || ' ' ||     'as select * from ' || lower(p_table) || ' limit 0';      execute l_sql;       l_sql :=     'alter table mg_' || lower(p_table) || ' ' ||     'add primary key(' || l_pk || ')';      execute l_sql;       insert into mg_table(table_name, pk_name) values (lower(p_table), l_pk);   end if; end; $$ language plpgsql; 

Функция mg_get_pk_column выполняет известный нам по предыдущей статье запрос, возвращающий имя столбца первичного ключа, а также осуществляет проверку на то, что первичный ключ состоит из одного столбца.

Функция mg_add_dict, помимо заполнения mg_table, создает таблицу с префиксом ‘mg_’, в которой будут сохраняться удаленные дубликаты, на тот случай, если изменение понадобиться откатить. По своей структуре, эта таблица полностью аналогична исходной.

Переходим к самому интересному:

mg_merge

create or replace function mg_merge(in p_table varchar, in p_old varchar, in p_new varchar) returns void as $$ declare   l_action int;   l_pk     text;   l_sql    text;   tabs     record; begin   perform mg_add_dict(p_table);    select pk_name into l_pk   from   mg_table where table_name = lower(p_table);    l_action := nextval('mg_action_seq');   insert into mg_action(id, table_name, old_id, new_id)   values (l_action, p_table, p_old, p_new);    l_sql :=   'insert into mg_' || lower(p_table) || ' ' ||  'select * from ' || lower(p_table) || ' ' ||  'where ' || l_pk || ' = ''' || p_old || '''';   execute l_sql;    for tabs in       select b.relname as table_name,               d.attname as column_name       from   pg_constraint a, pg_class b, pg_class c,              pg_attribute d       where  a.contype = 'f'       and    b.oid = a.conrelid       and    c.oid = a.confrelid       and    c.relname = lower(p_table)       and    d.attrelid = b.oid       and    a.conkey[1] = d.attnum       loop      l_sql :=      'insert into mg_action_detail(action_id, table_name, column_name, obj_id, pk_name) ' ||     'select ' || l_action || ', ''' || tabs.table_name || ''', ''' ||       tabs.column_name || ''', id, ' ||     '''' || mg_get_pk_column(tabs.table_name::varchar) || ''' ' ||     'from ' || lower(tabs.table_name) || ' ' ||     'where ' || lower(tabs.column_name) || ' = ''' || p_old || '''';      execute l_sql;          l_sql :=        'update ' || lower(tabs.table_name) || ' ' ||        'set ' || lower(tabs.column_name) || ' = ''' || p_new || ''' ' ||        'where ' || lower(tabs.column_name) || ' = ''' || p_old || '''';         execute l_sql;       end loop;    l_sql :=  'delete from ' || lower(p_table) || ' where ' || l_pk || ' = ''' || p_old || '''';   execute l_sql; end; $$ language plpgsql;  create or replace function mg_merge(in p_table varchar, in p_old bigint, in p_new bigint)  returns void as $$ declare begin   perform mg_merge(p_table, p_old::varchar, p_new::varchar); end; $$ language plpgsql; 

Эта функция выполняет выполняет поиск всех таблиц, ссылающихся на p_table при помощи внешнего ключа и заменяет в них p_old на p_new, сохраняя данные, необходимые для отката измений. Поскольку, чаще всего, столбец первичного ключа будет числовым, для удобства, перегружена функция mg_merge(varchar, bigint, bigint).

Осталось разработать функцию отката изменений:

mg_undo

create or replace function mg_undo() returns void as $$ declare   l_action int;   l_old    varchar(50);   l_table  text;   l_sql    text;   tabs     record; begin   select max(id) into l_action   from   mg_action;    if l_action is null then      raise EXCEPTION 'Can''t UNDO';   end if;    select table_name, old_id into l_table, l_old   from   mg_action   where  id = l_action;    l_sql :=   'insert into ' || l_table || ' ' ||  'select * from mg_' || l_table || ' ' ||  'where id = ''' || l_old || '''';   execute l_sql;    for tabs in       select table_name,              pk_name,              column_name       from   mg_action_detail       where  action_id = l_action       group  by table_name, pk_name, column_name       loop          l_sql :=          'update ' || tabs.table_name || ' ' ||         'set ' || tabs.column_name || ' = ''' || l_old || ''' ' ||         'where ''*'' || ' || tabs.pk_name || ' in (' ||         'select ''*'' || obj_id from mg_action_detail '||         'where table_name = ''' || tabs.table_name || ''' ' ||         'and action_id = ' || l_action || ') ';          execute l_sql;       end loop;    l_sql :=   'delete from mg_' || l_table || ' where id = ''' || l_old || '''';   execute l_sql;    delete from mg_action_detail where action_id = l_action;   delete from mg_action where id = l_action; end; $$ language plpgsql; 

Изменения будут откатываться в порядке строго противополложном их созданию. По этой причине, никакие аргументы для mg_undo передавать не требуется.

Посмотрим, как все это работает. Создадим справочные таблицы:

create sequence city_seq;  create table    city (   id            bigint         default nextval('city_seq') not null,   name          varchar(100)   not null,   primary key(id) );  create sequence street_seq;  create table    street (   id            bigint         default nextval('street_seq') not null,   city_id       bigint         not null references city(id),   name          varchar(100)   not null,   primary key(id) );  create sequence address_seq;  create table    address (   id            bigint         default nextval('address_seq') not null,   street_id     bigint         not null references street(id),   house         varchar(10)    not null,   apartment     varchar(10)    not null,   primary key(id) ); 

… и наполним их тестовыми данными:

insert into city(id, name) values (1, 'Казань');  insert into street(id, city_id, name) values (1, 1, 'Победы'); insert into street(id, city_id, name) values (2, 1, 'Победы проспект');  insert into address(id, street_id, house, apartment) values (1, 1, '10', '1'); insert into address(id, street_id, house, apartment) values (2, 2, '10', '2'); 

Теперь, для того чтобы «слить» улицу ‘Победы проспект’ с улицей ‘Победы’, достаточно выполнить следующую команду:

select mg_merge('street', 2, 1); 

Функция mg_undo(), как и говорилось выше, откатит изменения.

ссылка на оригинал статьи http://habrahabr.ru/post/179789/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *