Боремся с дубликатами

в 10:55, , рубрики: postgresql, дубликаты, метки: ,

Продолжая тему использования динамического SQL, я хочу рассказать об одном полезном инструменте, реализованном мной в рамках одного из текущих проектов. Речь пойдет о дубликатах в справочниках. Под дубликатами, в этой статье, я понимаю записи, внесенные в справочники повторно, например в результате орфографической ошибки при вводе наименования.

Суть предлагаемого мной подхода в том, чтобы дать возможность объявить любую запись справочника дубликатом уже существующей. В результате, дублирующая запись будет удалена, а все ссылки на нее будут исправлены таким образом, чтобы они стали ссылаться на правильную запись. Также, очень важно, предоставить возможность отката таких изменений, на случай, если они сделаны по ошибке.

Начнем с таблиц для хранения служебных данных:

Служебные таблицы

create table    mg_table (
  table_name    varchar(100)   not null,
  pk_name       varchar(100)   not null,
  primary key(name)
);

create sequence mg_action_seq;

create table    mg_action (
  id            bigint         default nextval('mg_action_seq') not null,
  table_name    varchar(100)   not null references mg_table(name),
  old_id        varchar(50)    not null,
  new_id        varchar(50)    not null,
  action_time   timestamp      default now() not null,
  primary key(id)
);

create sequence mg_action_detail_seq;

create table    mg_action_detail (
  id            bigint         default nextval('mg_action_detail_seq') not null,
  action_id     bigint         not null references mg_action(id),
  table_name    varchar(100)   not null,
  pk_name       varchar(100)   not null,
  column_name   varchar(100)   not null,
  obj_id        varchar(50)    not null,
  primary key(id)
);

Здесь, таблица mg_table содержит данные о таблицах, для которых поддерживается слияние дубликатов. Требование к таким таблицам единственное — первичный ключ должен состоять из одного числового или строкового столбца. Нам не придется беспокоиться об этой таблице, поскольку она будет заполняться автоматически. Таблицы mg_action и mg_action_detail будут содержать данные, необходимые для отката изменений.

Определим пару вспомогательных функций:

Вспомогательные функции

create or replace function mg_get_pk_column(in p_table varchar) returns varchar
as $$
declare
  l_pk  text;
  l_cn  int;
begin
  select max(f.name), count(*) as name into l_pk, l_cn
  from ( select ps_array_to_set(a.conkey) as nn
         from   pg_constraint a, pg_class b
         where  b.oid = a.conrelid
         and    a.contype = 'p'
         and    b.relname = lower(p_table) ) c, 
       ( select d.attname as name, d.attnum as nn
         from   pg_attribute d, pg_class e
         where  e.oid = d.attrelid
         and    e.relname = lower(p_table) ) f
  where  f.nn = c.nn;

  if l_cn <> 1 then
     raise EXCEPTION 'Can''t support composite PK';
  end if;

  return l_pk;
end;
$$ language plpgsql;

create or replace function mg_add_dict(in p_table varchar) returns void
as $$
declare
  l_pk  text;
  l_sql text;
begin
  l_pk := mg_get_pk_column(p_table);

  perform 1
  from mg_table where table_name = lower(p_table);
  if not FOUND then

     l_sql := 
    'create table mg_' || lower(p_table) || ' ' ||
    'as select * from ' || lower(p_table) || ' limit 0';
     execute l_sql;

     l_sql :=
    'alter table mg_' || lower(p_table) || ' ' ||
    'add primary key(' || l_pk || ')';
     execute l_sql;

     insert into mg_table(table_name, pk_name) values (lower(p_table), l_pk);
  end if;
end;
$$ language plpgsql;

Функция mg_get_pk_column выполняет известный нам по предыдущей статье запрос, возвращающий имя столбца первичного ключа, а также осуществляет проверку на то, что первичный ключ состоит из одного столбца.

Функция mg_add_dict, помимо заполнения mg_table, создает таблицу с префиксом 'mg_', в которой будут сохраняться удаленные дубликаты, на тот случай, если изменение понадобиться откатить. По своей структуре, эта таблица полностью аналогична исходной.

Переходим к самому интересному:

mg_merge

create or replace function mg_merge(in p_table varchar, in p_old varchar, in p_new varchar) returns void
as $$
declare
  l_action int;
  l_pk     text;
  l_sql    text;
  tabs     record;
begin
  perform mg_add_dict(p_table);

  select pk_name into l_pk
  from   mg_table where table_name = lower(p_table);

  l_action := nextval('mg_action_seq');
  insert into mg_action(id, table_name, old_id, new_id)
  values (l_action, p_table, p_old, p_new);

  l_sql := 
 'insert into mg_' || lower(p_table) || ' ' ||
 'select * from ' || lower(p_table) || ' ' ||
 'where ' || l_pk || ' = ''' || p_old || '''';
  execute l_sql;

  for tabs in
      select b.relname as table_name, 
             d.attname as column_name
      from   pg_constraint a, pg_class b, pg_class c,
             pg_attribute d
      where  a.contype = 'f'
      and    b.oid = a.conrelid
      and    c.oid = a.confrelid
      and    c.relname = lower(p_table)
      and    d.attrelid = b.oid
      and    a.conkey[1] = d.attnum
      loop
     l_sql := 
    'insert into mg_action_detail(action_id, table_name, column_name, obj_id, pk_name) ' ||
    'select ' || l_action || ', ''' || tabs.table_name || ''', ''' || 
     tabs.column_name || ''', id, ' ||
    '''' || mg_get_pk_column(tabs.table_name::varchar) || ''' ' ||
    'from ' || lower(tabs.table_name) || ' ' ||
    'where ' || lower(tabs.column_name) || ' = ''' || p_old || '''';
     execute l_sql;

        l_sql :=
       'update ' || lower(tabs.table_name) || ' ' ||
       'set ' || lower(tabs.column_name) || ' = ''' || p_new || ''' ' ||
       'where ' || lower(tabs.column_name) || ' = ''' || p_old || '''';
        execute l_sql;
      end loop;

  l_sql :=
 'delete from ' || lower(p_table) || ' where ' || l_pk || ' = ''' || p_old || '''';
  execute l_sql;
end;
$$ language plpgsql;

create or replace function mg_merge(in p_table varchar, in p_old bigint, in p_new bigint) 
returns void
as $$
declare
begin
  perform mg_merge(p_table, p_old::varchar, p_new::varchar);
end;
$$ language plpgsql;

Эта функция выполняет выполняет поиск всех таблиц, ссылающихся на p_table при помощи внешнего ключа и заменяет в них p_old на p_new, сохраняя данные, необходимые для отката измений. Поскольку, чаще всего, столбец первичного ключа будет числовым, для удобства, перегружена функция mg_merge(varchar, bigint, bigint).

Осталось разработать функцию отката изменений:

mg_undo

create or replace function mg_undo() returns void
as $$
declare
  l_action int;
  l_old    varchar(50);
  l_table  text;
  l_sql    text;
  tabs     record;
begin
  select max(id) into l_action
  from   mg_action;

  if l_action is null then
     raise EXCEPTION 'Can''t UNDO';
  end if;

  select table_name, old_id into l_table, l_old
  from   mg_action
  where  id = l_action;

  l_sql := 
 'insert into ' || l_table || ' ' ||
 'select * from mg_' || l_table || ' ' ||
 'where id = ''' || l_old || '''';
  execute l_sql;

  for tabs in
      select table_name,
             pk_name,
             column_name
      from   mg_action_detail
      where  action_id = l_action
      group  by table_name, pk_name, column_name
      loop
         l_sql := 
        'update ' || tabs.table_name || ' ' ||
        'set ' || tabs.column_name || ' = ''' || l_old || ''' ' ||
        'where ''*'' || ' || tabs.pk_name || ' in (' ||
        'select ''*'' || obj_id from mg_action_detail '||
        'where table_name = ''' || tabs.table_name || ''' ' ||
        'and action_id = ' || l_action || ') ';
         execute l_sql;
      end loop;

  l_sql := 
 'delete from mg_' || l_table || ' where id = ''' || l_old || '''';
  execute l_sql;

  delete from mg_action_detail where action_id = l_action;
  delete from mg_action where id = l_action;
end;
$$ language plpgsql;

Изменения будут откатываться в порядке строго противополложном их созданию. По этой причине, никакие аргументы для mg_undo передавать не требуется.

Посмотрим, как все это работает. Создадим справочные таблицы:

create sequence city_seq;

create table    city (
  id            bigint         default nextval('city_seq') not null,
  name          varchar(100)   not null,
  primary key(id)
);

create sequence street_seq;

create table    street (
  id            bigint         default nextval('street_seq') not null,
  city_id       bigint         not null references city(id),
  name          varchar(100)   not null,
  primary key(id)
);

create sequence address_seq;

create table    address (
  id            bigint         default nextval('address_seq') not null,
  street_id     bigint         not null references street(id),
  house         varchar(10)    not null,
  apartment     varchar(10)    not null,
  primary key(id)
);

… и наполним их тестовыми данными:

insert into city(id, name) values (1, 'Казань');

insert into street(id, city_id, name) values (1, 1, 'Победы');
insert into street(id, city_id, name) values (2, 1, 'Победы проспект');

insert into address(id, street_id, house, apartment) values (1, 1, '10', '1');
insert into address(id, street_id, house, apartment) values (2, 2, '10', '2');

Теперь, для того чтобы «слить» улицу 'Победы проспект' с улицей 'Победы', достаточно выполнить следующую команду:

select mg_merge('street', 2, 1);

Функция mg_undo(), как и говорилось выше, откатит изменения.

Автор: GlukKazan

Источник

Поделиться

* - обязательные к заполнению поля