Миграция данных с MySQL на PostgreSQL

от автора

По мере работы с базами данных, ознакомления с их плюсами и минусами, возникает момент, когда принимается решение миграции с одной СУБД в другую. В данном случае возникла задача переноса сервисов с MySQL на PostgreSQL. Вот небольшой перечень вкусностей, которые ждут от перехода на PostgreSQL, версии 9.2 (с более подробным списком возможностей можно ознакомится тут):

  • наследование таблиц (есть ограничения, которые обещают в будущем исправить)
  • диапазоны: int4range, numrange, daterange
  • поддержка из коробки несколько языков для хранимых функций (PL/pgSQL, PL/Tcl, PL/Perl, PL/Python и голый C)
  • оператор WITH, позволяющий делать рекурсивные запросы
  • (планируется) материализованные представления (частично они доступны и сейчас — как IUD правила к представлению)
  • (планируется) триггера на DDL операции

Как правило, существующие решения опираются на работу с уже готовым SQL дампом, который конвертируется в соответствии с синтаксисом целевой БД. Но в некоторых случаях (активно использующееся веб-приложение с большим объемом информации) такой вариант несет определенные временные затраты на создание SQL дампа из СУБД, его конвертации и загрузку получившегося дампа снова в СУБД. Поэтому оптимальней будет online-вариант (прямиком из СУБД в СУБД) конвертера, что может существенно уменьшить простой сервисов.

Языком для реализации выбран C++ (с некоторыми возможностями из C++11x), библиотеки для соединения с MySQL и PostgreSQL использовались нативные, в качестве IDE был задействован Qt Creator.

Алгоритм миграции состоит в следующем. Подразумевается что в БД-получателе уже создана структура таблиц, соответствующая структуре в БД-источнике. Формируется список таблиц для переноса данных, который затем распределяется в пуле потоков. Каждый поток имеет подключение к БД-источнику и к БД-получателю. Т.е. параллельно переносится несколько таблиц. Profit!

Традиционно любое приложение имеет некоторый каркас — набор системных компонент, на которые опираются другие компоненты — работа с конфигурационным файлом, логом, обработчик ошибок, менеджер памяти и прочее. В нашем случае, используется только самое необходимое для решения задачи. Во-первых, были переопределены (исключительно для удобства) некоторые фундаментальные и составные типы (да, знаю, можно было аlias templates использовать, но получилось так):

простые типы

typedef bool t_bool;  typedef char t_char; typedef unsigned char t_uchar; typedef signed char t_schar;  typedef int t_int; typedef unsigned int t_uint;  typedef float t_float; typedef double t_double; 

map

template<typename T, typename U> class CMap     : public std::map<T, U> { public:	     CMap();     virtual ~CMap();  };  template<typename T, typename U> CMap<T, U>::CMap() { }  template<typename T, typename U> CMap<T, U>::~CMap() { } 

vector

template<typename T> class CVector     : public std::vector<T> { public:     CVector();     virtual ~CVector();  };  template<typename T> CVector<T>::CVector() { }  template<typename T> CVector<T>::~CVector() { } 

fstream

class CFileStream     : public std::fstream { public:     CFileStream();     virtual ~CFileStream();  }; 

Из явных паттернов использован только синглтон:

классический синглтон Мэйерса

template<typename T> class CSingleton { public:     static T* instance();     void free();  protected:     CSingleton();     virtual ~CSingleton();  };  template<typename T> T* CSingleton<T>::instance() {     static T *instance = new T();      return instance; }  template<typename T> void CSingleton<T>::free() {     delete this; }  template<typename T> CSingleton<T>::CSingleton() { }  template<typename T> CSingleton<T>::~CSingleton() { } 

Базовые классы для задачи (выполняется в отдельном потоке) и системы (запускает на выполнение задачи):

task.h

class CTask { public:	     CTask();     virtual ~CTask();      void execute();      t_uint taskID();     t_bool isExecuted();  protected:     virtual void executeEvent() = 0;  private:     t_uint m_task_id;     t_bool m_executed;  }; 

task.cpp

CTask::CTask()     : m_executed(false) {     static t_uint task_id = 0;      m_task_id = task_id++; }  CTask::~CTask() { }  void CTask::execute() {     executeEvent();      m_executed = true; }  t_uint CTask::taskID() {     return m_task_id; }  t_bool CTask::isExecuted() {     return m_executed; } 

system.h

class CSystem { public:     CSystem();     virtual ~CSystem() = 0;  protected:     void executeTask(CTask *task);  }; 

system.cpp

CSystem::CSystem() { }  CSystem::~CSystem() { }  void CSystem::executeTask(CTask *task) {     CTask& task_ref = *task;      std::thread thread([&]() { task_ref.execute(); });      thread.detach(); } 

В завершении рассмотрения базовых типов нужно упомянуть класс строки, который пришлось написать с нуля, чтобы для некоторых операций (замена подстроки и конкатенация) была возможность работы с передаваемым буфером (о нем чуть ниже) без дополнительных выделений памяти и некоторые вещи (преобразование строки в число и числа в строки) сделать членами класса (приводится только объявление класса):

string.h

class CString { public:     CString(const t_char *data = nullptr);     CString(const CString& s);     ~CString();      const t_char* ptr() const;     void setPtr(t_char *p);      CString& operator= (const CString& s);     CString operator+ (const t_char *p) const;     CString operator+ (t_char c) const;     CString operator+ (const CString& s) const;     friend CString operator+ (const t_char *p, const CString& s);     CString& operator+= (const t_char *p);     CString& operator+= (t_char c);     CString& operator+= (const CString& s);      t_bool operator== (const CString& s) const;     t_bool operator!= (const CString& s) const;      t_bool operator< (const CString& s) const;     t_bool operator> (const CString& s) const;     t_bool operator<= (const CString& s) const;     t_bool operator>= (const CString& s) const;      t_char& at(t_uint index);     t_char at(t_uint index) const;      t_uint length() const;     t_bool isEmpty() const;      void clear();      t_int search(const CString& s, t_uint from = 0) const;     CString substr(t_uint from, t_int count = -1) const;     CString replace(const CString& before, const CString& after) const;      static CString fromNumber(t_uint value);     static t_uint toUnsignedInt(const CString& s, t_bool *good = nullptr);      CVector<CString> split(const CString& splitter) const;     t_bool match(const CString& pattern) const;      static t_uint replacePtr(const t_char *src, const t_char *before, const t_char *after, char *buffer);     static t_uint lengthPtr(const t_char *src);     static t_uint concatenatePtr(const t_char *src, char *buffer);  private:     t_char *m_data;      t_uint length(const t_char *src) const;     t_char* copy(const t_char *src) const;     t_char* concatenate(const t_char *src0, t_char c) const;     t_char* concatenate(const t_char *src0, const t_char *src1) const;     t_int compare(const t_char *src0, const t_char *src1) const; };  CString operator+ (const t_char *p, const CString& s); 

Как неизбежность, для приложения, чуть больше чем «Hello,world», это лог и конфигурационный файл. В методе записи сообщения в лог был задействован мьютекс, так как каждая задача по мере обработки таблицы пишет об этом в лог. Мелкогранулярные блокировки и lockfree-алгоритмы не рассматривались по причине того, что запись в лог — это далеко не узкое место в работе приложения:

log.h

class CLog     : public CSingleton<CLog> { public:     enum MessageType     {         Information,         Warning,         Error     };      CLog();     virtual ~CLog();      void information(const CString& message);     void warning(const CString& message);     void error(const CString& message);  private:     std::mutex m_mutex;      CFileStream m_stream;      void writeTimestamp();     void writeHeader();     void writeFooter();     void writeMessage(MessageType type, const CString& message);  }; 

log.cpp

CLog::CLog() {     m_stream.open("log.txt", std::ios_base::out);      writeHeader(); }  CLog::~CLog() {     writeFooter();      m_stream.flush();     m_stream.close(); }  void CLog::information(const CString& message) {     writeMessage(Information, message); }  void CLog::warning(const CString& message) {     writeMessage(Warning, message); }  void CLog::error(const CString& message) {     writeMessage(Error, message); }  void CLog::writeTimestamp() {     time_t rawtime;     tm *timeinfo;     t_char buffer[32];      time(&rawtime);     timeinfo = localtime(&rawtime);      strftime(buffer, 32, "%Y/%m/%d %H:%M:%S", timeinfo);      m_stream << buffer << " "; }  void CLog::writeHeader() {     writeMessage(Information, "Log started"); }  void CLog::writeFooter() {     writeMessage(Information, "Log ended"); }  void CLog::writeMessage(MessageType type, const CString& message) {     std::lock_guard<std::mutex> guard(m_mutex);      writeTimestamp();      switch (type)     {     case Information:         {             m_stream << "Information " << message.ptr();              break;         }      case Warning:         {             m_stream << "Warning " << message.ptr();              break;         }      case Error:         {             m_stream << "Error " << message.ptr();              break;         }      default:         {             break;         }     }      m_stream << "\n";      m_stream.flush(); } 

config.h

class CConfig     : public CSingleton<CConfig> { public:     CConfig();     virtual ~CConfig();      CString value(const CString& name, const CString& defvalue = "") const;  private:     CFileStream m_stream;     CMap<CString, CString> m_values;  }; 

config.cpp

CConfig::CConfig() {     m_stream.open("mysql2psql.conf", std::ios_base::in);      if (m_stream.is_open())     {         CString line;          const t_uint buffer_size = 256;         t_char buffer[buffer_size];          while (m_stream.getline(buffer, buffer_size))         {             line = buffer;              if (!line.isEmpty() && line.at(0) != '#')             {                 t_int pos = line.search("=");                  CString name = line.substr(0, pos);                 CString value = line.substr(pos + 1);                  m_values.insert(std::pair<CString, CString>(name, value));             }         }          m_stream.close();          CLog::instance()->information("Config loaded");     }     else     {         CLog::instance()->warning("Can't load config");     } }  CConfig::~CConfig() { }  CString CConfig::value(const CString& name, const CString& defvalue) const {     CMap<CString, CString>::const_iterator iter = m_values.find(name);      if (iter != m_values.end())     {         return iter->second;     }      return defvalue; } 

mysql2psql.conf

# MySQL connection mysql_host=localhost mysql_port=3306 mysql_database=mysqldb mysql_username=root mysql_password=rootpwd mysql_encoding=UTF8  # PostgreSQL connection psql_host=localhost psql_port=5432 psql_database=psqldb psql_username=postgres psql_password=postgrespwd psql_encoding=UTF8  # Migration # (!) Note: source_schema == mysql_database source_schema=mysqldb destination_schema=public tables=* use_insert=0  # Other settings threads=16 

Теперь, что касательно добавления данных в PostgreSQL. Есть два варианта — использовать запросы INSERT, которые на большом массиве данных не очень себя показали в плане производительности (особенности транзакционного механизма), или через команду COPY, которая позволяет непрерывно пересылать порции данных, отправляя в конце передачи специальный маркер (символ-терминатор). Еще один нюанс связан с определением типа (поля в таблице) в PostgreSQL. В документации не указано (возможно не было чтения между строк документации), как можно вернуть человекопонятный идентификатор типа, поэтому было составлено соответствие oid (почти уникальный идентификатор каждого объекта в БД) и типа:

case 20: // int8 case 21: // int2 case 23: // int4 case 1005: // int2 case 1007: // int4 case 1016: // int8 case 700: // float4 case 701: // float8 case 1021: // float4 case 1022: // float8 case 1700: // numeric case 18: // char case 25: // text case 1002: // char case 1009: // text case 1015: // varchar case 1082: // date case 1182: // date case 1083: // time case 1114: // timestamp case 1115: // timestamp case 1183: // time case 1185: // timestamptz case 16: // bool case 1000: // bool 

Подготовка и выполнение задач состоит в следующем:

  • создается список таблиц
  • создаются подключения (по количеству задач) к БД-источнику и БД-приемнику
  • распределяются диапазоны из списка таблиц задачам
  • задачи запускаются на выполнение (с переданным диапазоном таблиц и подключениями к БД)
  • ожидается выполнение задач (главный поток + созданные потоки)

В каждой задаче выделяется три статических буфера по 50 МБ, в которых происходит подготовка данных для команды COPY (экранирование специальных символов и конкатенация значений полей):

фрагмент кода c подготовкой задач

// create connection pool  t_uint threads = CString::toUnsignedInt(CConfig::instance()->value("threads", "1")); CLog::instance()->information("Count of working threads: " + CString::fromNumber(threads));  if (!createConnectionPool(threads - 1)) {     return false; }  // create tasks  CString destination_schema = CConfig::instance()->value("destination_schema");  t_uint range_begin = 0; t_uint range_end = 0;  t_uint range = m_tables.size() / threads;  for (t_uint i = 0, j = 0; i < m_tables.size() - range; i += range + 1, ++j) {     range_begin = i;     range_end = i + range;      std::unique_ptr<CTask> task = std::unique_ptr<CTask>(new CMigrationTask(m_source_pool.at(j), m_destination_pool.at(j), destination_schema, m_tables, range_begin, range_end));      m_migration_tasks.push_back(std::move(task)); }  range_begin = range_end + 1; range_end = m_tables.size() - 1;  std::unique_ptr<CTask> task = std::unique_ptr<CTask>(new CMigrationTask(std::move(m_source), std::move(m_destination), destination_schema, m_tables, range_begin, range_end));  // executing tasks  for (t_uint i = 0; i < m_migration_tasks.size(); ++i) {     executeTask(m_migration_tasks.at(i).get()); }  task->execute();  // wait for completion  for (t_uint i = 0; i < m_migration_tasks.size(); ++i) {     while (!m_migration_tasks.at(i)->isExecuted())     {     } } 

фрагмент кода с подготовкой в задаче данных для COPY

t_uint count = 0; t_char *value;  CString copy_query = "COPY " + m_destination_schema + "." + table + " ( ";  m_buffer[0] = '\0'; m_buffer_temp0[0] = '\0'; m_buffer_temp1[0] = '\0';  if (result->nextRecord()) {     for (t_uint i = 0; i < result->columnCount(); ++i)     {         if (i != 0)         {             copy_query += ", ";             CString::concatenatePtr("\t", m_buffer);         }          copy_query += result->columnName(i);          if (!result->isColumnNull(i))         {             value = result->columnValuePtr(i);                  CString::replacePtr(value, "\\", "\\\\", m_buffer_temp0);             CString::replacePtr(m_buffer_temp0, "\b", "\\b", m_buffer_temp1);             CString::replacePtr(m_buffer_temp1, "\f", "\\f", m_buffer_temp0);             CString::replacePtr(m_buffer_temp0, "\n", "\\n", m_buffer_temp1);             CString::replacePtr(m_buffer_temp1, "\r", "\\r", m_buffer_temp0);             CString::replacePtr(m_buffer_temp0, "\t", "\\t", m_buffer_temp1);             CString::replacePtr(m_buffer_temp1, "\v", "\\v", m_buffer_temp0);              CString::concatenatePtr(m_buffer_temp0, m_buffer);         }         else         {             CString::concatenatePtr("\\N", m_buffer);         }     }      copy_query += " ) FROM STDIN";      if (!m_destination_connection->copyOpen(copy_query))     {         CLog::instance()->error("Can't execute query '" + copy_query + "', error: " + m_destination_connection->lastError());          return false;     }      CString::concatenatePtr("\n", m_buffer);      if (!m_destination_connection->copyDataPtr(m_buffer))     {         CLog::instance()->error("Can't copy data, error: " + m_destination_connection->lastError());          return false;     }      ++count;      while (result->nextRecord())     {         m_buffer[0] = '\0';          for (t_uint i = 0; i < result->columnCount(); ++i)         {             if (i != 0)             {                 CString::concatenatePtr("\t", m_buffer);             }              if (!result->isColumnNull(i))             {	                 value = result->columnValuePtr(i);                  CString::replacePtr(value, "\\", "\\\\", m_buffer_temp0);                 CString::replacePtr(m_buffer_temp0, "\b", "\\b", m_buffer_temp1);                 CString::replacePtr(m_buffer_temp1, "\f", "\\f", m_buffer_temp0);                 CString::replacePtr(m_buffer_temp0, "\n", "\\n", m_buffer_temp1);                 CString::replacePtr(m_buffer_temp1, "\r", "\\r", m_buffer_temp0);                 CString::replacePtr(m_buffer_temp0, "\t", "\\t", m_buffer_temp1);                 CString::replacePtr(m_buffer_temp1, "\v", "\\v", m_buffer_temp0);                      CString::concatenatePtr(m_buffer_temp0, m_buffer);             }             else             {                  CString::concatenatePtr("\\N", m_buffer);             }         }          CString::concatenatePtr("\n", m_buffer);          if (!m_destination_connection->copyDataPtr(m_buffer))         {             CLog::instance()->error("Can't copy data, error: " + m_destination_connection->lastError());              return false;         }          ++count;          if (count % 250000 == 0)         {             CLog::instance()->information("Working task #" + CString::fromNumber(taskID()) + ":\t\ttable " + table + " processing, record count: " + CString::fromNumber(count));         }     } }  

Результаты

Для переноса 2 Гб данных в PostgreSQL, c включенным WAL-архивированием, потребовалось порядка 10 минут (создано 16 потоков).

Над чем стоит подумать

  • Определение на этапе выполнения количества задача/потоков — на основании количества данных и доступных аппаратных возможностей
  • Определение количества необходимой памяти под буфер, в котором готовятся данные для COPY
  • Распределение таблиц между задачами не по диапазону, а по необходимости — задачи берут таблицу из threadsafe-стека

Исходный код

Исходный код доступен на github.

ссылка на оригинал статьи http://habrahabr.ru/post/175459/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *