Переливаем таблицы БД между средами: быстро и без боли на примере MS SQL

от автора

Необходимость переноса данных из одной среды в другую — задача, с которой разработчики сталкиваются достаточно часто. Например, для отправки таблиц из прода в среды для тестирования. Вместе с тем, такая «перезаливка» таблиц нередко превращается в настоящий квест, по ходу которого нужно не только гарантировать сохранность данных, но и исключить ошибки, связанные с человеческим фактором. Поэтому лучшей практикой является автоматизация переноса.

Меня зовут Евгений Грибков. Я ведущий программист в центре технологий VK. В этой статье мы рассмотрим одно из возможных решений создания скрипта перезаливки заданных таблиц из одной БД в другую на примере MS SQL.

Типовой алгоритм перезаливки таблиц в базе данных

Обычно в ситуациях, когда нужно перезалить данные в таблицах БД из одной среды в другую, применяют шаблонный алгоритм восстановления базы:

  • делают резервную копию с прода;

  • восстанавливают бэкап на отдельной изолированной, промежуточной среде;

  • производят обезличивание или изменение персональных данных;

  • создают бэкап с полученной базы данных;

  • восстанавливают резервную копию в нужные среды (например, для разработки и тестирования).

Обычно весь описанный алгоритм автоматизирован и работает по расписанию (например, раз в сутки ночью или раз в неделю на выходных).

Но у такого подхода есть существенный недостаток: каждый раз восстанавливать всю базу данных во все среды — весьма длительный и очень дорогой в плане занимаемого места процесс. Поэтому в алгоритме нередко предусматривают чистки как исторических данных до определенного момента времени (например, всё, что старее одного календарного года), так и по определённым критериям. Таким образом достигается уменьшение объема БД в десятки, а то и в сотни раз.

Также надо учитывать, что в средах может быть несколько БД, в которые надо периодически догружать данные. Причем желательно, чтобы была возможность делать это в моменты, когда БД не используется активно — например, раз в неделю ночью или каждую ночь.

Соответственно, в таких кейсах важна автоматизация, которая дает возможность перезаливать таблицы БД из одной среды в другую без ручного контроля и глобального вмешательства разработчиков.

Вариант реализации скрипта

Теперь перейдем от теории к практике. Рассмотрим один из вариантов создания скрипта перезаливки заданных таблиц из одной БД в другую на примере MS SQL. При этом сразу оговоримся и примем условие, что обе БД на одном экземпляре СУБД — то есть, БД‑источник уже восстановлен в той же СУБД, в которой находится целевая БД.

Алгоритм работы такого скрипта будет следующим:

  • Отключить все ограничения.

  • Отключить все триггеры.

  • Для заданных таблиц сохранить все внешние ключи, после чего удалить их.

  • Произвести полную очистку заданных таблиц через команду TRUNCATE с последующим их заполнением данными.

  • Удалить битые данные.

  • Обновить статистики перезалитых выше таблиц.

  • Включить триггеры.

  • Восстановить внешние ключи.

  • Включить и перепроверить все ограничения.

Теперь приступим к реализации.

Для начала определим все необходимые переменные и временные таблицы
SET NOCOUNT OFF;  DECLARE @DB       VARCHAR(250) = QUOTENAME(DB_NAME()),         @DBMaster VARCHAR(255) = 'БД-мастер',                       @ERROR    VARCHAR(MAX);    DECLARE @HistoryLimited           bit = 1,           @table_name               nvarchar(255),           @is_identity              int = 0,           @stm                      nvarchar(max) = '',           @cols                     nvarchar(max) = '',           @IsNOTInsert              bit,           @schema_name              nvarchar(255),           @col_name_identity        nvarchar(255),           @referencing_object       nvarchar(255),           @referenced_object        nvarchar(255),           @constraint_name          nvarchar(255),           @referencing_columns      nvarchar(max),           @referenced_columns       nvarchar(max),           @rules                    nvarchar(max),           @key_cols                 nvarchar(max),           @StartMoment              DATETIME2,           @FinishMoment             DATETIME2,           @delete_referential_action INT,           @update_referential_action INT,           @max_row_insert           INT = 100000,           @isClearTableFKs          BIT = 1,           @RowCount                 BIGINT = 1,           @WhileDelCount            INT = 0;           ;    DECLARE @cnt TABLE (cnt BIGINT NOT NULL);    DROP TABLE IF EXISTS #tbl_res;    CREATE TABLE #tbl_res (                            SchName       NVARCHAR(255) NOT NULL,                            TblName       NVARCHAR(255) NOT NULL,                            StartMoment   DATETIME2     NOT NULL,                            FinishMoment  DATETIME2     NOT NULL,                            Cnt           BIGINT        NOT NULL,                            ErrorMsg      NVARCHAR(MAX) NULL                          );

Здесь определены переменные, которые будут использоваться далее в скрипте, а также табличная переменная, в которой будет записан итог работы перезаливки данных с таймингом.

Далее отключаем все ограничения в БД. Это можно сделать с помощью следующей команды: EXEC sys.sp_msforeachtable "ALTER TABLE ? NOCHECK CONSTRAINT ALL"

