Автоматизация тестирования таблиц в Postgresql на SQL

от автора

Пример описываемой автоматизации

Привет.

Предположим, вам пришла задача на тестирование какой-либо функциональности, которая относится к бэкенду. Вы переходите к документу с требованиями и видите, помимо прочего, описание таблиц базы данных (БД). Примерно так это выглядит:

Таблица 1. Требования к таблице price в схеме public

Столбец

Ограничение

Тип

Not null

id

PK

int8

Да

price_value

UNIQUE price_unique

numeric(18, 6)

Да

model_id

UNIQUE price_unique, FOREIGN KEY price_models_fk со столбцом id таблицы models

int4

Да

comment

varchar(255)

Нет

Следовательно, один из тест-кейсов должен содержать проверки на соответствие метаданных этих таблиц ожидаемым значениям. К таким метаданным относятся состав и тип столбцов, ограничения для значений, наличие связей и т.д. В Postgresql есть неочевидная возможность проводить подобные проверки автоматически с помощью SQL-запроса.

К слову, эта статья будет интересна не только с точки зрения обзора SQL-запроса для автоматизации тестирования, но и с точки зрения того, что и как можно писать на SQL.

Возьмем для примера пару таблиц:

-- drop table public.models  create table public.models (     id int8 generated by default as identity( increment by 1 minvalue 1 maxvalue 9223372036854775807 start 1 cache 1 no cycle) not null,     "name" varchar(50) not null,     manufacturer varchar(50) not null,     constraint models_pk primary key (id),     constraint models_unique unique (name, manufacturer) );
-- drop table public.price  create table public.price (     id int8 generated by default as identity( increment by 1 minvalue 1 maxvalue 9223372036854775807 start 1 cache 1 no cycle) not null,     price_value numeric(18,6) not null,     model_id int4 not null,     comment varchar(255) null,     constraint price_pk primary key (id),     constraint price_models_fk foreign key (model_id) references public.models(id),     constraint price_unique unique (price_value, model_id) );

Сосредоточимся на price. Таблица models нужна исключительно для того, чтобы добавить в price ограничение (constraint) по зависимому ключу (foreign key). SQL-запрос для автоматической проверки метаданных этой таблицы будет выглядеть так:

with     -- Настраиваем запрос: задаем название таблицы и название схемы     params(table_name, table_schema) as (         values (             'price',             'public'         )     ),     -- Описываем ожидаемые столбцы таблицы     required_columns(column_name, data_type, is_nullable, character_maximum_length) as (         values                         ('id',          'bigint',               'NO', null),             ('price_value', 'numeric',              'NO', 18.6),             ('model_id',    'integer',      'NO',null),             ('comment',     'character varying',    'YES',  255)     ),     -- Описываем ожидаемые ограничения     required_constraints(column_name, constraint_type) as (         values                         ('id',          'PRIMARY KEY'),             ('id',    'FOREIGN KEY'),             ('model_id',    'FOREIGN KEY'),             ('price_value', 'UNIQUE'),             ('model_id',    'UNIQUE')     ),     -- Находим информацию о столбцах тестируемой таблицы и добавляем обработку null'ов     columns_info as (         select             column_name, data_type, is_nullable,             coalesce(numeric_precision, 0)          as numeric_precision,             coalesce(numeric_scale, 0)              as numeric_scale,             coalesce(character_maximum_length, 0)   as character_maximum_length         from information_schema.columns         where             table_name       = (select table_name   from params)             and table_schema = (select table_schema from params)     ),     -- Проверяем существование таблицы и подсчитываем количество столбцов в ней     check_table_exist as (         select             case when count_all_fields < 1 then false else true end table_exists,             case when count_all_fields < 1 then 1 else 0 end table_exists_error,             count_all_fields         from (             select count (*) as count_all_fields             from columns_info         ) sq     ),     -- Сравниваем ожидаемый и текущий наборы атрибутов таблицы     fields_comparison as (         select t.*         from columns_info t         inner join required_columns r             on  t.column_name   = r.column_name             and t.data_type     = r.data_type             and t.is_nullable   = r.is_nullable             and (                 -- Сравниваем целую часть десятичных значений                 case                     when t.data_type = 'numeric'                     then t.numeric_precision = trunc(r.character_maximum_length::numeric)                 end                 and                 -- Сравниваем дробную часть десятичных значений                 case                     when t.data_type = 'numeric'                     then t.numeric_scale = (r.character_maximum_length::numeric - trunc(r.character_maximum_length::numeric))                     * power(10, length(split_part(r.character_maximum_length::text, '.', 2)))                 end                 or t.character_maximum_length = coalesce(r.character_maximum_length::numeric, 0)            )     ),     -- Ищем лишние столбцы и считаем их количество     check_unexpected_fields as (         select             count (column_name) as count_unexpected_fields,             string_agg(column_name, ', ') as unexpected_fields         from (             select column_name             from columns_info             except             select column_name             from required_columns         ) sq     ),     -- Ищем недостающие столбцы и считаем их количество     check_missing_fields as (         select             count (column_name) as count_missing_fields,             string_agg(column_name, ', ') as missing_fields         from (             select column_name             from required_columns             except             select column_name             from columns_info         ) sq     ),     -- Ищем невалидные столбцы и считаем их количество     check_invalid_fields as (         select             count (column_name) as count_invalid_fields,             string_agg(column_name, ', ') as invalid_fields         from (             select column_name             from required_columns             except             select column_name             from fields_comparison             except             select string_to_table(missing_fields, ', ')             from check_missing_fields         ) sq     ),     -- Ищем все ограничения для таблицы     constraints_query as(         select             t1.constraint_type,             t2.column_name,             t3.column_name as foreign_column         from information_schema.table_constraints t1         left join information_schema.constraint_column_usage t2             on t1.constraint_name = t2.constraint_name         left join information_schema.key_column_usage as t3             on t1.constraint_name = t3.constraint_name             and t1.table_schema = t3.table_schema         where             t1.table_name            = (select table_name   from params)             and t1.constraint_schema = (select table_schema from params)             and t2.column_name is not null     ),     -- Включаем значения зависимых ключей (foreign_column) в список ограничений (column_name)     union_foreign_ref_columns as (         select column_name, constraint_type         from constraints_query         union all         select foreign_column as column_name, constraint_type         from constraints_query     ),     -- Ищем лишние ограничения и считаем их количество     check_unexpected_constraints as (         select             count (column_name) as count_unexpected_constraints,             string_agg(column_name || ' (' || constraint_type || ')', ', ') as unexpected_constraints         from (             select *             from union_foreign_ref_columns             except             select column_name, constraint_type             from required_constraints         ) sq     ),     -- Ищем недостающие ограничения и считаем их количество     check_missing_constraints as (         select             count (column_name) as count_missing_constraints,             string_agg(column_name || ' (' || constraint_type || ')', ', ') as missing_constraints         from (             select column_name, constraint_type             from required_constraints             except             select *             from union_foreign_ref_columns         ) sq     ),     -- Собираем полученные данные     checks as (         select             -- Выводим всю информацию об ошибках и суммируем их количество             table_exists_error + count_unexpected_fields + count_missing_fields + count_invalid_fields +             count_unexpected_constraints + count_missing_constraints as errors,             *         from        check_table_exist         cross join  check_unexpected_fields         cross join  check_missing_fields         cross join  check_invalid_fields         cross join  check_unexpected_constraints         cross join  check_missing_constraints ) select * from checks

Важное примечание: чтобы использовать этот запрос для какой-либо вашей таблицы, нужно внести изменения только в CTE params (названия вашей таблицы и схемы, где она хранится), required_columns (описание ожидаемых значений метаданных столбцов вашей таблицы) и required_constraints (описание ожидаемых ограничений для вашей таблицы). Остальной код не нужно менять.

img.2024-11-20 21.46.34.png

Возможность такой автоматической проверки обеспечивается наличием во всех БД Postgresql информационной схемы (information_schema). В Oracle, если не ошибаюсь, тоже такая есть, поэтому описываемый запрос может быть актуален и для других СУБД (полагаю, с корректировками).

Что показывает этот запрос?

Столбец

Расшифровка

errors

Сумма количества ошибок

table_exists

Существует ли таблица

table_exists_error

Счетчик для учета ошибки в случае отсутствия таблицы

count_all_fields

Счетчик всех столбцов таблицы

count_unexpected_fields

Счетчик лишних столбцов

unexpected_fields

Названия лишних столбцов через запятую

count_missing_fields

Счетчик недостающих столбцов

missing_fields

Названия недостающих столбцов через запятую

count_invalid_fields

Счетчик ожидаемых столбцов, в которых есть какие-либо ошибки

invalid_fields

Названия ожидаемых столбцов через запятую, в которых есть какие-либо ошибки

count_unexpected_constraints

Счетчик лишних ограничений

unexpected_constraints

Названия лишних ограничений с указанием типа ограничения в скобках через запятую

count_missing_constraints

Счетчик недостающих ограничений

missing_constraints

Названия недостающих ограничений с указанием типа ограничения в скобках через запятую

img.2024-11-27 19.40.40.png

Анализ запроса

В CTE paramsrequired_columns и required_constraints описываются входные данные. Это — ожидаемые датасеты значений. С params, думаю, все понятно — указываем название тестируемой таблицы и название схемы, где она хранится:

with     -- Настраиваем запрос: задаем название таблицы и название схемы     params(table_name, table_schema) as (         values (             'price',             'smartphones'         )     ) select * from params

Каждый CTE, о котором я пишу, буду оформлять с select’ом, который обращается к этому CTE. Весомой причины для этого нет — мне просто так удобнее 🙂
Этот код можно выполнить у себя локально. Он просто покажет значения внутри CTE:

img.2024-11-27 20.01.42.png

required_column рассмотрим подробнее:

with     -- Описываем ожидаемые столбцы таблицы     required_columns(column_name, data_type, is_nullable, character_maximum_length) as (         values                         ('id',          'bigint',               'NO',   null),             ('price_value', 'numeric',              'NO',   18.6),             ('model_id',    'integer',      'NO',   null),             ('comment',     'character varying',    'YES',  255)     ) select * from required_columns

Здесь описываются значения в 4-х столбцах:

  • column_name: название столбца в тестируемой таблице;

  • data_type: тип столбца. В текущей версии запроса указывать нужно изначальное название типа (character varying), а не синоним (varchar). Эти названия можно просмотреть здесь:
    https://www.postgresql.org/docs/8.1/datatype.html

  • is_nullable: допускается ли в качестве значения для столбца null;

  • character_maximum_length: длина строки. Иначе говоря, максимально допустимое количество символов (цифр) значения. Если ограничения нет — указывается null.

В CTE required_constraints содержатся ожидаемые ограничения по 2-м столбцам:

  • column_name: название столбца, к которому применяется ограничение;

  • constraint_type: тип ограничения.

Типы ограничений (constraint_type) бывают следующие:

  • CHECK: значения в столбце должны удовлетворять какому-либо заданному условию;

  • NOT NULL: название говорящее — значение в столбце не должно быть NULL;

  • UNIQUE: в столбце не допускается наличие

  • PRIMARY KEY: комбинация ограничений UNIQUE и NOT NULL;

  • FOREIGN KEY: значение в столбце у отдельно взятой записи должно быть таким же, как значение в столбце другой таблицы.

Ограничение NOT NULL проверяется отдельно в рамках изучения соответствия фактических метаданных значениям в CTE required_columns. Как для этого там есть столбец is_nullable.

Теперь рассмотрим код, который описан после required_constraints. До этого CTE включительно, как упоминал выше, указываются входные данные. В CTE columns_info происходит получение метаданных о столбцах проверяемой таблицы с помощью запроса к columns:

select * from information_schema.columns

Ограничим выборку только по тем метаданным, которые нужны для проверок. null’ы преобразуем в 0, чтобы значения можно было сравнивать. Также добавим фильтрацию (в where) по тестируемой таблице и по схеме, где она хранится:

select     column_name, data_type, is_nullable,     coalesce(numeric_precision, 0)          as numeric_precision,     coalesce(numeric_scale, 0)              as numeric_scale,     coalesce(character_maximum_length, 0)   as character_maximum_length from information_schema.columns where     table_name       = 'price'     and table_schema = 'public'

В select’е запроса:

Название столбца

Описание

column_name

Название столбца в тестируемой таблице

data_type

Тип столбца

is_nullable

Обязателен ли столбец для заполнения

numeric_precision

Используется только для столбцов числовых типов (numeric, integer и т.д.). Ограничение по количеству цифр целой части числа

numeric_scale

Используется только для для столбцов числовых типов. Ограничение по количеству цифр дробной части числа

character_maximum_length

ограничение по количеству символов для столбцов типа varchar (character varying)

Если у столбца тестируемой таблицы нет ограничений по количеству символов или цифр (например, при наличии типа boolean или jsonb), то в соответствующем столбце метаданных будет null.

Перенесем запрос в CTE и обратимся к записям, которые по нему возвращаются, через select:

with columns_info as (     select         column_name, data_type, is_nullable,         coalesce(numeric_precision, 0)          as numeric_precision,         coalesce(numeric_scale, 0)              as numeric_scale,         coalesce(character_maximum_length, 0)   as character_maximum_length     from information_schema.columns     where         table_name       = 'price'         and table_schema = 'public' ) select * from columns_info

Добавим код для проверки существования таблицы. Сделаем это через подсчет количества значений в запросе, которые мы описали ранее (меняем select * на select count (*) ):

with columns_info as (     select         column_name, data_type, is_nullable,         coalesce(numeric_precision, 0)          as numeric_precision,         coalesce(numeric_scale, 0)              as numeric_scale,         coalesce(character_maximum_length, 0)   as character_maximum_length     from information_schema.columns     where         table_name       = 'price'         and table_schema = 'public' ) select count (*) from columns_info

Логика проста: нет таблицы — нет и записей, которые возвращаются по значениям, указанным в фильтрах. Добавим сюда case для вывода boolean’а, который будет отображать статус проверки. А также счетчик, который будет показывать 1, если boolean — true и 0, если boolean — false. Он пригодится для подсчета суммы ошибок. Еще выведем фактическое количество столбцов в тестируемой таблице — это просто полезная информация. Все эти данные возьмем из select’а, описанного выше под CTE. Выполнять его будем через подзапрос:

with columns_info as (     select         column_name, data_type, is_nullable,         coalesce(numeric_precision, 0)          as numeric_precision,         coalesce(numeric_scale, 0)              as numeric_scale,         coalesce(character_maximum_length, 0)   as character_maximum_length     from information_schema.columns     where         table_name       = 'price'         and table_schema = 'public' ) select     case when count_all_fields < 1 then false   else true   end table_exists,     case when count_all_fields < 1 then 1       else 0      end table_exists_error,     count_all_fields from (     select count (*) as count_all_fields     from columns_info ) sq

В следующем блоке кода будем проводить сравнение — ожидаемых значений, описанных в CTE required_columns, и фактических значений, описанных в CTE columns_info. Уберем код для проверки существования таблицы в отдельный CTE check_table_exists. Также добавим 3 CTE с ожидаемыми данными, чтобы можно было проводить сравнение. Название таблицы (table_name) и название схемы (schema_name) для columns_info теперь будем получать через подзапросы. Пишем код дальше:

with     -- Настраиваем запрос: задаем название таблицы и название схемы     params(table_name, table_schema) as (         values (             'price',             'public'         )     ),     -- Описываем ожидаемые столбцы таблицы     required_columns(column_name, data_type, is_nullable, character_maximum_length) as (         values                         ('id',          'bigint',                       'NO', null),             ('price_value', 'numeric',            'NO', 18.6),             ('model_id',    'integer',  'NO',null),             ('comment',     'character varying',            'YES',255)     ),     -- Описываем ожидаемые ограничения     required_constraints(column_name, constraint_type) as (         values                         ('id',          'PRIMARY KEY'),             ('id',    'FOREIGN KEY'),             ('model_id',    'FOREIGN KEY'),             ('price_value', 'UNIQUE'),             ('model_id',    'UNIQUE')     ),     -- Находим информацию о столбцах тестируемой таблицы и добавляем обработку null'ов     columns_info as (         select             column_name, data_type, is_nullable,             coalesce(numeric_precision, 0)          as numeric_precision,             coalesce(numeric_scale, 0)              as numeric_scale,             coalesce(character_maximum_length, 0)   as character_maximum_length         from information_schema.columns         where             table_name       = (select table_name   from params)             and table_schema = (select table_schema from params)     ),     -- Проверяем существование таблицы и подсчитываем количество столбцов в ней     check_table_exist as (         select             case when count_all_fields < 1 then false else true end table_exists,             case when count_all_fields < 1 then 1 else 0 end table_exists_error,             count_all_fields         from (             select count (*) as count_all_fields             from columns_info         ) sq     ) select * from columns_info

Сравнить датасеты в SQL можно разными способами. Мне известны три — через:

  • оператор except;

  • соединение join;

  • группировку.

В данном случае я воспользуюсь join’ом. Соединение будем делать по столбцам, описанным в CTE columns_info. Начнем с первых 3-х. Таблицам добавим алиасы t и r. Возвращать записи будем только из columns_info. Иначе говоря, только те, которые фактически есть в БД. Код в with идентичен блоку, описанному выше, поэтому его далее буду сокращать для удобства:

with (...) select t.* from columns_info t inner join required_columns r     on  t.column_name               = r.column_name     and t.data_type                 = r.data_type     and t.is_nullable               = r.is_nullable

Остаются еще 3 столбца. С ними сложнее. В CTE required_columns есть только 4 столбца, тогда в columns_info возвращается 6. Это связано с тем, что ожидаемое ограничение по количеству символов и цифр мы задаем в 1-м столбце для всех типов. При этом фактически в метаданных есть разделение такого ограничения на 3 столбца.

Начнем с простого: сравнение для столбцов varchar. null’ы в датасете с ожидаемыми значениями заменяем на 0. На всякий случай повторю, что это нужно для сравнения: null’ы через равенство проверить не получится:

with (...) select t.* from columns_info t inner join required_columns r     on  t.column_name               = r.column_name     and t.data_type                 = r.data_type     and t.is_nullable               = r.is_nullable     and t.character_maximum_length  = coalesce(r.character_maximum_length, 0)

Теперь нужно сравнить целую и дробную части числовых значений ограничений. Но только для типа numeric и decimal, т.к. остальные числовые типы имеют фиксированный диапазон значений. Начнем с целой части чисел. Здесь нужно использовать case для отбора значений с типами numeric и decimal. С учетом того, что сравнивается целая часть, у числа из столбца character_maximum_length (из CTE required_columns) нужно отсекать дробную часть с помощью функции trunc(). Проверяемый столбец может быть единовременно только одного типа, поэтому в ранее написанную обработку для ограничений varchar новый код добавляем через or. Всю обработку при этом оборачиваем в скобки:

with (...) select t.* from columns_info t inner join required_columns r     on  t.column_name               = r.column_name     and t.data_type                 = r.data_type     and t.is_nullable               = r.is_nullable     and t.character_maximum_length  = coalesce(r.character_maximum_length::numeric, 0)     and (     case             when t.data_type in ('numeric', 'decimal')             then t.numeric_precision = trunc(r.character_maximum_length::numeric)         end         or t.character_maximum_length = coalesce(r.character_maximum_length::numeric, 0)     )

И теперь самое сложное: сравнение дробной части. Вычислить её просто — нужно лишь из числа вычесть его целую часть. Пример для наглядности:

18,6 — 18 = 0,6

Но описать кодом это будет сложнее. В таблице price, которая используется в этой статье как объект тестирования, есть столбец price_value. Он имеет тип numeric и ограничение 18.6, то есть 18 цифр в целой части и 6 в дробной. То, что нужно для примера. Попробуем в select добавить разность, чтобы посмотреть, что получится:

img.2024-11-21 22.11.48.png

Это ожидаемое значение. А что в фактическом?

img.2024-11-21 22.10.37.png

То есть чтобы корректно провести сравнение, нужно дробную часть ожидаемого значения перевести в целую. В данном случае достаточно выполнить умножение на 10. Но что если бы ограничение было не 18.6, а, скажем, 18.15? Тогда умножение на 10 нужно проводить во 2-й степени. И значение этой степени — переменное, равное количеству знаков после запятой. Иначе говоря, для столбца price_value, у которого есть ограничение 18.6, степень будет равна 1 (после запятой только 1 цифра — 6). А если ограничение было 18.15, то степень должна была быть 2.

Итак, разность числа и его целой части, которые берутся из character_maximum_length (CTE required_columns), показанная в виде кода на скриншоте выше, выглядит так:

r.character_maximum_length::numeric - trunc(r.character_maximum_length::numeric)

Теперь её нужно умножить на 10 в степени:

(r.character_maximum_length::numeric - trunc(r.character_maximum_length::numeric)) * power(10, 1)

Но единица (1) в power() покрывает не все случаи, поэтому добавим код для динамического расчета:

(r.character_maximum_length::numeric - trunc(r.character_maximum_length::numeric)) * power(10, length(split_part(r.character_maximum_length::text, '.', 2)))

Прокомментирую. r.character_maximum_length::text — выполняем приведение типов, т.к. этот столбец имеет тип numeric, но его значения будут использоваться в функциях, которые работают только с символьными типами. Функция split_part() используется для отбора цифр после запятой (но в качестве разделителя указывается именно точка, т.к. дробное значение в метаданных хранится именно в таком виде). А length подсчитывает количество этих цифр.

Добавляем этот код в выражения для соединения:

with(...) select t.* from columns_info t inner join required_columns r     on  t.column_name   = r.column_name     and t.data_type     = r.data_type     and t.is_nullable   = r.is_nullable     and (         -- Сравниваем целую часть десятичных значений         case             when t.data_type in ('numeric', 'decimal')             then t.numeric_precision = trunc(r.character_maximum_length::numeric)         end         and         -- Сравниваем дробную часть десятичных значений         case             when t.data_type in ('numeric', 'decimal')             then t.numeric_scale = (r.character_maximum_length::numeric - trunc(r.character_maximum_length::numeric))             * power(10, length(split_part(r.character_maximum_length::text, '.', 2)))         end         or t.character_maximum_length = coalesce(r.character_maximum_length::numeric, 0)    )

Результат сравнения покажет, какие столбцы тестируемой таблицы имеют ожидаемые метаданные. В дальнейшем отобранные таким образом записи будем сравнивать с датасетом из CTE required_columns. Так мы найдем столбцы, у которых какие-либо метаданные не соответствуют ожидаемым. Добавим этот запрос в CTE с названием fields_comparison:

-- Сравниваем ожидаемый и текущий наборы атрибутов таблицы with  (...),     fields_comparison as (         select t.*         from columns_info t         inner join required_columns r             on  t.column_name   = r.column_name             and t.data_type     = r.data_type             and t.is_nullable   = r.is_nullable             and (                 -- Сравниваем целую часть десятичных значений                 case                     when t.data_type = 'numeric'                     then t.numeric_precision = trunc(r.character_maximum_length::numeric)                 end                 and                 -- Сравниваем дробную часть десятичных значений                 case                     when t.data_type = 'numeric'                     then t.numeric_scale = (r.character_maximum_length::numeric - trunc(r.character_maximum_length::numeric))                     * power(10, length(split_part(r.character_maximum_length::text, '.', 2)))                 end                 or t.character_maximum_length = coalesce(r.character_maximum_length::numeric, 0)            ) ) select * from fields_comparison 

Едем дальше. В следующем CTE, check_unexpected_fields, выполняется поиск столбцов таблицы, которые есть в фактическом датасете, но отсутствуют в ожидаемом (их условно можно назвать «лишними»). Это реализовано через сравнение записей в CTE columns_info и CTE required_columns. Запрос довольно простой:

select column_name from columns_info except select column_name from required_columns

По такому запросу каждое название «лишнего» столбца будет выведено в отдельной строке. Однако с учетом того, что это не единственный CTE, из которого будет формироваться результирующая выборка, вернувшиеся значения нужно собрать в одной ячейке. Я это делаю через перечисление в строке, с запятой в качестве разделителя, с помощью функции string_agg(). И дополнительно подсчитываю количество значений, вернувшихся по запросу:

with (...),     -- Ищем лишние столбцы и считаем их количество     check_unexpected_fields as (         select             count (column_name) as count_unexpected_fields,             string_agg(column_name, ', ') as unexpected_fields         from (             select column_name             from columns_info             except             select column_name             from required_columns         ) sq     ) select * from check_unexpected_fields

Далее в CTE check_missing_fields содержится почти такой же запрос. Разница лишь в том, что запросы в except меняются друг с другом местами. Этот CTE показывает недостающие столбцы тестируемой таблицы. Иначе говоря, те, что есть в ожидаемом датасете, но отсутствуют в фактическом:

with  (...),     -- Ищем недостающие столбцы и считаем их количество     check_missing_fields as (         select             count (column_name) as count_missing_fields,             string_agg(column_name, ', ') as missing_fields         from (             select column_name             from required_columns             except             select column_name             from columns_info         ) sq     ) select * from check_missing_fields

Итак, на текущей стадии нам известны столбцы:

  • Ожидаемые (CTE required_columns);

  • Полностью совпавшие с ожидаемыми (CTE fields_comparison);

  • Лишние (CTE check_unexpected_fields);

  • Недостающие (CTE check_missing_fields).

С помощью этих данных можно найти еще одно множество. В нем будут содержаться столбцы, которые совпадают с ожидаемым датасетом лишь частично. Соответственно, CTE с таким запросом будет возвращать невалидные значения. Следовательно, для него выглядит логичным название check_invalid_fields.

Запрос для поиска невалидных столбцов также содержит в себе оператор except, но в данном случае он используется дважды. Из множества записей ожидаемых столбцов (CTE required_columns) вычитается множество полностью совпавших столбцов (CTE fields_comparison) и недостающих столбцов (CTE check_missing_fields). Для CTE check_missing_fields в рамках такого запроса нужно «развернуть» строку со значениями обратно в набор записей. Это можно сделать с помощью функции string_to_table(). Получаем такой код:

with  (...),     -- Ищем невалидные столбцы и считаем их количество     check_invalid_fields as (         select             count (column_name) as count_invalid_fields,             string_agg(column_name, ', ') as invalid_fields         from (             select column_name             from required_columns             except             select column_name             from fields_comparison             except             select string_to_table(missing_fields, ', ')             from check_missing_fields         ) sq select * from check_invalid_fields

Вся необходимая информация для тестирования столбцов — собрана. Перейдем к ограничениям (constraints). К слову, к ним в контексте метаданных, в частности, относятся зависимые ключи (foreign keys). Для получения необходимых сведений будем запрашивать из информационной схемы столбцы таблиц:

  • constraint_type из table_constraints;

  • column_name из constraint_column_usage;

  • column_name из key_column_usage;

Если какое-либо ограничение не удовлетворяется при изменении данных таблицы, то по запросу возвращается ошибка. В её формулировке будет указано название нарушаемого в таком случае ограничения (violates constraint):

img.2024-11-27 18.34.53.png

Ограничения помогают поддерживать целостность данных. По этой причине важно проверять их наличие и логику. Информацию об ограничениях тестируемой таблицы будем получать по отдельному запросу. Его опишем в CTE constraints_query:

with (...),     -- Ищем все ограничения для таблицы     constraints_query as(         select             t1.constraint_type,             t2.column_name,             t3.column_name as foreign_column         from information_schema.table_constraints t1         left join information_schema.constraint_column_usage t2             on t1.constraint_name = t2.constraint_name         left join information_schema.key_column_usage as t3             on t1.constraint_name = t3.constraint_name             and t1.table_schema = t3.table_schema         where             t1.table_name            = (select table_name   from params)             and t1.constraint_schema = (select table_schema from params)             and t2.column_name is not null     ) select * from constraints_query

По этому запросу будут возвращаться 3 столбца:

img.2024-11-27 19.06.34.png

При этом в ожидаемом датасете, в CTE required_constraints, у нас всего 2 столбца:

with     (...)     -- Описываем ожидаемые ограничения     required_constraints(column_name, constraint_type) as (         values                         ('id',          'PRIMARY KEY'),             ('id',    'FOREIGN KEY'),             ('model_id',    'FOREIGN KEY'),             ('price_value', 'UNIQUE'),             ('model_id',    'UNIQUE')     ) select * from required_constraints

Поэтому в следующем CTE (назовем его union_foreign_ref_columns) объединим значения из столбцов column_name и foreign_column в 1 столбец:

with (...),     -- Включаем значения зависимых ключей (foreign_column) в список ограничений (column_name)     union_foreign_ref_columns as (         select column_name, constraint_type         from constraints_query         union all         select foreign_column as column_name, constraint_type         from constraints_query     ) select * from union_foreign_ref_columns

Фрагмент as column_name необязателен, но я его оставил его для улучшения читаемости кода.

Фактический набор ограничений получили. Теперь можно проводить сравнение. Я решил не делать отдельный CTE под поиск невалидных ограничений, т.к. здесь рассматриваются всего 2 столбца (column_name и constraints_type). В конечном запросе рассматриваются только «лишние» и недостающие ограничения в 2-х соответствующих CTE. И это почти конец запроса:

with (...),     -- Ищем лишние ограничения и считаем их количество     check_unexpected_constraints as (         select             count (column_name) as count_unexpected_constraints,             string_agg(column_name || ' (' || constraint_type || ')', ', ') as unexpected_constraints         from (             select *             from union_foreign_ref_columns             except             select column_name, constraint_type             from required_constraints         ) sq     ) select * from check_unexpected_constraints

и

with (...),     -- Ищем недостающие ограничения и считаем их количество     check_missing_constraints as (         select             count (column_name) as count_missing_constraints,             string_agg(column_name || ' (' || constraint_type || ')', ', ') as missing_constraints         from (             select column_name, constraint_type             from required_constraints             except             select *             from union_foreign_ref_columns         ) sq     ) select * from check_missing_constraints

Логика сравнения здесь такая же, как при сравнении столбцов. Отличие между этими 2-мя CTE — только в порядке расположения относительно except: в check_unexpected_constraints запрос к union_foreign_ref_columns идет до except, а в check_missing_constraints — после. Из интересного здесь разве что данная конструкция:

string_agg(column_name || ' (' || constraint_type || ')', ', ')

string_agg() здесь по-прежнему нужен для вывода нескольких записей в 1-й строке. Но здесь 1-й аргумент функции логически немного отличается от этого же аргумента функции в коде сравнения столбцов. Это нужно исключительно для того, чтобы в 1-й ячейке уместить не только несколько строк, но и информацию сразу по 2-м столбцам. В итоговой выборке значение в этой ячейке будет выглядеть так:

img.2024-11-27 19.27.36.png

По этому скриншоту проще понять, почему конструкция с string_agg() именно такая, какая она есть. В нее включается информацию из 2-х столбцов, при этом значения 2-го столбца берется в скобки с пробелом перед 1-м значением.

Ну и в последнем CTE собираем информацию, полученную в других CTE, и суммируем количество найденных ошибок, которые мы получали с помощью count():

with (...),     -- Собираем полученные данные     checks as (         select             -- Выводим всю информацию об ошибках и суммируем их количество             table_exists_error + count_unexpected_fields + count_missing_fields + count_invalid_fields +             count_unexpected_constraints + count_missing_constraints as errors,             *         from        check_table_exist         cross join  check_unexpected_fields         cross join  check_missing_fields         cross join  check_invalid_fields         cross join  check_unexpected_constraints         cross join  check_missing_constraints ) select * from checks

Значения, выводимые этим запросом в строках, видны в самих строках, в тултипе (если в строке большое значение), и их также можно скопировать и вставить блокнот:

Почему этот запрос полезен?

  • По моему опыту, почему-то новые таблицы БД, которые создают разработчики, часто не соответствуют тому, что описано в требованиях. Исходя из этого, проще один раз описать ожидаемые значения в запросе и запускать его после правок. Это сэкономит время при сравнении и исключит человеческий фактор. Особенно если итераций правок будет несколько;

  • Польза от таких проверочных запросов тем выше, чем больше таблиц добавляется в рамках доработки;

  • Пригодится для быстрого тестирования метаданных таблиц после миграции баз данных на другой стенд.


ссылка на оригинал статьи https://habr.com/ru/articles/862562/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *