Уверен многие тру-программисты и без меня это знают, но я решился опубликовать пару реализаций циклов через while, которыми я активно пользуюсь, как автоматизатор, тестировщик и разработчик ETL.
while not all(check_list)
При переносе данных из одной базы в другую, часто требуется перенести не одну таблицу, а две и более. Перенести, значит удалить данные в таблице на приемнике и затем вставить данные, взятые с источника:
-- SQL delete from my_shema.my_table where my_column = 'my_variable'; commit; insert into my_shema.my_table select * from my_shema.my_table@src_dblink where my_column = 'my_variable'; commit;
Дополнительно, нужно полазить по схеме и выяснить зависимости между таблицами, которые могут вызывать исключения типа ORA-02291 и ORA-02292 (это когда мы не можем или удалить данные в таблице, не удалив предварительно данные из другой таблицы, или вставить, предварительно не вставив данные в другую таблицу).
Грубо говоря нам нужно обеспечить каскадное удаление данных, а затем каскадную вставку.
Чтобы этого не делать, можно при срабатывании исключения БД отправлять DML-операцию в конец очереди и для этого можно использовать цикл while not all(check_list):
# python3.8 import cx_Oracle # составляем чек-лист переноса данных, # если значение в списке 0, значит данные не перенесены, обратно, если 1 etl_objects = [('MY_SCHEMA', 'MY_TABLE1'), ('MY_SCHEMA', 'MY_TABLE2'), ...] check_list = [0 for _ in range(len(etl_objects))] i = 0 while not all(check_list): i = i % len(check_list) # нужно для второго и последующих циклов try: if not check_list[i]: with cx_Oracle.connect(user=user, password=password, tns=tns, encoding='utf-8') as db_conn: with db_conn.cursor() as cursor: sql_text = (f"delete from {etl_objects[i][0]}.{etl_objects[i][1]} " "where my_column = 'my_variable'") cursor.execute(sql_text) cursor.execute('commit') sql_text = (f"insert into {etl_objects[i][0]}.{etl_objects[i][1]} " "select * " f"from {etl_objects[i][0]}.{etl_objects[i][1]}@src_dblink " "where my_column = 'my_variable';") cursor.execute(sql_text) cursor.execute('commit') check_list[i] = 1 except Exception as e: print(f"Сработало исключение {str(e)}: объект загрузки отправлен в конец очереди") i += 1
Понятное дело, что формально объект загрузки не отправляется в конец очереди, но цикл while будет повторяться до тех пор, пока не останется ни одного незагруженного объекта. Также понятное дело, что если сработали другие исключения, отличные от ORA-02291 и ORA-02292, то цикл рискует стать бесконечным, но этот пример подразумевает, что в остальном ограничения источника и приемника не будут мешать загрузке.
while i < len(table_trg) and j < len(table_src)
При тестировании DDL таблиц в разных базах данных может понадобиться сравнивать их наборы полей и типов данных, которые могут отличаться. То есть семантика таблиц одинаковая, а реализация отличается. Например, обе таблицы хранят один и тот же набор основных данных, но названия полей погут отличаться, также для полей с одинаковыми названием и назначением могут отличаться типы данных или ограничения типа данных.
В oracle вывести набор полей и типов данных из двух источников можно с помощью запросов:
-- SQL select column_name, data_type, data_length ,..., from all_tab_columns where owner = 'MY_SHEMA' and table_name = 'MY_TABLE'; select column_name, data_type, data_length ,..., from all_tab_columns@src_dblink where owner = 'MY_SHEMA' and table_name = 'MY_TABLE';
В идеале наборы и значения полей должны быть идентичны, но не всегда бывает так, особенно, когда базы данных живут своими жизнями и одна жизнь отстает в развитии от другой. Так вот, чтобы узнать в чем она отстает или в чем они расходятся между собой можно с помощью кода:
# python3.8 import cx_Oracle with cx_Oracle.connect(user=user, password=password, tns=tns, encoding='utf-8') as db_conn: with db_conn.cursor() as cursor: sql_text = ("select column_name, data_type, data_length " "from all_tab_columns " "where owner = 'MY_SHEMA' and table_name = 'MY_TABLE' " "order by column_name") cursor.execute(sql_text) table_trg = cursor.fetchall() sql_text = ("select column_name, data_type, data_length " "from all_tab_columns@src_dblink " "where owner = 'MY_SHEMA' and table_name = 'MY_TABLE' " "order by column_name") cursor.execute(sql_text) table_src = cursor.fetchall() i, j = 0, 0 tmp_table_trg, tmp_table_src = [], [] while i < len(table_trg) and j < len(table_src): if table_trg[i][0] == table_src[j][0]: # сравниваем названия полей tmp_table_trg.append(table_trg[i]) tmp_table_src.append(table_src[j]) i += 1 j += 1 elif table_trg[i][0] < table_src[j][0]: # если название поля меньше, то в правой таблице поля с таким же названием нет tmp_table_trg.append(table_trg[i]) tmp_table_src.append((None, None, None)) # добавляем в правую таблицу "пустую" строку i += 1 else: tmp_table_trg.append((None, None, None)) # добавляем в левую таблицу "пустую" строку tmp_table_src.append(table_src[j]) j += 1 diffs = list(filter(lambda x: x[0] != x[1], zip(tmp_table_trg, tmp_table_src)))
Вот такие две реализации циклов через while я использую в своей работе.
Первый пример ( while not all() ) помогает гонять циклы по чек-листам, а второй ( while i < len(table_trg) and j < len(table_src) ), помогает выравнивать списки кортежей.
Чтобы мой код выше можно было проверить, если у вас нет возможности делать запросы к БД, можно переписать мой код на работу с таблицами csv.
ссылка на оригинал статьи https://habr.com/ru/articles/853682/
Добавить комментарий