Отключаем все триггеры БД
DECLARE r_cursor_trigg_off CURSOR LOCAL FAST_FORWARD READ_ONLY FOR SELECT FORMATMESSAGE('ALTER TABLE [%s].[%s] DISABLE TRIGGER [%s];'      + CHAR(13)      , SCHEMA_NAME(b.[schema_id])      , OBJECT_NAME(t.parent_id)      , t.[Name]) AS stm         FROM sys.triggers          t         LEFT JOIN sys.tables  b ON b.object_id = t.parent_id         WHERE t.is_disabled = 0           AND t.type_desc = 'SQL_TRIGGER'           AND OBJECT_NAME(t.parent_id) IS NOT NULL         ORDER BY SCHEMA_NAME(b.[schema_id]) ASC,                  OBJECT_NAME(t.parent_id) ASC;      OPEN r_cursor_trigg_off;     FETCH NEXT FROM r_cursor_trigg_off     INTO @stm;      WHILE @@FETCH_STATUS = 0     BEGIN       PRINT @stm;        BEGIN TRY         BEGIN TRANSACTION;           EXEC sys.sp_executesql @stmt = @stm;         COMMIT;       END TRY       BEGIN CATCH         SET @ERROR = ERROR_MESSAGE();         PRINT @ERROR;         IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION       END CATCH              FETCH NEXT FROM r_cursor_trigg_off       INTO @stm;     END      CLOSE r_cursor_trigg_off;     DEALLOCATE r_cursor_trigg_off;      SET @stm = '';      SELECT @stm += FORMATMESSAGE('DISABLE TRIGGER [%s] ON DATABASE;'                  + CHAR(13), t.[Name])     FROM sys.triggers          t     LEFT JOIN sys.tables  b ON b.[object_id] = t.parent_id        WHERE t.is_disabled = 0          AND t.[type_desc] = 'SQL_TRIGGER'          AND OBJECT_NAME(t.parent_id) IS NULL;      PRINT @stm;      BEGIN TRY       BEGIN TRANSACTION;         EXEC sys.sp_executesql @stmt = @stm;       COMMIT;     END TRY     BEGIN CATCH       SET @ERROR = ERROR_MESSAGE();       PRINT @ERROR;       IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION     END CATCH

Далее собираем метаданные по таблицам, с которыми будем работать
DROP TABLE IF EXISTS #tbls; CREATE TABLE #tbls (                     [name]      NVARCHAR(255) NOT NULL,                     sch_name    NVARCHAR(255) NOT NULL,                     IsNOTInsert BIT           NOT NULL                    );  INSERT INTO #tbls (                     [name],                     sch_name,                     IsNOTInsert                   )   SELECT t.[name],          SCHEMA_NAME(t.[schema_id]) AS sch_name,          --задается правило, по которому определяем          --нужно ли после очистки наполнять данными таблицу или нет          --по умолчанию нужно (0-да, 1-нет)          0 AS IsNOTInsert       FROM sys.tables AS t       --в фильтре задаем какие таблицы брать в расчет       --(в нашем случае какие не брать в расчёт)       WHERE t.[name] NOT LIKE 'unused%'         AND t.[name] NOT LIKE 'removed%'         AND t.[name] NOT LIKE 'migrated%'         AND t.[name] NOT LIKE 'migration%'         AND t.[name] NOT LIKE 'sysdiag%'         AND t.[name] NOT LIKE 'test%'         AND t.[name] NOT LIKE 'tmp%'         AND t.[name] NOT LIKE '%_cache'         AND t.[name] NOT IN ('FKs');

Теперь соберем все внешние ключи полученных таблиц, сохраним их в таблице dbo.FKs, затем — удалим
IF NOT EXISTS (SELECT 1 FROM sys.tables AS t                          WHERE t.[name]= 'FKs'                            AND t.[schema_id] = SCHEMA_ID('dbo'))     BEGIN       CREATE TABLE dbo.FKs (                           referencing_object      NVARCHAR(255) NOT NULL,                           constraint_column_id    INT NOT NULL,                           referencing_column_name NVARCHAR(255) NOT NULL,                           referenced_object       NVARCHAR(255) NOT NULL,                           referenced_column_name  NVARCHAR(255) NOT NULL,                           constraint_name         NVARCHAR(255) NOT NULL,                           delete_referential_action INT NOT NULL,                           update_referential_action INT NOT NULL                         );     END     ELSE IF (@isClearTableFKs = 1)     BEGIN       TRUNCATE TABLE dbo.FKs;     END      INSERT INTO dbo.FKs (                   referencing_object,                   constraint_column_id,                   referencing_column_name,                   referenced_object,                   referenced_column_name,                   constraint_name,                   delete_referential_action,                   update_referential_action                  )     SELECT CONCAT('[', SCHEMA_NAME(P.[schema_id]), '].['             , OBJECT_NAME(FK.parent_object_id), ']') AS referencing_object,     FK.constraint_column_id,     CONCAT('['     , COL_NAME(FK.parent_object_id, FK.parent_column_id)            , ']') AS referencing_column_name,     CONCAT('['     , SCHEMA_NAME(R.[schema_id]), '].['     , OBJECT_NAME(FK.referenced_object_id)         , ']') AS referenced_object,     CONCAT('['            , COL_NAME(FK.referenced_object_id, FK.referenced_column_id)            , ']') AS referenced_column_name,     CONCAT('['            , OBJECT_NAME(FK.constraint_object_id)            , ']') AS constraint_name,     FKK.delete_referential_action,     FKK.update_referential_action     FROM sys.foreign_key_columns AS FK     INNER JOIN sys.foreign_keys AS FKK ON FKK.[object_id]                                                  = FK.constraint_object_id     INNER JOIN sys.tables AS P ON P.[object_id] = FK.parent_object_id     INNER JOIN sys.tables AS R ON R.[object_id] = FK.referenced_object_id     WHERE NOT EXISTS (SELECT 1 FROM dbo.FKs AS t0                       WHERE t0.constraint_name = CONCAT('['                             , OBJECT_NAME(FK.constraint_object_id), ']'));      DELETE FROM trg     FROM dbo.FKs AS trg     WHERE NOT EXISTS (                       SELECT 1                       FROM #tbls AS src                       WHERE trg.referencing_object = CONCAT('['                                     , src.sch_name, '].[', src.[name], ']')                          OR trg.referenced_object  = CONCAT('['                                     , src.sch_name, '].[', src.[name], ']')                      )      DECLARE r_cursor_fk_drop CURSOR LOCAL FAST_FORWARD READ_ONLY FOR         SELECT t.referencing_object,                t.referenced_object,                t.constraint_name         FROM dbo.FKs AS t         WHERE EXISTS (SELECT 1 FROM sys.foreign_key_columns AS FK                        WHERE t.constraint_name = CONCAT('['                                     , OBJECT_NAME(FK.constraint_object_id)                                                   , ']'))         GROUP BY t.referencing_object,                  t.referenced_object,                  t.constraint_name;      OPEN r_cursor_fk_drop;     FETCH NEXT FROM r_cursor_fk_drop     INTO @referencing_object,          @referenced_object,          @constraint_name      WHILE @@FETCH_STATUS = 0     BEGIN         SET @stm = CONCAT('ALTER TABLE ', @referencing_object                         , ' DROP CONSTRAINT ', @constraint_name, ';');          PRINT @stm;          BEGIN TRY           BEGIN TRANSACTION;             EXEC sys.sp_executesql @stmt = @stm;           COMMIT;         END TRY         BEGIN CATCH           SET @ERROR = ERROR_MESSAGE();           PRINT @ERROR;           IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION         END CATCH          FETCH NEXT FROM r_cursor_fk_drop         INTO @referencing_object,              @referenced_object,              @constraint_name;     END      CLOSE r_cursor_fk_drop;     DEALLOCATE r_cursor_fk_drop;

Следом перейдем к фрагменту кода, который отвечает за очистку и наполнение данными выбранных ранее таблиц
DECLARE r_cursor CURSOR LOCAL FAST_FORWARD READ_ONLY FOR SELECT t.[name],        t.sch_name,        t.IsNOTInsert   FROM #tbls AS t   ORDER BY t.[name] ASC;  OPEN r_cursor; FETCH NEXT FROM r_cursor INTO @table_name,      @schema_name,      @IsNOTInsert;  WHILE @@FETCH_STATUS = 0 BEGIN   SET @cols = '';   SET @is_identity = 0;   SET @col_name_identity = NULL;    SET @stm = CONCAT('TRUNCATE TABLE ', @DB                     , '.[', @schema_name, '].[', @table_name, ']');    PRINT @stm;    BEGIN TRY     BEGIN TRANSACTION;       EXEC sys.sp_executesql @stmt = @stm;     COMMIT;   END TRY   BEGIN CATCH     SET @ERROR = ERROR_MESSAGE();     PRINT @ERROR;     IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION   END CATCH    IF (@IsNOTInsert = 0)   BEGIN       SELECT @cols        = @cols + CASE WHEN @cols = ''                                THEN c.[name] ELSE ',' + c.name END,              @is_identity = @is_identity + c.is_identity,              @col_name_identity = CASE WHEN (c.is_identity = 1)                                         THEN c.[name]                                         ELSE @col_name_identity END         FROM sys.tables t,              sys.columns c         WHERE t.[object_id] = c.[object_id]           AND t.[name]      = @table_name           AND c.is_computed = 0;        SET @stm = '';        IF @is_identity > 0 SET @stm = CONCAT('SET IDENTITY_INSERT '             , @DB, '.[', @schema_name, '].[', @table_name, '] ON');        SET @stm = CONCAT(@stm, ' INSERT INTO ', @DB                   , '.[', @schema_name, '].[', @table_name                   , '](', @cols, ') SELECT ', @cols                   , ' FROM [',@DBMaster,'].['                   , @schema_name, '].[', @table_name, '] WITH(NOLOCK) ');        --здесь можно задать ограничение на наполнение данными       IF @HistoryLimited = 1       BEGIN         IF @table_name LIKE '%History'             SET @stm = CONCAT(@stm         , ' WHERE ChangeDateTime > DATEADD   (month, -1, SYSDATETIME()) ');       END        IF @is_identity > 0 SET @stm = CONCAT(@stm          , ' SET IDENTITY_INSERT ', @DB          , '.[', @schema_name, '].[', @table_name, '] OFF');              IF @is_identity > 0 SET @stm = CONCAT(@stm          , ' DBCC CHECKIDENT ("', @table_name, '")');        SET @StartMoment = SYSDATETIME();       SET @ERROR = NULL;        PRINT @stm;        BEGIN TRY         BEGIN TRANSACTION;           EXEC sys.sp_executesql @stmt = @stm;         COMMIT;       END TRY       BEGIN CATCH         SET @ERROR = ERROR_MESSAGE();         PRINT @ERROR;         IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION       END CATCH        SET @FinishMoment = SYSDATETIME();        SET @stm = CONCAT('SELECT COUNT_BIG(*) FROM '           , '[', @schema_name, '].[', @table_name, '] WITH (NOLOCK);');        DELETE FROM @cnt;        INSERT INTO @cnt (cnt)       EXEC sys.sp_executesql @stmt = @stm;        INSERT INTO #tbl_res (                 SchName,                 TblName,                 StartMoment,                 FinishMoment,                 Cnt,                 ErrorMsg                )       SELECT @schema_name,              @table_name,              @StartMoment,              @FinishMoment,              COALESCE((SELECT SUM(cnt) FROM @cnt), 0) AS Cnt,              @ERROR;   END    FETCH NEXT FROM r_cursor   INTO @table_name,        @schema_name,        @IsNOTInsert; END CLOSE r_cursor; DEALLOCATE r_cursor;      

Обычно после этого фрагмента производятся какие‑то еще необходимые манипуляции с данными. Например, добавляются нужные пользователи и роли с правами в соответствующие таблицы БД.

Затем производится удаление битых данных, то есть тех, по которым нет для записи из одной таблицы соответствующей записи в другой таблице по внешнему ключу
WHILE (@RowCount > 0) BEGIN   SET @RowCount = 0;   SET @WhileDelCount += 1;    DECLARE r_cursor_fk_corr CURSOR LOCAL FAST_FORWARD READ_ONLY FOR   SELECT t.referencing_object,          t.referenced_object,          t.constraint_name,          STRING_AGG (CONCAT('(trg.', t.referencing_column_name          , '=src.', t.referenced_column_name, ')'), ' AND ')           WITHIN GROUP (ORDER BY t.constraint_column_id ASC) AS rules,          STRING_AGG (CONCAT('(trg.', t.referencing_column_name          , ' IS NOT NULL)'), ' AND ')           WITHIN GROUP (ORDER BY t.constraint_column_id ASC) AS key_cols   FROM dbo.FKs AS t   GROUP BY t.referencing_object,            t.referenced_object,            t.constraint_name;  OPEN r_cursor_fk_corr; FETCH NEXT FROM r_cursor_fk_corr INTO @referencing_object    , @referenced_object    , @constraint_name    , @rules    , @key_cols;  WHILE @@FETCH_STATUS = 0 BEGIN     SET @stm = CONCAT('DELETE FROM trg FROM ', @referencing_object     ,' AS trg WHERE ', @key_cols     , ' AND NOT EXISTS (SELECT 1 FROM ', @referenced_object,     ' AS src WITH (NOLOCK) WHERE ', @rules, ');');      PRINT @stm;      BEGIN TRY       BEGIN TRANSACTION;         EXEC sys.sp_executesql @stmt = @stm;          SET @RowCount += @@ROWCOUNT;       COMMIT;     END TRY     BEGIN CATCH       SET @ERROR = ERROR_MESSAGE();       PRINT @ERROR;       IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION     END CATCH      FETCH NEXT FROM r_cursor_fk_corr     INTO @referencing_object        , @referenced_object        , @constraint_name        , @rules        , @key_cols; END    CLOSE r_cursor_fk_corr;   DEALLOCATE r_cursor_fk_corr; END  PRINT CONCAT('WHILE DELETE COUNT: ', @WhileDelCount);

Удаление неконсистентных данных происходит до тех пор, пока такие данные обнаруживаются. Это нужно, чтобы исключить ситуацию, когда была удалена запись из одной таблицы, но при этом была потеряна соответствующая связь в другой уже ранее обработанной таблице.

Далее обновляем статистики для рассматриваемых выше таблиц
DECLARE r_cursor_stat CURSOR LOCAL FAST_FORWARD READ_ONLY FOR SELECT CONCAT('UPDATE STATISTICS ', @DB, '.[' , t.sch_name, '].[', t.[name], '] WITH FULLSCAN;') AS stm FROM #tbls AS t;  OPEN r_cursor_stat; FETCH NEXT FROM r_cursor_stat INTO @stm;  WHILE @@FETCH_STATUS = 0 BEGIN   PRINT @stm;    BEGIN TRY     BEGIN TRANSACTION;       EXEC sys.sp_executesql @stmt = @stm;     COMMIT;   END TRY   BEGIN CATCH     SET @ERROR = ERROR_MESSAGE();     PRINT @ERROR;     IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION   END CATCH    FETCH NEXT FROM r_cursor_stat   INTO @stm END  CLOSE r_cursor_stat; DEALLOCATE r_cursor_stat;

Теперь необходимо включить триггеры в БД
DECLARE r_cursor_trigg_on CURSOR LOCAL FAST_FORWARD READ_ONLY FOR SELECT FORMATMESSAGE('ALTER TABLE [%s].[%s] ENABLE TRIGGER [%s];'    + CHAR(13), SCHEMA_NAME(b.[schema_id])   , OBJECT_NAME(t.parent_id), t.[Name]) AS stm FROM sys.triggers          t LEFT JOIN sys.tables  b ON b.[object_id] = t.parent_id WHERE t.is_disabled = 1   AND t.[type_desc] = 'SQL_TRIGGER'   AND OBJECT_NAME(t.parent_id) IS NOT NULL  OPEN r_cursor_trigg_on; FETCH NEXT FROM r_cursor_trigg_on INTO @stm;  WHILE @@FETCH_STATUS = 0 BEGIN   PRINT @stm;    BEGIN TRY     BEGIN TRANSACTION;       EXEC sys.sp_executesql @stmt = @stm;     COMMIT;   END TRY   BEGIN CATCH     SET @ERROR = ERROR_MESSAGE();     PRINT @ERROR;     IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION   END CATCH    FETCH NEXT FROM r_cursor_trigg_on   INTO @stm; END  CLOSE r_cursor_trigg_on; DEALLOCATE r_cursor_trigg_on;  SET @stm = '';  SELECT @stm += FORMATMESSAGE('ENABLE TRIGGER [%s] ON DATABASE;'    + CHAR(13), t.[Name])   FROM sys.triggers          t   WHERE t.is_disabled = 0     AND t.[type_desc] = 'SQL_TRIGGER'     AND OBJECT_NAME(t.parent_id) IS NULL;  PRINT @stm;  BEGIN TRY   BEGIN TRANSACTION;     EXEC sys.sp_executesql @stmt = @stm;   COMMIT; END TRY BEGIN CATCH   SET @ERROR = ERROR_MESSAGE();   PRINT @ERROR;   IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH

После этого восстанавливаем все ранее удаленные внешние ключи
DECLARE r_cursor_fk_recover CURSOR LOCAL FAST_FORWARD READ_ONLY FOR SELECT t.referencing_object,        t.referenced_object,        t.constraint_name,        STRING_AGG (t.referencing_column_name, ',')         WITHIN GROUP (ORDER BY t.constraint_column_id ASC)          AS referencing_columns,        STRING_AGG (t.referenced_column_name, ',')         WITHIN GROUP (ORDER BY t.constraint_column_id ASC)          AS referenced_columns,        t.delete_referential_action,        t.update_referential_action FROM dbo.FKs AS t WHERE NOT EXISTS (SELECT 1 FROM sys.foreign_key_columns AS FK  WHERE t.constraint_name = CONCAT('[', OBJECT_NAME(FK.constraint_object_id)                                  , ']')) GROUP BY t.referencing_object,          t.referenced_object,          t.constraint_name,          t.delete_referential_action,          t.update_referential_action;  OPEN r_cursor_fk_recover; FETCH NEXT FROM r_cursor_fk_recover INTO @referencing_object    , @referenced_object    , @constraint_name    , @referencing_columns    , @referenced_columns    , @delete_referential_action    , @update_referential_action;  WHILE @@FETCH_STATUS = 0 BEGIN     SET @stm = CONCAT('ALTER TABLE ', @referencing_object     ,' WITH CHECK ADD  CONSTRAINT ', @constraint_name,     ' FOREIGN KEY(', @referencing_columns, ') REFERENCES '     , @referenced_object, ' (', @referenced_columns, ') '     , CASE         WHEN @delete_referential_action = 1           THEN 'ON DELETE CASCADE '         WHEN @delete_referential_action = 2           THEN 'ON DELETE SET NULL '         ELSE ''       END     , CASE         WHEN @update_referential_action = 1           THEN 'ON UPDATE CASCADE '         WHEN @update_referential_action = 2           THEN 'ON UPDATE SET NULL '         ELSE ''       END     , '; '     , 'ALTER TABLE ', @referencing_object, ' CHECK CONSTRAINT '     , @constraint_name, '; ');      PRINT @stm;      BEGIN TRY       BEGIN TRANSACTION;         EXEC sys.sp_executesql @stmt = @stm;       COMMIT;     END TRY     BEGIN CATCH       SET @ERROR = ERROR_MESSAGE();       PRINT @ERROR;       IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION     END CATCH      FETCH NEXT FROM r_cursor_fk_recover     INTO @referencing_object        , @referenced_object        , @constraint_name        , @referencing_columns        , @referenced_columns        , @delete_referential_action        , @update_referential_action; END  CLOSE r_cursor_fk_recover; DEALLOCATE r_cursor_fk_recover;

В конце запускаем проверку всех ограничений, используя следующую команду:

EXEC sys.sp_msforeachtable @commAND1="PRINT '?'", @commAND2="ALTER TABLE ? WITH CHECK CHECK CONSTRAINT ALL";

После делаем вывод статистики работы перезаливки данных в таблицы:

SELECT t.SchName,        t.TblName,        t.Cnt,        DATEDIFF(millisecond, t.StartMoment, t.FinishMoment) AS DiffMSec,        t.ErrorMsg FROM #tbl_res AS t ORDER BY t.SchName ASC, t.TblName ASC;
В итоге получаем полный скрипт
SET NOCOUNT OFF;  DECLARE @DB       VARCHAR(250) = QUOTENAME(DB_NAME()),         @DBMaster VARCHAR(255) = 'БД-мастер',                 @ERROR    VARCHAR(MAX);  DECLARE @HistoryLimited            bit = 1,         @table_name                nvarchar(255),         @is_identity               int = 0,         @stm                       nvarchar(max) = '',         @cols                      nvarchar(max) = '',         @IsNOTInsert               bit,         @schema_name               nvarchar(255),         @col_name_identity         nvarchar(255),         @referencing_object        nvarchar(255),         @referenced_object         nvarchar(255),         @constraint_name           nvarchar(255),         @referencing_columns       nvarchar(max),         @referenced_columns        nvarchar(max),         @rules                     nvarchar(max),         @key_cols                  nvarchar(max),         @StartMoment               DATETIME2,         @FinishMoment              DATETIME2,         @delete_referential_action INT,         @update_referential_action INT,         @max_row_insert            INT = 100000,         @isClearTableFKs           BIT = 1,         @RowCount                  BIGINT = 1,         @WhileDelCount             INT = 0;         ;  DECLARE @cnt TABLE (cnt BIGINT NOT NULL);  DROP TABLE IF EXISTS #tbl_res;  CREATE TABLE #tbl_res (                          SchName       NVARCHAR(255) NOT NULL,                          TblName       NVARCHAR(255) NOT NULL,                          StartMoment   DATETIME2     NOT NULL,                          FinishMoment  DATETIME2     NOT NULL,                          Cnt           BIGINT        NOT NULL,                          ErrorMsg      NVARCHAR(MAX) NULL                        );  EXEC sys.sp_msforeachtable "ALTER TABLE ? NOCHECK CONSTRAINT ALL";  DECLARE r_cursor_trigg_off CURSOR LOCAL FAST_FORWARD READ_ONLY FOR SELECT FORMATMESSAGE('ALTER TABLE [%s].[%s] DISABLE TRIGGER [%s];'    + CHAR(13), SCHEMA_NAME(b.[schema_id]), OBJECT_NAME(t.parent_id)   , t.[Name]) AS stm FROM sys.triggers          t LEFT JOIN sys.tables  b ON b.object_id = t.parent_id WHERE t.is_disabled = 0   AND t.type_desc = 'SQL_TRIGGER'   AND OBJECT_NAME(t.parent_id) IS NOT NULL ORDER BY SCHEMA_NAME(b.[schema_id]) ASC,          OBJECT_NAME(t.parent_id) ASC;  OPEN r_cursor_trigg_off; FETCH NEXT FROM r_cursor_trigg_off INTO @stm;  WHILE @@FETCH_STATUS = 0 BEGIN   PRINT @stm;    BEGIN TRY     BEGIN TRANSACTION;       EXEC sys.sp_executesql @stmt = @stm;     COMMIT;   END TRY   BEGIN CATCH     SET @ERROR = ERROR_MESSAGE();     PRINT @ERROR;     IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION   END CATCH      FETCH NEXT FROM r_cursor_trigg_off   INTO @stm; END  CLOSE r_cursor_trigg_off; DEALLOCATE r_cursor_trigg_off;  SET @stm = '';  SELECT @stm += FORMATMESSAGE('DISABLE TRIGGER [%s] ON DATABASE;'   + CHAR(13), t.[Name]) FROM sys.triggers          t LEFT JOIN sys.tables  b ON b.[object_id] = t.parent_id    WHERE t.is_disabled = 0      AND t.[type_desc] = 'SQL_TRIGGER'      AND OBJECT_NAME(t.parent_id) IS NULL;  PRINT @stm;  BEGIN TRY   BEGIN TRANSACTION;     EXEC sys.sp_executesql @stmt = @stm;   COMMIT; END TRY BEGIN CATCH   SET @ERROR = ERROR_MESSAGE();   PRINT @ERROR;   IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH  DROP TABLE IF EXISTS #tbls; CREATE TABLE #tbls (                     [name]      NVARCHAR(255) NOT NULL,                     sch_name    NVARCHAR(255) NOT NULL,                     IsNOTInsert BIT           NOT NULL                    );  INSERT INTO #tbls (                     [name],                     sch_name,                     IsNOTInsert                   )   SELECT t.[name],          SCHEMA_NAME(t.[schema_id]) AS sch_name,          --задаётся правило, по которому определяем           --нужно ли после очистки наполнять данными таблицу или нет          --по умолчанию нужно (0-да, 1-нет)          0 AS IsNOTInsert       FROM sys.tables AS t       --в фильтре задаем какие таблицы брать в расчет       --(в нашем случае какие не брать в расчет)       WHERE t.[name] NOT LIKE 'unused%'         AND t.[name] NOT LIKE 'removed%'         AND t.[name] NOT LIKE 'migrated%'         AND t.[name] NOT LIKE 'migration%'         AND t.[name] NOT LIKE 'sysdiag%'         AND t.[name] NOT LIKE 'test%'         AND t.[name] NOT LIKE 'tmp%'         AND t.[name] NOT LIKE '%_cache'         AND t.[name] NOT IN ('FKs');  IF NOT EXISTS (SELECT 1 FROM sys.tables AS t   WHERE t.[name]= 'FKs' AND t.[schema_id] = SCHEMA_ID('dbo')) BEGIN   CREATE TABLE dbo.FKs (                       referencing_object      NVARCHAR(255) NOT NULL,                       constraint_column_id    INT NOT NULL,                       referencing_column_name NVARCHAR(255) NOT NULL,                       referenced_object       NVARCHAR(255) NOT NULL,                       referenced_column_name  NVARCHAR(255) NOT NULL,                       constraint_name         NVARCHAR(255) NOT NULL,                       delete_referential_action INT NOT NULL,                       update_referential_action INT NOT NULL                     ); END ELSE IF (@isClearTableFKs = 1) BEGIN   TRUNCATE TABLE dbo.FKs; END  INSERT INTO dbo.FKs (               referencing_object,               constraint_column_id,               referencing_column_name,               referenced_object,               referenced_column_name,               constraint_name,               delete_referential_action,               update_referential_action              ) SELECT CONCAT('[', SCHEMA_NAME(P.[schema_id]), '].['   , OBJECT_NAME(FK.parent_object_id), ']') AS referencing_object,   FK.constraint_column_id, CONCAT('[', COL_NAME(FK.parent_object_id   , FK.parent_column_id), ']') AS referencing_column_name, CONCAT('[', SCHEMA_NAME(R.[schema_id]), '].['   , OBJECT_NAME(FK.referenced_object_id), ']') AS referenced_object, CONCAT('[', COL_NAME(FK.referenced_object_id   , FK.referenced_column_id), ']') AS referenced_column_name, CONCAT('[', OBJECT_NAME(FK.constraint_object_id)   , ']') AS constraint_name, FKK.delete_referential_action, FKK.update_referential_action FROM sys.foreign_key_columns AS FK INNER JOIN sys.foreign_keys AS FKK ON FKK.[object_id]                                        = FK.constraint_object_id INNER JOIN sys.tables AS P ON P.[object_id] = FK.parent_object_id INNER JOIN sys.tables AS R ON R.[object_id] = FK.referenced_object_id WHERE NOT EXISTS (SELECT 1 FROM dbo.FKs AS t0    WHERE t0.constraint_name = CONCAT('['                             , OBJECT_NAME(FK.constraint_object_id), ']'));    DELETE FROM trg FROM dbo.FKs AS trg WHERE NOT EXISTS (                   SELECT 1                   FROM #tbls AS src                   WHERE trg.referencing_object = CONCAT('[', src.sch_name                           , '].[', src.[name], ']')                      OR trg.referenced_object  = CONCAT('[', src.sch_name                           , '].[', src.[name], ']')                  )  DECLARE r_cursor_fk_drop CURSOR LOCAL FAST_FORWARD READ_ONLY FOR SELECT t.referencing_object,        t.referenced_object,        t.constraint_name FROM dbo.FKs AS t WHERE EXISTS (SELECT 1 FROM sys.foreign_key_columns AS FK    WHERE t.constraint_name = CONCAT('['                             , OBJECT_NAME(FK.constraint_object_id), ']')) GROUP BY t.referencing_object,          t.referenced_object,          t.constraint_name;  OPEN r_cursor_fk_drop; FETCH NEXT FROM r_cursor_fk_drop INTO @referencing_object,      @referenced_object,      @constraint_name  WHILE @@FETCH_STATUS = 0 BEGIN   SET @stm = CONCAT('ALTER TABLE ', @referencing_object     , ' DROP CONSTRAINT ', @constraint_name, ';');      PRINT @stm;      BEGIN TRY     BEGIN TRANSACTION;       EXEC sys.sp_executesql @stmt = @stm;     COMMIT;   END TRY   BEGIN CATCH     SET @ERROR = ERROR_MESSAGE();     PRINT @ERROR;     IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION   END CATCH      FETCH NEXT FROM r_cursor_fk_drop   INTO @referencing_object,        @referenced_object,        @constraint_name; END  CLOSE r_cursor_fk_drop; DEALLOCATE r_cursor_fk_drop;  DECLARE r_cursor CURSOR LOCAL FAST_FORWARD READ_ONLY FOR SELECT t.[name],        t.sch_name,        t.IsNOTInsert   FROM #tbls AS t   ORDER BY t.[name] ASC;  OPEN r_cursor; FETCH NEXT FROM r_cursor INTO @table_name,      @schema_name,      @IsNOTInsert;  WHILE @@FETCH_STATUS = 0 BEGIN   SET @cols = '';   SET @is_identity = 0;   SET @col_name_identity = NULL;      SET @stm = CONCAT('TRUNCATE TABLE ', @DB                     , '.[', @schema_name, '].[', @table_name, ']');      PRINT @stm;      BEGIN TRY     BEGIN TRANSACTION;       EXEC sys.sp_executesql @stmt = @stm;     COMMIT;   END TRY   BEGIN CATCH     SET @ERROR = ERROR_MESSAGE();     PRINT @ERROR;     IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION   END CATCH      IF (@IsNOTInsert = 0)   BEGIN     SELECT @cols        = @cols + CASE WHEN @cols = ''                    THEN c.[name] ELSE ',' + c.name END,            @is_identity = @is_identity + c.is_identity,            @col_name_identity = CASE WHEN (c.is_identity = 1)                      THEN c.[name] ELSE @col_name_identity END       FROM sys.tables t,            sys.columns c       WHERE t.[object_id] = c.[object_id]         AND t.[name]      = @table_name         AND c.is_computed = 0;      SET @stm = '';      IF @is_identity > 0 SET @stm = CONCAT('SET IDENTITY_INSERT '           , @DB, '.[', @schema_name, '].[', @table_name, '] ON');      SET @stm = CONCAT(@stm, ' INSERT INTO ', @DB, '.['               , @schema_name, '].[', @table_name                       , '](', @cols, ') SELECT ', @cols                       , ' FROM [',@DBMaster,'].['                       , @schema_name, '].['                       , @table_name, '] WITH(NOLOCK) ');      --здесь можно задать ограничение на наполнение данными     IF @HistoryLimited = 1     BEGIN       IF @table_name LIKE '%History'           SET @stm = CONCAT(@stm         , ' WHERE ChangeDateTime > DATEADD   (month, -1, SYSDATETIME()) ');     END      IF @is_identity > 0 SET @stm = CONCAT(@stm, ' SET IDENTITY_INSERT '         , @DB, '.[', @schema_name, '].[', @table_name, '] OFF');     IF @is_identity > 0 SET @stm = CONCAT(@stm, ' DBCC CHECKIDENT ("'         , @table_name, '")');      SET @StartMoment = SYSDATETIME();     SET @ERROR = NULL;      PRINT @stm;      BEGIN TRY       BEGIN TRANSACTION;         EXEC sys.sp_executesql @stmt = @stm;       COMMIT;     END TRY     BEGIN CATCH       SET @ERROR = ERROR_MESSAGE();       PRINT @ERROR;       IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION     END CATCH      SET @FinishMoment = SYSDATETIME();      SET @stm = CONCAT('SELECT COUNT_BIG(*) FROM ', '[', @schema_name         , '].[', @table_name, '] WITH (NOLOCK);');      DELETE FROM @cnt;      INSERT INTO @cnt (cnt)     EXEC sys.sp_executesql @stmt = @stm;      INSERT INTO #tbl_res (               SchName,               TblName,               StartMoment,               FinishMoment,               Cnt,               ErrorMsg              )     SELECT @schema_name,            @table_name,            @StartMoment,            @FinishMoment,            COALESCE((SELECT SUM(cnt) FROM @cnt), 0) AS Cnt,            @ERROR;   END    FETCH NEXT FROM r_cursor   INTO @table_name,        @schema_name,        @IsNOTInsert; END CLOSE r_cursor; DEALLOCATE r_cursor;  WHILE (@RowCount > 0) BEGIN   SET @RowCount = 0;   SET @WhileDelCount += 1;    DECLARE r_cursor_fk_corr CURSOR LOCAL FAST_FORWARD READ_ONLY FOR   SELECT t.referencing_object,          t.referenced_object,          t.constraint_name,          STRING_AGG (CONCAT('(trg.', t.referencing_column_name           , '=src.', t.referenced_column_name, ')'), ' AND ')            WITHIN GROUP (ORDER BY t.constraint_column_id ASC) AS rules,          STRING_AGG (CONCAT('(trg.', t.referencing_column_name           , ' IS NOT NULL)'), ' AND ')            WITHIN GROUP (ORDER BY t.constraint_column_id ASC) AS key_cols   FROM dbo.FKs AS t   GROUP BY t.referencing_object,            t.referenced_object,            t.constraint_name;    OPEN r_cursor_fk_corr;   FETCH NEXT FROM r_cursor_fk_corr   INTO @referencing_object      , @referenced_object      , @constraint_name      , @rules      , @key_cols;    WHILE @@FETCH_STATUS = 0   BEGIN       SET @stm = CONCAT('DELETE FROM trg FROM ', @referencing_object         ,' AS trg WHERE ', @key_cols, ' AND NOT EXISTS (SELECT 1 FROM '         , @referenced_object,       ' AS src WITH (NOLOCK) WHERE ', @rules, ');');        PRINT @stm;        BEGIN TRY         BEGIN TRANSACTION;           EXEC sys.sp_executesql @stmt = @stm;            SET @RowCount += @@ROWCOUNT;         COMMIT;       END TRY       BEGIN CATCH         SET @ERROR = ERROR_MESSAGE();         PRINT @ERROR;         IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION       END CATCH        FETCH NEXT FROM r_cursor_fk_corr       INTO @referencing_object          , @referenced_object          , @constraint_name          , @rules          , @key_cols;   END    CLOSE r_cursor_fk_corr;   DEALLOCATE r_cursor_fk_corr; END  PRINT CONCAT('WHILE DELETE COUNT: ', @WhileDelCount);  DECLARE r_cursor_stat CURSOR LOCAL FAST_FORWARD READ_ONLY FOR SELECT CONCAT('UPDATE STATISTICS ', @DB, '.[', t.sch_name, '].['   , t.[name], '] WITH FULLSCAN;') AS stm FROM #tbls AS t;  OPEN r_cursor_stat; FETCH NEXT FROM r_cursor_stat INTO @stm;  WHILE @@FETCH_STATUS = 0 BEGIN   PRINT @stm;    BEGIN TRY     BEGIN TRANSACTION;       EXEC sys.sp_executesql @stmt = @stm;     COMMIT;   END TRY   BEGIN CATCH     SET @ERROR = ERROR_MESSAGE();     PRINT @ERROR;     IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION   END CATCH    FETCH NEXT FROM r_cursor_stat   INTO @stm END  CLOSE r_cursor_stat; DEALLOCATE r_cursor_stat;  DECLARE r_cursor_trigg_on CURSOR LOCAL FAST_FORWARD READ_ONLY FOR SELECT FORMATMESSAGE('ALTER TABLE [%s].[%s] ENABLE TRIGGER [%s];'    + CHAR(13), SCHEMA_NAME(b.[schema_id]), OBJECT_NAME(t.parent_id)                      , t.[Name]) AS stm FROM sys.triggers          t LEFT JOIN sys.tables  b ON b.[object_id] = t.parent_id WHERE t.is_disabled = 1   AND t.[type_desc] = 'SQL_TRIGGER'   AND OBJECT_NAME(t.parent_id) IS NOT NULL  OPEN r_cursor_trigg_on; FETCH NEXT FROM r_cursor_trigg_on INTO @stm;  WHILE @@FETCH_STATUS = 0 BEGIN   PRINT @stm;    BEGIN TRY     BEGIN TRANSACTION;       EXEC sys.sp_executesql @stmt = @stm;     COMMIT;   END TRY   BEGIN CATCH     SET @ERROR = ERROR_MESSAGE();     PRINT @ERROR;     IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION   END CATCH    FETCH NEXT FROM r_cursor_trigg_on   INTO @stm; END  CLOSE r_cursor_trigg_on; DEALLOCATE r_cursor_trigg_on;  SET @stm = '';  SELECT @stm += FORMATMESSAGE('ENABLE TRIGGER [%s] ON DATABASE;'                               + CHAR(13), t.[Name])   FROM sys.triggers          t   WHERE t.is_disabled = 0     AND t.[type_desc] = 'SQL_TRIGGER'     AND OBJECT_NAME(t.parent_id) IS NULL;  PRINT @stm;  BEGIN TRY   BEGIN TRANSACTION;     EXEC sys.sp_executesql @stmt = @stm;   COMMIT; END TRY BEGIN CATCH   SET @ERROR = ERROR_MESSAGE();   PRINT @ERROR;   IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH  DECLARE r_cursor_fk_recover CURSOR LOCAL FAST_FORWARD READ_ONLY FOR SELECT t.referencing_object,        t.referenced_object,        t.constraint_name,        STRING_AGG (t.referencing_column_name, ',')          WITHIN GROUP (ORDER BY t.constraint_column_id ASC)          AS referencing_columns,        STRING_AGG (t.referenced_column_name, ',')          WITHIN GROUP (ORDER BY t.constraint_column_id ASC)          AS referenced_columns,        t.delete_referential_action,        t.update_referential_action FROM dbo.FKs AS t WHERE NOT EXISTS (SELECT 1 FROM sys.foreign_key_columns AS FK    WHERE t.constraint_name = CONCAT('['                           , OBJECT_NAME(FK.constraint_object_id), ']')) GROUP BY t.referencing_object,          t.referenced_object,          t.constraint_name,          t.delete_referential_action,          t.update_referential_action;  OPEN r_cursor_fk_recover; FETCH NEXT FROM r_cursor_fk_recover INTO @referencing_object    , @referenced_object    , @constraint_name    , @referencing_columns    , @referenced_columns    , @delete_referential_action    , @update_referential_action;  WHILE @@FETCH_STATUS = 0 BEGIN     SET @stm = CONCAT('ALTER TABLE ', @referencing_object                       ,' WITH CHECK ADD  CONSTRAINT ', @constraint_name,     ' FOREIGN KEY(', @referencing_columns, ') REFERENCES '                       , @referenced_object, ' (', @referenced_columns, ') '     , CASE         WHEN @delete_referential_action = 1           THEN 'ON DELETE CASCADE '         WHEN @delete_referential_action = 2           THEN 'ON DELETE SET NULL '         ELSE ''       END     , CASE         WHEN @update_referential_action = 1           THEN 'ON UPDATE CASCADE '         WHEN @update_referential_action = 2           THEN 'ON UPDATE SET NULL '         ELSE ''       END     , '; '     , 'ALTER TABLE ', @referencing_object, ' CHECK CONSTRAINT '                       , @constraint_name, '; ');      PRINT @stm;      BEGIN TRY       BEGIN TRANSACTION;         EXEC sys.sp_executesql @stmt = @stm;       COMMIT;     END TRY     BEGIN CATCH       SET @ERROR = ERROR_MESSAGE();       PRINT @ERROR;       IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION     END CATCH      FETCH NEXT FROM r_cursor_fk_recover     INTO @referencing_object        , @referenced_object        , @constraint_name        , @referencing_columns        , @referenced_columns        , @delete_referential_action        , @update_referential_action; END  CLOSE r_cursor_fk_recover; DEALLOCATE r_cursor_fk_recover;  EXEC sys.sp_msforeachtable @commAND1="PRINT '?'" , @commAND2="ALTER TABLE ? WITH CHECK CHECK CONSTRAINT ALL";  SELECT t.SchName,        t.TblName,        t.Cnt,        DATEDIFF(millisecond, t.StartMoment, t.FinishMoment) AS DiffMSec,        t.ErrorMsg FROM #tbl_res AS t ORDER BY t.SchName ASC, t.TblName ASC;

Для проверки скрипта мы прогнали его на тестовой БД:

Скрипт выполнил перенос данных без ошибок.

Важно и то, что при объеме данных в БД‑источнике около 100 ГБ (размер таблиц в строках до 100 млн записей), время выполнения всего скрипта составило 18 — 19 минут, то есть достигается высокая скорость перезаливки таблиц.

Компоненты скрипта

Теперь немного о подробностях реализации. В приведённом скрипте используется целый стек системных объектов:

  • sys.sp_msforeachtable — недокументированная хранимая процедура в SQL Server, которая позволяет итеративно применять команду T‑SQL к каждой таблице в текущей базе данных;

  • sys.triggers — системный объект, который содержит информацию о триггерах в БД;

  • sys.tables — системный объект, который содержит информацию о таблицах в БД;

  • sys.sp_executesql — системная хранимая процедура для выполнения инструкции Transact‑SQL или пакета, в том числе, созданных динамически;

  • sys.foreign_key_columns — системный объект, который содержит информацию о составе внешних ключей;

  • sys.foreign_keys — системный объект, который содержит информацию о внешних ключах;

  • sys.columns — системный объект, содержащий информацию о колонках.

Вместо выводов

Реализация перезаливки таблиц в базе данных — довольно скрупулезная задача, в которой без автоматизации легко столкнуться с «подводными камнями» и нерациональным расходованием ресурсов. Поэтому автоматизация — must have для любой системы, где надо перегонять данные между средами. Предложенный скрипт — один из способов такой автоматизации.

Безусловно, описанное решение — не «серебряная пуля». Но наш опыт и проведенные тесты показали, что оно вполне эффективно и надежно справляется с переносом данных между средами.


ссылка на оригинал статьи https://habr.com/ru/articles/874342/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *