Применение JDBC в процессе ETL

от автора

В предыдущей статье мы упоминали о Java + JDBC в качестве альтернативы Python + ODBC. В этой статье приведём условную ситуацию, приближенную к той, которая побудила нас сменить язык программирования.

Была поставлена задача по обработке более 25 млн. строк посредством процесса ETL: на основе имеющихся данных расcчитать два дополнительных столбца и вернуть их в базу. Python прекрасно справлялся с подобными задачами, но в данной ситуации скорость получения результата была не самой оптимальной: обработка длилась более суток.

Так как процедуру предполагалось выполнять многократно, то было решено найти более быстрый инструмент. Поиск альтернативных библиотек для Python ни к чему не привёл. Поэтому было решено применить тяжёлую артиллерию — Java, так как Java работает с БД быстрее, чем Python чисто технически. И чем больше объём данных, тем заметнее реализуется это преимущество.

На первом этапе разовьём класс DBC из предыдущей статьи, добавив:

  1. метод execQuerySelect для выполнения select-запросов в разных вариантах (с TOP, where и т.д.)

  2. метод execQueryInsert для вставки результата в БД

  3. вспомогательные метода buildInsertColumns и buildInsertValues для формирования колонок и строк соответственно.

В итоге класс DBC пополнится следующим кодом:

/**      * Выполнение select запроса к БД      *      * @param select Названия необходимых колонок (* - получить все)      * @param from   Таблица из которой небходимо получить данные      * @return ResultSet с результатами работы запроса      */     public ResultSet execQuerySelect(String select, String from) {         if (!readyToWork) {             System.err.println("DBC not ready to work! Abort:execQuerySelected");             return null;         }          String query = "SELECT " + select + " FROM " + from;          return execQuery(query);     }      /**      * Выполнение select запроса к БД      *      * @param select Названия необходимых колонок (* - получить все)      * @param from   Таблица из которой небходимо получить данные      * @param top    Количество "верхних" строк получаемых из базы      * @return ResultSet с результатами работы запроса      */     public ResultSet execQuerySelect(String select, String from, Integer top) {         if (!readyToWork) {             System.err.println("DBC not ready to work! Abort:execQuerySelected");             return null;         }          String query = "SELECT TOP(" + top + ") " + select + " FROM " + from;          return execQuery(query);     }      /**      * Выполнение select запроса к БД      *      * @param select Названия необходимых колонок (* - получить все)      * @param from   Таблица из которой небходимо получить данные      * @param where  Условие выборки значений из таблицы      * @return ResultSet с результатами работы запроса      */     public ResultSet execQuerySelect(String select, String from, String where) {         if (!readyToWork) {             System.err.println("DBC not ready to work! Abort:execQuerySelected");             return null;         }          String query = "SELECT " + select + " FROM " + from + " WHERE " + where;          return execQuery(query);     }      /**      * Выполнение select запроса к БД      *      * @param select Названия необходимых колонок (* - получить все)      * @param from   Таблица из которой небходимо получить данные      * @param where  Условие выборки значений из таблицы      * @param top    Количество "верхних" строк получаемых из базы      * @return ResultSet с результатами работы запроса      */     public ResultSet execQuerySelect(String select, String from, String where, Integer top) {         if (!readyToWork) {             System.err.println("DBC not ready to work! Abort:execQuerySelected");             return null;         }          String query = "SELECT TOP(" + top + ") " + select + " FROM " + from + " WHERE " + where;          return execQuery(query);     }      /**      * Выполнение insert запроса к БД      *      * @param table Имя таблицы для вставки      * @param columns Массив колонок для вставки      * @param data Вектор словарей со значениями для вставки      * @return Флаг успешности выполнения запроса      */     public boolean execQueryInsert(String table, String[] columns, Vector<HashMap<String, String>> data) {         if (!readyToWork) {             System.err.println("DBC not ready to work! Abort:execQuerySelected");             return false;         }          String query = "INSERT INTO " + table + " " + buildInsertColumns(columns) + " VALUES ";          for (int i = 0; i < data.size() - 1; i++) {             query = query.concat(buildInsertValues(columns, data.get(i), false));         }          query = query.concat(buildInsertValues(columns, data.get(data.size() - 1), true));          execQuery(query);         return true;     } 	 	    /**      * Формирует строку с перечнем колонок для запроса insert      *      * @param columns Массив колонок      * @return Строку с перечнем колонок для запроса insert      */     private String buildInsertColumns (String[] columns) {          String r = "(";          for (int i = 0; i < columns.length - 1; i++) {             r = r.concat("[" + columns[i] + "], ");         }          r = r.concat("[" + columns[columns.length-1] + "])");          return r;     }      /**      * Формирует строку с перечнем значений для запроса insert      *      * @param columns Массив колонок      * @param data Словарь значений для вставки      * @param isLast Флаг нахождения значений на последней позиции среди остальных      * @return Строку с перечнем значений для запроса insert      */     private String buildInsertValues (String[] columns, HashMap<String, String> data, boolean isLast) {         String vals = "(";          for (int i = 0; i < columns.length - 1; i++) {             vals = vals.concat(data.get(columns[i]).concat(", "));         }          vals = vals.concat(data.get(columns[columns.length-1]).concat(")"));          if (isLast) {             vals = vals.concat(";");         }         else         {             vals = vals.concat(", ");         }          return vals;     }

Теперь, имея все необходимые инструменты, перейдём к решению нашей задачи. В рамках абстракции заменим сложные вычисления на вычисления суммы и разности двух чисел. Итоговый алгоритм заключается в получении двух чисел из одной таблицы, проведении расчетов, вставке результата в новую таблицу. С учётом алгоритма решения нашей задачи MainClass из предыдущей статьи будет выглядеть так:

package DataBaseTools;  import java.sql.ResultSet; import java.sql.SQLException; import java.util.HashMap; import java.util.Vector;  public class MainClass {      public static void main(String[] args) throws SQLException {         // create instance         DBC dbc = new DBC("localhost", "testDB");           // get query response         ResultSet response = dbc.execQuerySelect("x, y", "digits",);           Vector<HashMap<String, String>> insertBuffer = new Vector<HashMap<String, String>>();          // handle received data         if (response == null) {             System.out.println("NULL");         } else {             while (response.next()) {                 HashMap<String, String> rowData = new HashMap<String, String>();  				int x = response.getInt("x"); 				int y = response.getInt("y"); 				 				int a = x + y; 				int s = x - y; 				 				rowData.put("a", a.toString()); 				rowData.put("s", s.toString());                  insertBuffer.add(rowData);                  if (insertBuffer.size() == 1000) {                     String[] cols = {"a", "s"};                      System.out.println(dbc.execQueryInsert("[results]", cols, insertBuffer));                      insertBuffer.clear();                 }             }              if (insertBuffer.size() > 0) {                 String[] cols = {"a", "s"};                  System.out.println(dbc.execQueryInsert("[results]", cols, insertBuffer));                  insertBuffer.clear();             }         }      } }

Этот код создаёт подключение к базе, получает данные из исходной таблицы, производит расчет и запись результата в буфер, а при наполнении буфера происходит запись в базу. После выполнения цикла оставшиеся в буфере данные также записываются в базу. Обработка такого количества данных при изначальной сложности вычислений заняла чуть более 7 часов, что минимум в 4 раза быстрее аналогичного кода на Python. При увеличении скорости работы с БД такое решение позволит увеличить сложность вычислений, не увеличивая время выполнения.

ссылка на оригинал статьи https://habr.com/ru/post/564654/