
Необходимость переноса данных из одной среды в другую — задача, с которой разработчики сталкиваются достаточно часто. Например, для отправки таблиц из прода в среды для тестирования. Вместе с тем, такая «перезаливка» таблиц нередко превращается в настоящий квест, по ходу которого нужно не только гарантировать сохранность данных, но и исключить ошибки, связанные с человеческим фактором. Поэтому лучшей практикой является автоматизация переноса.
Меня зовут Евгений Грибков. Я ведущий программист в центре технологий VK. В этой статье мы рассмотрим одно из возможных решений создания скрипта перезаливки заданных таблиц из одной БД в другую на примере MS SQL.
Типовой алгоритм перезаливки таблиц в базе данных
Обычно в ситуациях, когда нужно перезалить данные в таблицах БД из одной среды в другую, применяют шаблонный алгоритм восстановления базы:
-
делают резервную копию с прода;
-
восстанавливают бэкап на отдельной изолированной, промежуточной среде;
-
производят обезличивание или изменение персональных данных;
-
создают бэкап с полученной базы данных;
-
восстанавливают резервную копию в нужные среды (например, для разработки и тестирования).
Обычно весь описанный алгоритм автоматизирован и работает по расписанию (например, раз в сутки ночью или раз в неделю на выходных).
Но у такого подхода есть существенный недостаток: каждый раз восстанавливать всю базу данных во все среды — весьма длительный и очень дорогой в плане занимаемого места процесс. Поэтому в алгоритме нередко предусматривают чистки как исторических данных до определенного момента времени (например, всё, что старее одного календарного года), так и по определённым критериям. Таким образом достигается уменьшение объема БД в десятки, а то и в сотни раз.
Также надо учитывать, что в средах может быть несколько БД, в которые надо периодически догружать данные. Причем желательно, чтобы была возможность делать это в моменты, когда БД не используется активно — например, раз в неделю ночью или каждую ночь.
Соответственно, в таких кейсах важна автоматизация, которая дает возможность перезаливать таблицы БД из одной среды в другую без ручного контроля и глобального вмешательства разработчиков.
Вариант реализации скрипта
Теперь перейдем от теории к практике. Рассмотрим один из вариантов создания скрипта перезаливки заданных таблиц из одной БД в другую на примере MS SQL. При этом сразу оговоримся и примем условие, что обе БД на одном экземпляре СУБД — то есть, БД‑источник уже восстановлен в той же СУБД, в которой находится целевая БД.
Алгоритм работы такого скрипта будет следующим:
-
Отключить все ограничения.
-
Отключить все триггеры.
-
Для заданных таблиц сохранить все внешние ключи, после чего удалить их.
-
Произвести полную очистку заданных таблиц через команду
TRUNCATEс последующим их заполнением данными. -
Удалить битые данные.
-
Обновить статистики перезалитых выше таблиц.
-
Включить триггеры.
-
Восстановить внешние ключи.
-
Включить и перепроверить все ограничения.
Теперь приступим к реализации.
Для начала определим все необходимые переменные и временные таблицы
SET NOCOUNT OFF; DECLARE @DB VARCHAR(250) = QUOTENAME(DB_NAME()), @DBMaster VARCHAR(255) = 'БД-мастер', @ERROR VARCHAR(MAX); DECLARE @HistoryLimited bit = 1, @table_name nvarchar(255), @is_identity int = 0, @stm nvarchar(max) = '', @cols nvarchar(max) = '', @IsNOTInsert bit, @schema_name nvarchar(255), @col_name_identity nvarchar(255), @referencing_object nvarchar(255), @referenced_object nvarchar(255), @constraint_name nvarchar(255), @referencing_columns nvarchar(max), @referenced_columns nvarchar(max), @rules nvarchar(max), @key_cols nvarchar(max), @StartMoment DATETIME2, @FinishMoment DATETIME2, @delete_referential_action INT, @update_referential_action INT, @max_row_insert INT = 100000, @isClearTableFKs BIT = 1, @RowCount BIGINT = 1, @WhileDelCount INT = 0; ; DECLARE @cnt TABLE (cnt BIGINT NOT NULL); DROP TABLE IF EXISTS #tbl_res; CREATE TABLE #tbl_res ( SchName NVARCHAR(255) NOT NULL, TblName NVARCHAR(255) NOT NULL, StartMoment DATETIME2 NOT NULL, FinishMoment DATETIME2 NOT NULL, Cnt BIGINT NOT NULL, ErrorMsg NVARCHAR(MAX) NULL );
Здесь определены переменные, которые будут использоваться далее в скрипте, а также табличная переменная, в которой будет записан итог работы перезаливки данных с таймингом.
Далее отключаем все ограничения в БД. Это можно сделать с помощью следующей команды: EXEC sys.sp_msforeachtable "ALTER TABLE ? NOCHECK CONSTRAINT ALL"
Отключаем все триггеры БД
DECLARE r_cursor_trigg_off CURSOR LOCAL FAST_FORWARD READ_ONLY FOR SELECT FORMATMESSAGE('ALTER TABLE [%s].[%s] DISABLE TRIGGER [%s];' + CHAR(13) , SCHEMA_NAME(b.[schema_id]) , OBJECT_NAME(t.parent_id) , t.[Name]) AS stm FROM sys.triggers t LEFT JOIN sys.tables b ON b.object_id = t.parent_id WHERE t.is_disabled = 0 AND t.type_desc = 'SQL_TRIGGER' AND OBJECT_NAME(t.parent_id) IS NOT NULL ORDER BY SCHEMA_NAME(b.[schema_id]) ASC, OBJECT_NAME(t.parent_id) ASC; OPEN r_cursor_trigg_off; FETCH NEXT FROM r_cursor_trigg_off INTO @stm; WHILE @@FETCH_STATUS = 0 BEGIN PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH FETCH NEXT FROM r_cursor_trigg_off INTO @stm; END CLOSE r_cursor_trigg_off; DEALLOCATE r_cursor_trigg_off; SET @stm = ''; SELECT @stm += FORMATMESSAGE('DISABLE TRIGGER [%s] ON DATABASE;' + CHAR(13), t.[Name]) FROM sys.triggers t LEFT JOIN sys.tables b ON b.[object_id] = t.parent_id WHERE t.is_disabled = 0 AND t.[type_desc] = 'SQL_TRIGGER' AND OBJECT_NAME(t.parent_id) IS NULL; PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH
Далее собираем метаданные по таблицам, с которыми будем работать
DROP TABLE IF EXISTS #tbls; CREATE TABLE #tbls ( [name] NVARCHAR(255) NOT NULL, sch_name NVARCHAR(255) NOT NULL, IsNOTInsert BIT NOT NULL ); INSERT INTO #tbls ( [name], sch_name, IsNOTInsert ) SELECT t.[name], SCHEMA_NAME(t.[schema_id]) AS sch_name, --задается правило, по которому определяем --нужно ли после очистки наполнять данными таблицу или нет --по умолчанию нужно (0-да, 1-нет) 0 AS IsNOTInsert FROM sys.tables AS t --в фильтре задаем какие таблицы брать в расчет --(в нашем случае какие не брать в расчёт) WHERE t.[name] NOT LIKE 'unused%' AND t.[name] NOT LIKE 'removed%' AND t.[name] NOT LIKE 'migrated%' AND t.[name] NOT LIKE 'migration%' AND t.[name] NOT LIKE 'sysdiag%' AND t.[name] NOT LIKE 'test%' AND t.[name] NOT LIKE 'tmp%' AND t.[name] NOT LIKE '%_cache' AND t.[name] NOT IN ('FKs');
Теперь соберем все внешние ключи полученных таблиц, сохраним их в таблице dbo.FKs, затем — удалим
IF NOT EXISTS (SELECT 1 FROM sys.tables AS t WHERE t.[name]= 'FKs' AND t.[schema_id] = SCHEMA_ID('dbo')) BEGIN CREATE TABLE dbo.FKs ( referencing_object NVARCHAR(255) NOT NULL, constraint_column_id INT NOT NULL, referencing_column_name NVARCHAR(255) NOT NULL, referenced_object NVARCHAR(255) NOT NULL, referenced_column_name NVARCHAR(255) NOT NULL, constraint_name NVARCHAR(255) NOT NULL, delete_referential_action INT NOT NULL, update_referential_action INT NOT NULL ); END ELSE IF (@isClearTableFKs = 1) BEGIN TRUNCATE TABLE dbo.FKs; END INSERT INTO dbo.FKs ( referencing_object, constraint_column_id, referencing_column_name, referenced_object, referenced_column_name, constraint_name, delete_referential_action, update_referential_action ) SELECT CONCAT('[', SCHEMA_NAME(P.[schema_id]), '].[' , OBJECT_NAME(FK.parent_object_id), ']') AS referencing_object, FK.constraint_column_id, CONCAT('[' , COL_NAME(FK.parent_object_id, FK.parent_column_id) , ']') AS referencing_column_name, CONCAT('[' , SCHEMA_NAME(R.[schema_id]), '].[' , OBJECT_NAME(FK.referenced_object_id) , ']') AS referenced_object, CONCAT('[' , COL_NAME(FK.referenced_object_id, FK.referenced_column_id) , ']') AS referenced_column_name, CONCAT('[' , OBJECT_NAME(FK.constraint_object_id) , ']') AS constraint_name, FKK.delete_referential_action, FKK.update_referential_action FROM sys.foreign_key_columns AS FK INNER JOIN sys.foreign_keys AS FKK ON FKK.[object_id] = FK.constraint_object_id INNER JOIN sys.tables AS P ON P.[object_id] = FK.parent_object_id INNER JOIN sys.tables AS R ON R.[object_id] = FK.referenced_object_id WHERE NOT EXISTS (SELECT 1 FROM dbo.FKs AS t0 WHERE t0.constraint_name = CONCAT('[' , OBJECT_NAME(FK.constraint_object_id), ']')); DELETE FROM trg FROM dbo.FKs AS trg WHERE NOT EXISTS ( SELECT 1 FROM #tbls AS src WHERE trg.referencing_object = CONCAT('[' , src.sch_name, '].[', src.[name], ']') OR trg.referenced_object = CONCAT('[' , src.sch_name, '].[', src.[name], ']') ) DECLARE r_cursor_fk_drop CURSOR LOCAL FAST_FORWARD READ_ONLY FOR SELECT t.referencing_object, t.referenced_object, t.constraint_name FROM dbo.FKs AS t WHERE EXISTS (SELECT 1 FROM sys.foreign_key_columns AS FK WHERE t.constraint_name = CONCAT('[' , OBJECT_NAME(FK.constraint_object_id) , ']')) GROUP BY t.referencing_object, t.referenced_object, t.constraint_name; OPEN r_cursor_fk_drop; FETCH NEXT FROM r_cursor_fk_drop INTO @referencing_object, @referenced_object, @constraint_name WHILE @@FETCH_STATUS = 0 BEGIN SET @stm = CONCAT('ALTER TABLE ', @referencing_object , ' DROP CONSTRAINT ', @constraint_name, ';'); PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH FETCH NEXT FROM r_cursor_fk_drop INTO @referencing_object, @referenced_object, @constraint_name; END CLOSE r_cursor_fk_drop; DEALLOCATE r_cursor_fk_drop;
Следом перейдем к фрагменту кода, который отвечает за очистку и наполнение данными выбранных ранее таблиц
DECLARE r_cursor CURSOR LOCAL FAST_FORWARD READ_ONLY FOR SELECT t.[name], t.sch_name, t.IsNOTInsert FROM #tbls AS t ORDER BY t.[name] ASC; OPEN r_cursor; FETCH NEXT FROM r_cursor INTO @table_name, @schema_name, @IsNOTInsert; WHILE @@FETCH_STATUS = 0 BEGIN SET @cols = ''; SET @is_identity = 0; SET @col_name_identity = NULL; SET @stm = CONCAT('TRUNCATE TABLE ', @DB , '.[', @schema_name, '].[', @table_name, ']'); PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH IF (@IsNOTInsert = 0) BEGIN SELECT @cols = @cols + CASE WHEN @cols = '' THEN c.[name] ELSE ',' + c.name END, @is_identity = @is_identity + c.is_identity, @col_name_identity = CASE WHEN (c.is_identity = 1) THEN c.[name] ELSE @col_name_identity END FROM sys.tables t, sys.columns c WHERE t.[object_id] = c.[object_id] AND t.[name] = @table_name AND c.is_computed = 0; SET @stm = ''; IF @is_identity > 0 SET @stm = CONCAT('SET IDENTITY_INSERT ' , @DB, '.[', @schema_name, '].[', @table_name, '] ON'); SET @stm = CONCAT(@stm, ' INSERT INTO ', @DB , '.[', @schema_name, '].[', @table_name , '](', @cols, ') SELECT ', @cols , ' FROM [',@DBMaster,'].[' , @schema_name, '].[', @table_name, '] WITH(NOLOCK) '); --здесь можно задать ограничение на наполнение данными IF @HistoryLimited = 1 BEGIN IF @table_name LIKE '%History' SET @stm = CONCAT(@stm , ' WHERE ChangeDateTime > DATEADD (month, -1, SYSDATETIME()) '); END IF @is_identity > 0 SET @stm = CONCAT(@stm , ' SET IDENTITY_INSERT ', @DB , '.[', @schema_name, '].[', @table_name, '] OFF'); IF @is_identity > 0 SET @stm = CONCAT(@stm , ' DBCC CHECKIDENT ("', @table_name, '")'); SET @StartMoment = SYSDATETIME(); SET @ERROR = NULL; PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH SET @FinishMoment = SYSDATETIME(); SET @stm = CONCAT('SELECT COUNT_BIG(*) FROM ' , '[', @schema_name, '].[', @table_name, '] WITH (NOLOCK);'); DELETE FROM @cnt; INSERT INTO @cnt (cnt) EXEC sys.sp_executesql @stmt = @stm; INSERT INTO #tbl_res ( SchName, TblName, StartMoment, FinishMoment, Cnt, ErrorMsg ) SELECT @schema_name, @table_name, @StartMoment, @FinishMoment, COALESCE((SELECT SUM(cnt) FROM @cnt), 0) AS Cnt, @ERROR; END FETCH NEXT FROM r_cursor INTO @table_name, @schema_name, @IsNOTInsert; END CLOSE r_cursor; DEALLOCATE r_cursor;
Обычно после этого фрагмента производятся какие‑то еще необходимые манипуляции с данными. Например, добавляются нужные пользователи и роли с правами в соответствующие таблицы БД.
Затем производится удаление битых данных, то есть тех, по которым нет для записи из одной таблицы соответствующей записи в другой таблице по внешнему ключу
WHILE (@RowCount > 0) BEGIN SET @RowCount = 0; SET @WhileDelCount += 1; DECLARE r_cursor_fk_corr CURSOR LOCAL FAST_FORWARD READ_ONLY FOR SELECT t.referencing_object, t.referenced_object, t.constraint_name, STRING_AGG (CONCAT('(trg.', t.referencing_column_name , '=src.', t.referenced_column_name, ')'), ' AND ') WITHIN GROUP (ORDER BY t.constraint_column_id ASC) AS rules, STRING_AGG (CONCAT('(trg.', t.referencing_column_name , ' IS NOT NULL)'), ' AND ') WITHIN GROUP (ORDER BY t.constraint_column_id ASC) AS key_cols FROM dbo.FKs AS t GROUP BY t.referencing_object, t.referenced_object, t.constraint_name; OPEN r_cursor_fk_corr; FETCH NEXT FROM r_cursor_fk_corr INTO @referencing_object , @referenced_object , @constraint_name , @rules , @key_cols; WHILE @@FETCH_STATUS = 0 BEGIN SET @stm = CONCAT('DELETE FROM trg FROM ', @referencing_object ,' AS trg WHERE ', @key_cols , ' AND NOT EXISTS (SELECT 1 FROM ', @referenced_object, ' AS src WITH (NOLOCK) WHERE ', @rules, ');'); PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; SET @RowCount += @@ROWCOUNT; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH FETCH NEXT FROM r_cursor_fk_corr INTO @referencing_object , @referenced_object , @constraint_name , @rules , @key_cols; END CLOSE r_cursor_fk_corr; DEALLOCATE r_cursor_fk_corr; END PRINT CONCAT('WHILE DELETE COUNT: ', @WhileDelCount);
Удаление неконсистентных данных происходит до тех пор, пока такие данные обнаруживаются. Это нужно, чтобы исключить ситуацию, когда была удалена запись из одной таблицы, но при этом была потеряна соответствующая связь в другой уже ранее обработанной таблице.
Далее обновляем статистики для рассматриваемых выше таблиц
DECLARE r_cursor_stat CURSOR LOCAL FAST_FORWARD READ_ONLY FOR SELECT CONCAT('UPDATE STATISTICS ', @DB, '.[' , t.sch_name, '].[', t.[name], '] WITH FULLSCAN;') AS stm FROM #tbls AS t; OPEN r_cursor_stat; FETCH NEXT FROM r_cursor_stat INTO @stm; WHILE @@FETCH_STATUS = 0 BEGIN PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH FETCH NEXT FROM r_cursor_stat INTO @stm END CLOSE r_cursor_stat; DEALLOCATE r_cursor_stat;
Теперь необходимо включить триггеры в БД
DECLARE r_cursor_trigg_on CURSOR LOCAL FAST_FORWARD READ_ONLY FOR SELECT FORMATMESSAGE('ALTER TABLE [%s].[%s] ENABLE TRIGGER [%s];' + CHAR(13), SCHEMA_NAME(b.[schema_id]) , OBJECT_NAME(t.parent_id), t.[Name]) AS stm FROM sys.triggers t LEFT JOIN sys.tables b ON b.[object_id] = t.parent_id WHERE t.is_disabled = 1 AND t.[type_desc] = 'SQL_TRIGGER' AND OBJECT_NAME(t.parent_id) IS NOT NULL OPEN r_cursor_trigg_on; FETCH NEXT FROM r_cursor_trigg_on INTO @stm; WHILE @@FETCH_STATUS = 0 BEGIN PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH FETCH NEXT FROM r_cursor_trigg_on INTO @stm; END CLOSE r_cursor_trigg_on; DEALLOCATE r_cursor_trigg_on; SET @stm = ''; SELECT @stm += FORMATMESSAGE('ENABLE TRIGGER [%s] ON DATABASE;' + CHAR(13), t.[Name]) FROM sys.triggers t WHERE t.is_disabled = 0 AND t.[type_desc] = 'SQL_TRIGGER' AND OBJECT_NAME(t.parent_id) IS NULL; PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH
После этого восстанавливаем все ранее удаленные внешние ключи
DECLARE r_cursor_fk_recover CURSOR LOCAL FAST_FORWARD READ_ONLY FOR SELECT t.referencing_object, t.referenced_object, t.constraint_name, STRING_AGG (t.referencing_column_name, ',') WITHIN GROUP (ORDER BY t.constraint_column_id ASC) AS referencing_columns, STRING_AGG (t.referenced_column_name, ',') WITHIN GROUP (ORDER BY t.constraint_column_id ASC) AS referenced_columns, t.delete_referential_action, t.update_referential_action FROM dbo.FKs AS t WHERE NOT EXISTS (SELECT 1 FROM sys.foreign_key_columns AS FK WHERE t.constraint_name = CONCAT('[', OBJECT_NAME(FK.constraint_object_id) , ']')) GROUP BY t.referencing_object, t.referenced_object, t.constraint_name, t.delete_referential_action, t.update_referential_action; OPEN r_cursor_fk_recover; FETCH NEXT FROM r_cursor_fk_recover INTO @referencing_object , @referenced_object , @constraint_name , @referencing_columns , @referenced_columns , @delete_referential_action , @update_referential_action; WHILE @@FETCH_STATUS = 0 BEGIN SET @stm = CONCAT('ALTER TABLE ', @referencing_object ,' WITH CHECK ADD CONSTRAINT ', @constraint_name, ' FOREIGN KEY(', @referencing_columns, ') REFERENCES ' , @referenced_object, ' (', @referenced_columns, ') ' , CASE WHEN @delete_referential_action = 1 THEN 'ON DELETE CASCADE ' WHEN @delete_referential_action = 2 THEN 'ON DELETE SET NULL ' ELSE '' END , CASE WHEN @update_referential_action = 1 THEN 'ON UPDATE CASCADE ' WHEN @update_referential_action = 2 THEN 'ON UPDATE SET NULL ' ELSE '' END , '; ' , 'ALTER TABLE ', @referencing_object, ' CHECK CONSTRAINT ' , @constraint_name, '; '); PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH FETCH NEXT FROM r_cursor_fk_recover INTO @referencing_object , @referenced_object , @constraint_name , @referencing_columns , @referenced_columns , @delete_referential_action , @update_referential_action; END CLOSE r_cursor_fk_recover; DEALLOCATE r_cursor_fk_recover;
В конце запускаем проверку всех ограничений, используя следующую команду:
EXEC sys.sp_msforeachtable @commAND1="PRINT '?'", @commAND2="ALTER TABLE ? WITH CHECK CHECK CONSTRAINT ALL";
После делаем вывод статистики работы перезаливки данных в таблицы:
SELECT t.SchName, t.TblName, t.Cnt, DATEDIFF(millisecond, t.StartMoment, t.FinishMoment) AS DiffMSec, t.ErrorMsg FROM #tbl_res AS t ORDER BY t.SchName ASC, t.TblName ASC;
В итоге получаем полный скрипт
SET NOCOUNT OFF; DECLARE @DB VARCHAR(250) = QUOTENAME(DB_NAME()), @DBMaster VARCHAR(255) = 'БД-мастер', @ERROR VARCHAR(MAX); DECLARE @HistoryLimited bit = 1, @table_name nvarchar(255), @is_identity int = 0, @stm nvarchar(max) = '', @cols nvarchar(max) = '', @IsNOTInsert bit, @schema_name nvarchar(255), @col_name_identity nvarchar(255), @referencing_object nvarchar(255), @referenced_object nvarchar(255), @constraint_name nvarchar(255), @referencing_columns nvarchar(max), @referenced_columns nvarchar(max), @rules nvarchar(max), @key_cols nvarchar(max), @StartMoment DATETIME2, @FinishMoment DATETIME2, @delete_referential_action INT, @update_referential_action INT, @max_row_insert INT = 100000, @isClearTableFKs BIT = 1, @RowCount BIGINT = 1, @WhileDelCount INT = 0; ; DECLARE @cnt TABLE (cnt BIGINT NOT NULL); DROP TABLE IF EXISTS #tbl_res; CREATE TABLE #tbl_res ( SchName NVARCHAR(255) NOT NULL, TblName NVARCHAR(255) NOT NULL, StartMoment DATETIME2 NOT NULL, FinishMoment DATETIME2 NOT NULL, Cnt BIGINT NOT NULL, ErrorMsg NVARCHAR(MAX) NULL ); EXEC sys.sp_msforeachtable "ALTER TABLE ? NOCHECK CONSTRAINT ALL"; DECLARE r_cursor_trigg_off CURSOR LOCAL FAST_FORWARD READ_ONLY FOR SELECT FORMATMESSAGE('ALTER TABLE [%s].[%s] DISABLE TRIGGER [%s];' + CHAR(13), SCHEMA_NAME(b.[schema_id]), OBJECT_NAME(t.parent_id) , t.[Name]) AS stm FROM sys.triggers t LEFT JOIN sys.tables b ON b.object_id = t.parent_id WHERE t.is_disabled = 0 AND t.type_desc = 'SQL_TRIGGER' AND OBJECT_NAME(t.parent_id) IS NOT NULL ORDER BY SCHEMA_NAME(b.[schema_id]) ASC, OBJECT_NAME(t.parent_id) ASC; OPEN r_cursor_trigg_off; FETCH NEXT FROM r_cursor_trigg_off INTO @stm; WHILE @@FETCH_STATUS = 0 BEGIN PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH FETCH NEXT FROM r_cursor_trigg_off INTO @stm; END CLOSE r_cursor_trigg_off; DEALLOCATE r_cursor_trigg_off; SET @stm = ''; SELECT @stm += FORMATMESSAGE('DISABLE TRIGGER [%s] ON DATABASE;' + CHAR(13), t.[Name]) FROM sys.triggers t LEFT JOIN sys.tables b ON b.[object_id] = t.parent_id WHERE t.is_disabled = 0 AND t.[type_desc] = 'SQL_TRIGGER' AND OBJECT_NAME(t.parent_id) IS NULL; PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH DROP TABLE IF EXISTS #tbls; CREATE TABLE #tbls ( [name] NVARCHAR(255) NOT NULL, sch_name NVARCHAR(255) NOT NULL, IsNOTInsert BIT NOT NULL ); INSERT INTO #tbls ( [name], sch_name, IsNOTInsert ) SELECT t.[name], SCHEMA_NAME(t.[schema_id]) AS sch_name, --задаётся правило, по которому определяем --нужно ли после очистки наполнять данными таблицу или нет --по умолчанию нужно (0-да, 1-нет) 0 AS IsNOTInsert FROM sys.tables AS t --в фильтре задаем какие таблицы брать в расчет --(в нашем случае какие не брать в расчет) WHERE t.[name] NOT LIKE 'unused%' AND t.[name] NOT LIKE 'removed%' AND t.[name] NOT LIKE 'migrated%' AND t.[name] NOT LIKE 'migration%' AND t.[name] NOT LIKE 'sysdiag%' AND t.[name] NOT LIKE 'test%' AND t.[name] NOT LIKE 'tmp%' AND t.[name] NOT LIKE '%_cache' AND t.[name] NOT IN ('FKs'); IF NOT EXISTS (SELECT 1 FROM sys.tables AS t WHERE t.[name]= 'FKs' AND t.[schema_id] = SCHEMA_ID('dbo')) BEGIN CREATE TABLE dbo.FKs ( referencing_object NVARCHAR(255) NOT NULL, constraint_column_id INT NOT NULL, referencing_column_name NVARCHAR(255) NOT NULL, referenced_object NVARCHAR(255) NOT NULL, referenced_column_name NVARCHAR(255) NOT NULL, constraint_name NVARCHAR(255) NOT NULL, delete_referential_action INT NOT NULL, update_referential_action INT NOT NULL ); END ELSE IF (@isClearTableFKs = 1) BEGIN TRUNCATE TABLE dbo.FKs; END INSERT INTO dbo.FKs ( referencing_object, constraint_column_id, referencing_column_name, referenced_object, referenced_column_name, constraint_name, delete_referential_action, update_referential_action ) SELECT CONCAT('[', SCHEMA_NAME(P.[schema_id]), '].[' , OBJECT_NAME(FK.parent_object_id), ']') AS referencing_object, FK.constraint_column_id, CONCAT('[', COL_NAME(FK.parent_object_id , FK.parent_column_id), ']') AS referencing_column_name, CONCAT('[', SCHEMA_NAME(R.[schema_id]), '].[' , OBJECT_NAME(FK.referenced_object_id), ']') AS referenced_object, CONCAT('[', COL_NAME(FK.referenced_object_id , FK.referenced_column_id), ']') AS referenced_column_name, CONCAT('[', OBJECT_NAME(FK.constraint_object_id) , ']') AS constraint_name, FKK.delete_referential_action, FKK.update_referential_action FROM sys.foreign_key_columns AS FK INNER JOIN sys.foreign_keys AS FKK ON FKK.[object_id] = FK.constraint_object_id INNER JOIN sys.tables AS P ON P.[object_id] = FK.parent_object_id INNER JOIN sys.tables AS R ON R.[object_id] = FK.referenced_object_id WHERE NOT EXISTS (SELECT 1 FROM dbo.FKs AS t0 WHERE t0.constraint_name = CONCAT('[' , OBJECT_NAME(FK.constraint_object_id), ']')); DELETE FROM trg FROM dbo.FKs AS trg WHERE NOT EXISTS ( SELECT 1 FROM #tbls AS src WHERE trg.referencing_object = CONCAT('[', src.sch_name , '].[', src.[name], ']') OR trg.referenced_object = CONCAT('[', src.sch_name , '].[', src.[name], ']') ) DECLARE r_cursor_fk_drop CURSOR LOCAL FAST_FORWARD READ_ONLY FOR SELECT t.referencing_object, t.referenced_object, t.constraint_name FROM dbo.FKs AS t WHERE EXISTS (SELECT 1 FROM sys.foreign_key_columns AS FK WHERE t.constraint_name = CONCAT('[' , OBJECT_NAME(FK.constraint_object_id), ']')) GROUP BY t.referencing_object, t.referenced_object, t.constraint_name; OPEN r_cursor_fk_drop; FETCH NEXT FROM r_cursor_fk_drop INTO @referencing_object, @referenced_object, @constraint_name WHILE @@FETCH_STATUS = 0 BEGIN SET @stm = CONCAT('ALTER TABLE ', @referencing_object , ' DROP CONSTRAINT ', @constraint_name, ';'); PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH FETCH NEXT FROM r_cursor_fk_drop INTO @referencing_object, @referenced_object, @constraint_name; END CLOSE r_cursor_fk_drop; DEALLOCATE r_cursor_fk_drop; DECLARE r_cursor CURSOR LOCAL FAST_FORWARD READ_ONLY FOR SELECT t.[name], t.sch_name, t.IsNOTInsert FROM #tbls AS t ORDER BY t.[name] ASC; OPEN r_cursor; FETCH NEXT FROM r_cursor INTO @table_name, @schema_name, @IsNOTInsert; WHILE @@FETCH_STATUS = 0 BEGIN SET @cols = ''; SET @is_identity = 0; SET @col_name_identity = NULL; SET @stm = CONCAT('TRUNCATE TABLE ', @DB , '.[', @schema_name, '].[', @table_name, ']'); PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH IF (@IsNOTInsert = 0) BEGIN SELECT @cols = @cols + CASE WHEN @cols = '' THEN c.[name] ELSE ',' + c.name END, @is_identity = @is_identity + c.is_identity, @col_name_identity = CASE WHEN (c.is_identity = 1) THEN c.[name] ELSE @col_name_identity END FROM sys.tables t, sys.columns c WHERE t.[object_id] = c.[object_id] AND t.[name] = @table_name AND c.is_computed = 0; SET @stm = ''; IF @is_identity > 0 SET @stm = CONCAT('SET IDENTITY_INSERT ' , @DB, '.[', @schema_name, '].[', @table_name, '] ON'); SET @stm = CONCAT(@stm, ' INSERT INTO ', @DB, '.[' , @schema_name, '].[', @table_name , '](', @cols, ') SELECT ', @cols , ' FROM [',@DBMaster,'].[' , @schema_name, '].[' , @table_name, '] WITH(NOLOCK) '); --здесь можно задать ограничение на наполнение данными IF @HistoryLimited = 1 BEGIN IF @table_name LIKE '%History' SET @stm = CONCAT(@stm , ' WHERE ChangeDateTime > DATEADD (month, -1, SYSDATETIME()) '); END IF @is_identity > 0 SET @stm = CONCAT(@stm, ' SET IDENTITY_INSERT ' , @DB, '.[', @schema_name, '].[', @table_name, '] OFF'); IF @is_identity > 0 SET @stm = CONCAT(@stm, ' DBCC CHECKIDENT ("' , @table_name, '")'); SET @StartMoment = SYSDATETIME(); SET @ERROR = NULL; PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH SET @FinishMoment = SYSDATETIME(); SET @stm = CONCAT('SELECT COUNT_BIG(*) FROM ', '[', @schema_name , '].[', @table_name, '] WITH (NOLOCK);'); DELETE FROM @cnt; INSERT INTO @cnt (cnt) EXEC sys.sp_executesql @stmt = @stm; INSERT INTO #tbl_res ( SchName, TblName, StartMoment, FinishMoment, Cnt, ErrorMsg ) SELECT @schema_name, @table_name, @StartMoment, @FinishMoment, COALESCE((SELECT SUM(cnt) FROM @cnt), 0) AS Cnt, @ERROR; END FETCH NEXT FROM r_cursor INTO @table_name, @schema_name, @IsNOTInsert; END CLOSE r_cursor; DEALLOCATE r_cursor; WHILE (@RowCount > 0) BEGIN SET @RowCount = 0; SET @WhileDelCount += 1; DECLARE r_cursor_fk_corr CURSOR LOCAL FAST_FORWARD READ_ONLY FOR SELECT t.referencing_object, t.referenced_object, t.constraint_name, STRING_AGG (CONCAT('(trg.', t.referencing_column_name , '=src.', t.referenced_column_name, ')'), ' AND ') WITHIN GROUP (ORDER BY t.constraint_column_id ASC) AS rules, STRING_AGG (CONCAT('(trg.', t.referencing_column_name , ' IS NOT NULL)'), ' AND ') WITHIN GROUP (ORDER BY t.constraint_column_id ASC) AS key_cols FROM dbo.FKs AS t GROUP BY t.referencing_object, t.referenced_object, t.constraint_name; OPEN r_cursor_fk_corr; FETCH NEXT FROM r_cursor_fk_corr INTO @referencing_object , @referenced_object , @constraint_name , @rules , @key_cols; WHILE @@FETCH_STATUS = 0 BEGIN SET @stm = CONCAT('DELETE FROM trg FROM ', @referencing_object ,' AS trg WHERE ', @key_cols, ' AND NOT EXISTS (SELECT 1 FROM ' , @referenced_object, ' AS src WITH (NOLOCK) WHERE ', @rules, ');'); PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; SET @RowCount += @@ROWCOUNT; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH FETCH NEXT FROM r_cursor_fk_corr INTO @referencing_object , @referenced_object , @constraint_name , @rules , @key_cols; END CLOSE r_cursor_fk_corr; DEALLOCATE r_cursor_fk_corr; END PRINT CONCAT('WHILE DELETE COUNT: ', @WhileDelCount); DECLARE r_cursor_stat CURSOR LOCAL FAST_FORWARD READ_ONLY FOR SELECT CONCAT('UPDATE STATISTICS ', @DB, '.[', t.sch_name, '].[' , t.[name], '] WITH FULLSCAN;') AS stm FROM #tbls AS t; OPEN r_cursor_stat; FETCH NEXT FROM r_cursor_stat INTO @stm; WHILE @@FETCH_STATUS = 0 BEGIN PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH FETCH NEXT FROM r_cursor_stat INTO @stm END CLOSE r_cursor_stat; DEALLOCATE r_cursor_stat; DECLARE r_cursor_trigg_on CURSOR LOCAL FAST_FORWARD READ_ONLY FOR SELECT FORMATMESSAGE('ALTER TABLE [%s].[%s] ENABLE TRIGGER [%s];' + CHAR(13), SCHEMA_NAME(b.[schema_id]), OBJECT_NAME(t.parent_id) , t.[Name]) AS stm FROM sys.triggers t LEFT JOIN sys.tables b ON b.[object_id] = t.parent_id WHERE t.is_disabled = 1 AND t.[type_desc] = 'SQL_TRIGGER' AND OBJECT_NAME(t.parent_id) IS NOT NULL OPEN r_cursor_trigg_on; FETCH NEXT FROM r_cursor_trigg_on INTO @stm; WHILE @@FETCH_STATUS = 0 BEGIN PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH FETCH NEXT FROM r_cursor_trigg_on INTO @stm; END CLOSE r_cursor_trigg_on; DEALLOCATE r_cursor_trigg_on; SET @stm = ''; SELECT @stm += FORMATMESSAGE('ENABLE TRIGGER [%s] ON DATABASE;' + CHAR(13), t.[Name]) FROM sys.triggers t WHERE t.is_disabled = 0 AND t.[type_desc] = 'SQL_TRIGGER' AND OBJECT_NAME(t.parent_id) IS NULL; PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH DECLARE r_cursor_fk_recover CURSOR LOCAL FAST_FORWARD READ_ONLY FOR SELECT t.referencing_object, t.referenced_object, t.constraint_name, STRING_AGG (t.referencing_column_name, ',') WITHIN GROUP (ORDER BY t.constraint_column_id ASC) AS referencing_columns, STRING_AGG (t.referenced_column_name, ',') WITHIN GROUP (ORDER BY t.constraint_column_id ASC) AS referenced_columns, t.delete_referential_action, t.update_referential_action FROM dbo.FKs AS t WHERE NOT EXISTS (SELECT 1 FROM sys.foreign_key_columns AS FK WHERE t.constraint_name = CONCAT('[' , OBJECT_NAME(FK.constraint_object_id), ']')) GROUP BY t.referencing_object, t.referenced_object, t.constraint_name, t.delete_referential_action, t.update_referential_action; OPEN r_cursor_fk_recover; FETCH NEXT FROM r_cursor_fk_recover INTO @referencing_object , @referenced_object , @constraint_name , @referencing_columns , @referenced_columns , @delete_referential_action , @update_referential_action; WHILE @@FETCH_STATUS = 0 BEGIN SET @stm = CONCAT('ALTER TABLE ', @referencing_object ,' WITH CHECK ADD CONSTRAINT ', @constraint_name, ' FOREIGN KEY(', @referencing_columns, ') REFERENCES ' , @referenced_object, ' (', @referenced_columns, ') ' , CASE WHEN @delete_referential_action = 1 THEN 'ON DELETE CASCADE ' WHEN @delete_referential_action = 2 THEN 'ON DELETE SET NULL ' ELSE '' END , CASE WHEN @update_referential_action = 1 THEN 'ON UPDATE CASCADE ' WHEN @update_referential_action = 2 THEN 'ON UPDATE SET NULL ' ELSE '' END , '; ' , 'ALTER TABLE ', @referencing_object, ' CHECK CONSTRAINT ' , @constraint_name, '; '); PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH FETCH NEXT FROM r_cursor_fk_recover INTO @referencing_object , @referenced_object , @constraint_name , @referencing_columns , @referenced_columns , @delete_referential_action , @update_referential_action; END CLOSE r_cursor_fk_recover; DEALLOCATE r_cursor_fk_recover; EXEC sys.sp_msforeachtable @commAND1="PRINT '?'" , @commAND2="ALTER TABLE ? WITH CHECK CHECK CONSTRAINT ALL"; SELECT t.SchName, t.TblName, t.Cnt, DATEDIFF(millisecond, t.StartMoment, t.FinishMoment) AS DiffMSec, t.ErrorMsg FROM #tbl_res AS t ORDER BY t.SchName ASC, t.TblName ASC;
Для проверки скрипта мы прогнали его на тестовой БД:

Скрипт выполнил перенос данных без ошибок.
Важно и то, что при объеме данных в БД‑источнике около 100 ГБ (размер таблиц в строках до 100 млн записей), время выполнения всего скрипта составило 18 — 19 минут, то есть достигается высокая скорость перезаливки таблиц.
Компоненты скрипта
Теперь немного о подробностях реализации. В приведённом скрипте используется целый стек системных объектов:
-
sys.sp_msforeachtable — недокументированная хранимая процедура в SQL Server, которая позволяет итеративно применять команду T‑SQL к каждой таблице в текущей базе данных;
-
sys.triggers — системный объект, который содержит информацию о триггерах в БД;
-
sys.tables — системный объект, который содержит информацию о таблицах в БД;
-
sys.sp_executesql — системная хранимая процедура для выполнения инструкции Transact‑SQL или пакета, в том числе, созданных динамически;
-
sys.foreign_key_columns — системный объект, который содержит информацию о составе внешних ключей;
-
sys.foreign_keys — системный объект, который содержит информацию о внешних ключах;
-
sys.columns — системный объект, содержащий информацию о колонках.
Вместо выводов
Реализация перезаливки таблиц в базе данных — довольно скрупулезная задача, в которой без автоматизации легко столкнуться с «подводными камнями» и нерациональным расходованием ресурсов. Поэтому автоматизация — must have для любой системы, где надо перегонять данные между средами. Предложенный скрипт — один из способов такой автоматизации.
Безусловно, описанное решение — не «серебряная пуля». Но наш опыт и проведенные тесты показали, что оно вполне эффективно и надежно справляется с переносом данных между средами.
ссылка на оригинал статьи https://habr.com/ru/articles/874342/
Добавить комментарий