- наследование таблиц (есть ограничения, которые обещают в будущем исправить)
- диапазоны: int4range, numrange, daterange
- поддержка из коробки несколько языков для хранимых функций (PL/pgSQL, PL/Tcl, PL/Perl, PL/Python и голый C)
- оператор WITH, позволяющий делать рекурсивные запросы
- (планируется) материализованные представления (частично они доступны и сейчас — как IUD правила к представлению)
- (планируется) триггера на DDL операции
Как правило, существующие решения опираются на работу с уже готовым SQL дампом, который конвертируется в соответствии с синтаксисом целевой БД. Но в некоторых случаях (активно использующееся веб-приложение с большим объемом информации) такой вариант несет определенные временные затраты на создание SQL дампа из СУБД, его конвертации и загрузку получившегося дампа снова в СУБД. Поэтому оптимальней будет online-вариант (прямиком из СУБД в СУБД) конвертера, что может существенно уменьшить простой сервисов.
Языком для реализации выбран C++ (с некоторыми возможностями из C++11x), библиотеки для соединения с MySQL и PostgreSQL использовались нативные, в качестве IDE был задействован Qt Creator.
Алгоритм миграции состоит в следующем. Подразумевается что в БД-получателе уже создана структура таблиц, соответствующая структуре в БД-источнике. Формируется список таблиц для переноса данных, который затем распределяется в пуле потоков. Каждый поток имеет подключение к БД-источнику и к БД-получателю. Т.е. параллельно переносится несколько таблиц. Profit!
Традиционно любое приложение имеет некоторый каркас — набор системных компонент, на которые опираются другие компоненты — работа с конфигурационным файлом, логом, обработчик ошибок, менеджер памяти и прочее. В нашем случае, используется только самое необходимое для решения задачи. Во-первых, были переопределены (исключительно для удобства) некоторые фундаментальные и составные типы (да, знаю, можно было аlias templates использовать, но получилось так):
typedef bool t_bool; typedef char t_char; typedef unsigned char t_uchar; typedef signed char t_schar; typedef int t_int; typedef unsigned int t_uint; typedef float t_float; typedef double t_double;
template<typename T, typename U> class CMap : public std::map<T, U> { public: CMap(); virtual ~CMap(); }; template<typename T, typename U> CMap<T, U>::CMap() { } template<typename T, typename U> CMap<T, U>::~CMap() { }
template<typename T> class CVector : public std::vector<T> { public: CVector(); virtual ~CVector(); }; template<typename T> CVector<T>::CVector() { } template<typename T> CVector<T>::~CVector() { }
class CFileStream : public std::fstream { public: CFileStream(); virtual ~CFileStream(); };
Из явных паттернов использован только синглтон:
template<typename T> class CSingleton { public: static T* instance(); void free(); protected: CSingleton(); virtual ~CSingleton(); }; template<typename T> T* CSingleton<T>::instance() { static T *instance = new T(); return instance; } template<typename T> void CSingleton<T>::free() { delete this; } template<typename T> CSingleton<T>::CSingleton() { } template<typename T> CSingleton<T>::~CSingleton() { }
Базовые классы для задачи (выполняется в отдельном потоке) и системы (запускает на выполнение задачи):
class CTask { public: CTask(); virtual ~CTask(); void execute(); t_uint taskID(); t_bool isExecuted(); protected: virtual void executeEvent() = 0; private: t_uint m_task_id; t_bool m_executed; };
CTask::CTask() : m_executed(false) { static t_uint task_id = 0; m_task_id = task_id++; } CTask::~CTask() { } void CTask::execute() { executeEvent(); m_executed = true; } t_uint CTask::taskID() { return m_task_id; } t_bool CTask::isExecuted() { return m_executed; }
class CSystem { public: CSystem(); virtual ~CSystem() = 0; protected: void executeTask(CTask *task); };
CSystem::CSystem() { } CSystem::~CSystem() { } void CSystem::executeTask(CTask *task) { CTask& task_ref = *task; std::thread thread([&]() { task_ref.execute(); }); thread.detach(); }
В завершении рассмотрения базовых типов нужно упомянуть класс строки, который пришлось написать с нуля, чтобы для некоторых операций (замена подстроки и конкатенация) была возможность работы с передаваемым буфером (о нем чуть ниже) без дополнительных выделений памяти и некоторые вещи (преобразование строки в число и числа в строки) сделать членами класса (приводится только объявление класса):
class CString { public: CString(const t_char *data = nullptr); CString(const CString& s); ~CString(); const t_char* ptr() const; void setPtr(t_char *p); CString& operator= (const CString& s); CString operator+ (const t_char *p) const; CString operator+ (t_char c) const; CString operator+ (const CString& s) const; friend CString operator+ (const t_char *p, const CString& s); CString& operator+= (const t_char *p); CString& operator+= (t_char c); CString& operator+= (const CString& s); t_bool operator== (const CString& s) const; t_bool operator!= (const CString& s) const; t_bool operator< (const CString& s) const; t_bool operator> (const CString& s) const; t_bool operator<= (const CString& s) const; t_bool operator>= (const CString& s) const; t_char& at(t_uint index); t_char at(t_uint index) const; t_uint length() const; t_bool isEmpty() const; void clear(); t_int search(const CString& s, t_uint from = 0) const; CString substr(t_uint from, t_int count = -1) const; CString replace(const CString& before, const CString& after) const; static CString fromNumber(t_uint value); static t_uint toUnsignedInt(const CString& s, t_bool *good = nullptr); CVector<CString> split(const CString& splitter) const; t_bool match(const CString& pattern) const; static t_uint replacePtr(const t_char *src, const t_char *before, const t_char *after, char *buffer); static t_uint lengthPtr(const t_char *src); static t_uint concatenatePtr(const t_char *src, char *buffer); private: t_char *m_data; t_uint length(const t_char *src) const; t_char* copy(const t_char *src) const; t_char* concatenate(const t_char *src0, t_char c) const; t_char* concatenate(const t_char *src0, const t_char *src1) const; t_int compare(const t_char *src0, const t_char *src1) const; }; CString operator+ (const t_char *p, const CString& s);
Как неизбежность, для приложения, чуть больше чем «Hello,world», это лог и конфигурационный файл. В методе записи сообщения в лог был задействован мьютекс, так как каждая задача по мере обработки таблицы пишет об этом в лог. Мелкогранулярные блокировки и lockfree-алгоритмы не рассматривались по причине того, что запись в лог — это далеко не узкое место в работе приложения:
class CLog : public CSingleton<CLog> { public: enum MessageType { Information, Warning, Error }; CLog(); virtual ~CLog(); void information(const CString& message); void warning(const CString& message); void error(const CString& message); private: std::mutex m_mutex; CFileStream m_stream; void writeTimestamp(); void writeHeader(); void writeFooter(); void writeMessage(MessageType type, const CString& message); };
CLog::CLog() { m_stream.open("log.txt", std::ios_base::out); writeHeader(); } CLog::~CLog() { writeFooter(); m_stream.flush(); m_stream.close(); } void CLog::information(const CString& message) { writeMessage(Information, message); } void CLog::warning(const CString& message) { writeMessage(Warning, message); } void CLog::error(const CString& message) { writeMessage(Error, message); } void CLog::writeTimestamp() { time_t rawtime; tm *timeinfo; t_char buffer[32]; time(&rawtime); timeinfo = localtime(&rawtime); strftime(buffer, 32, "%Y/%m/%d %H:%M:%S", timeinfo); m_stream << buffer << " "; } void CLog::writeHeader() { writeMessage(Information, "Log started"); } void CLog::writeFooter() { writeMessage(Information, "Log ended"); } void CLog::writeMessage(MessageType type, const CString& message) { std::lock_guard<std::mutex> guard(m_mutex); writeTimestamp(); switch (type) { case Information: { m_stream << "Information " << message.ptr(); break; } case Warning: { m_stream << "Warning " << message.ptr(); break; } case Error: { m_stream << "Error " << message.ptr(); break; } default: { break; } } m_stream << "\n"; m_stream.flush(); }
class CConfig : public CSingleton<CConfig> { public: CConfig(); virtual ~CConfig(); CString value(const CString& name, const CString& defvalue = "") const; private: CFileStream m_stream; CMap<CString, CString> m_values; };
CConfig::CConfig() { m_stream.open("mysql2psql.conf", std::ios_base::in); if (m_stream.is_open()) { CString line; const t_uint buffer_size = 256; t_char buffer[buffer_size]; while (m_stream.getline(buffer, buffer_size)) { line = buffer; if (!line.isEmpty() && line.at(0) != '#') { t_int pos = line.search("="); CString name = line.substr(0, pos); CString value = line.substr(pos + 1); m_values.insert(std::pair<CString, CString>(name, value)); } } m_stream.close(); CLog::instance()->information("Config loaded"); } else { CLog::instance()->warning("Can't load config"); } } CConfig::~CConfig() { } CString CConfig::value(const CString& name, const CString& defvalue) const { CMap<CString, CString>::const_iterator iter = m_values.find(name); if (iter != m_values.end()) { return iter->second; } return defvalue; }
# MySQL connection mysql_host=localhost mysql_port=3306 mysql_database=mysqldb mysql_username=root mysql_password=rootpwd mysql_encoding=UTF8 # PostgreSQL connection psql_host=localhost psql_port=5432 psql_database=psqldb psql_username=postgres psql_password=postgrespwd psql_encoding=UTF8 # Migration # (!) Note: source_schema == mysql_database source_schema=mysqldb destination_schema=public tables=* use_insert=0 # Other settings threads=16
Теперь, что касательно добавления данных в PostgreSQL. Есть два варианта — использовать запросы INSERT, которые на большом массиве данных не очень себя показали в плане производительности (особенности транзакционного механизма), или через команду COPY, которая позволяет непрерывно пересылать порции данных, отправляя в конце передачи специальный маркер (символ-терминатор). Еще один нюанс связан с определением типа (поля в таблице) в PostgreSQL. В документации не указано (возможно не было чтения между строк документации), как можно вернуть человекопонятный идентификатор типа, поэтому было составлено соответствие oid (почти уникальный идентификатор каждого объекта в БД) и типа:
case 20: // int8 case 21: // int2 case 23: // int4 case 1005: // int2 case 1007: // int4 case 1016: // int8 case 700: // float4 case 701: // float8 case 1021: // float4 case 1022: // float8 case 1700: // numeric case 18: // char case 25: // text case 1002: // char case 1009: // text case 1015: // varchar case 1082: // date case 1182: // date case 1083: // time case 1114: // timestamp case 1115: // timestamp case 1183: // time case 1185: // timestamptz case 16: // bool case 1000: // bool
Подготовка и выполнение задач состоит в следующем:
- создается список таблиц
- создаются подключения (по количеству задач) к БД-источнику и БД-приемнику
- распределяются диапазоны из списка таблиц задачам
- задачи запускаются на выполнение (с переданным диапазоном таблиц и подключениями к БД)
- ожидается выполнение задач (главный поток + созданные потоки)
В каждой задаче выделяется три статических буфера по 50 МБ, в которых происходит подготовка данных для команды COPY (экранирование специальных символов и конкатенация значений полей):
// create connection pool t_uint threads = CString::toUnsignedInt(CConfig::instance()->value("threads", "1")); CLog::instance()->information("Count of working threads: " + CString::fromNumber(threads)); if (!createConnectionPool(threads - 1)) { return false; } // create tasks CString destination_schema = CConfig::instance()->value("destination_schema"); t_uint range_begin = 0; t_uint range_end = 0; t_uint range = m_tables.size() / threads; for (t_uint i = 0, j = 0; i < m_tables.size() - range; i += range + 1, ++j) { range_begin = i; range_end = i + range; std::unique_ptr<CTask> task = std::unique_ptr<CTask>(new CMigrationTask(m_source_pool.at(j), m_destination_pool.at(j), destination_schema, m_tables, range_begin, range_end)); m_migration_tasks.push_back(std::move(task)); } range_begin = range_end + 1; range_end = m_tables.size() - 1; std::unique_ptr<CTask> task = std::unique_ptr<CTask>(new CMigrationTask(std::move(m_source), std::move(m_destination), destination_schema, m_tables, range_begin, range_end)); // executing tasks for (t_uint i = 0; i < m_migration_tasks.size(); ++i) { executeTask(m_migration_tasks.at(i).get()); } task->execute(); // wait for completion for (t_uint i = 0; i < m_migration_tasks.size(); ++i) { while (!m_migration_tasks.at(i)->isExecuted()) { } }
t_uint count = 0; t_char *value; CString copy_query = "COPY " + m_destination_schema + "." + table + " ( "; m_buffer[0] = '\0'; m_buffer_temp0[0] = '\0'; m_buffer_temp1[0] = '\0'; if (result->nextRecord()) { for (t_uint i = 0; i < result->columnCount(); ++i) { if (i != 0) { copy_query += ", "; CString::concatenatePtr("\t", m_buffer); } copy_query += result->columnName(i); if (!result->isColumnNull(i)) { value = result->columnValuePtr(i); CString::replacePtr(value, "\\", "\\\\", m_buffer_temp0); CString::replacePtr(m_buffer_temp0, "\b", "\\b", m_buffer_temp1); CString::replacePtr(m_buffer_temp1, "\f", "\\f", m_buffer_temp0); CString::replacePtr(m_buffer_temp0, "\n", "\\n", m_buffer_temp1); CString::replacePtr(m_buffer_temp1, "\r", "\\r", m_buffer_temp0); CString::replacePtr(m_buffer_temp0, "\t", "\\t", m_buffer_temp1); CString::replacePtr(m_buffer_temp1, "\v", "\\v", m_buffer_temp0); CString::concatenatePtr(m_buffer_temp0, m_buffer); } else { CString::concatenatePtr("\\N", m_buffer); } } copy_query += " ) FROM STDIN"; if (!m_destination_connection->copyOpen(copy_query)) { CLog::instance()->error("Can't execute query '" + copy_query + "', error: " + m_destination_connection->lastError()); return false; } CString::concatenatePtr("\n", m_buffer); if (!m_destination_connection->copyDataPtr(m_buffer)) { CLog::instance()->error("Can't copy data, error: " + m_destination_connection->lastError()); return false; } ++count; while (result->nextRecord()) { m_buffer[0] = '\0'; for (t_uint i = 0; i < result->columnCount(); ++i) { if (i != 0) { CString::concatenatePtr("\t", m_buffer); } if (!result->isColumnNull(i)) { value = result->columnValuePtr(i); CString::replacePtr(value, "\\", "\\\\", m_buffer_temp0); CString::replacePtr(m_buffer_temp0, "\b", "\\b", m_buffer_temp1); CString::replacePtr(m_buffer_temp1, "\f", "\\f", m_buffer_temp0); CString::replacePtr(m_buffer_temp0, "\n", "\\n", m_buffer_temp1); CString::replacePtr(m_buffer_temp1, "\r", "\\r", m_buffer_temp0); CString::replacePtr(m_buffer_temp0, "\t", "\\t", m_buffer_temp1); CString::replacePtr(m_buffer_temp1, "\v", "\\v", m_buffer_temp0); CString::concatenatePtr(m_buffer_temp0, m_buffer); } else { CString::concatenatePtr("\\N", m_buffer); } } CString::concatenatePtr("\n", m_buffer); if (!m_destination_connection->copyDataPtr(m_buffer)) { CLog::instance()->error("Can't copy data, error: " + m_destination_connection->lastError()); return false; } ++count; if (count % 250000 == 0) { CLog::instance()->information("Working task #" + CString::fromNumber(taskID()) + ":\t\ttable " + table + " processing, record count: " + CString::fromNumber(count)); } } }
Результаты
Для переноса 2 Гб данных в PostgreSQL, c включенным WAL-архивированием, потребовалось порядка 10 минут (создано 16 потоков).
Над чем стоит подумать
- Определение на этапе выполнения количества задача/потоков — на основании количества данных и доступных аппаратных возможностей
- Определение количества необходимой памяти под буфер, в котором готовятся данные для COPY
- Распределение таблиц между задачами не по диапазону, а по необходимости — задачи берут таблицу из threadsafe-стека
Исходный код
Исходный код доступен на github.
ссылка на оригинал статьи http://habrahabr.ru/post/175459/
Добавить комментарий