Параллельная обработка большого селекта в нескольких сессиях

Представьте: есть селект, который возвращает записи, каждую из которых нужно обработать, и то ли много записей, то ли обработка каждой записи занимает много времени, а процесс обработки одной записи не зависит от процессов других записей.
Классический пример для того, чтобы задействовать многопоточность или в случае баз данных выполнять обработку в нескольких сессиях. В Оракле для этого используется hint /*+ parallel() */ и pipelined functions. Это здорово, но если у вас Oracle standard edition(где parallel не работает) или вы хотите обработать не каждую запись по отдельности(из соображений, что лучше накопить работу, а потом в bulk, одним ударом, выполнить), а поделить весь вывод селекта на куски и каждый обработать отдельно?
Задача ставится так:
Написать Java stored procedure, которая получает следующие параметры:

  • Текст селекта
  • Имя процедуры, которая будет работать с порцией данных
  • Колличество потоков(Thread)
  • Данные, необходимые для подключения к базе

Сначала посмотрим, что можно сделать с pipelined функцией.

Java откроет по тексту селекта result set в default connection.
Первым делом надо выполнить
select count(*) from («Текст селекта»);
Создадим connection pool с размерностью, заданной в 3-м параметре.
Создадим отдельные сессии, присоединившись через jdbc connection.
Данные для этого возьмем из 4-го параметра, нам, по большому счету нужен только пароль, все остальное получим сами(может еще порт, если он отличен от 1521).
Будем получать данные из селекта в default connection и переписывать их в сессию из пула. Как только решим, что накопили достаточно, создадим thread, передадим ему эту connection как параметр и пусть работает, а мы продолжим со следующей сессией или, если все уже прочитано, подождем окончания всех потоков.
Напишем функцию обработки. Она получает все поля селекта как параметры.
Будет удобно, чтобы, например, первые два параметра были бы номер в порции и ее размерность. Это даст возможность в dbms info выводить процент выполнения в потоке.
По метадате селекта будем конструировать ее вызов в виде примерно так:
begin proc1(23,14000,’a1′,3,’tratata’,35,48); end;
Хранить будем только такую строку.
Вначале это был 2-х мерный массив (i,j), где i — это номер потока(в дальнейшем…). Потом я увидел, что при большом числе записей, затраты Oracle на поддержку большого массива становятся чрезмерными и решил пользоваться также временной таблицей(temporary table).
Я положил границу в 200,000 записей. Если селект count(*) возвращает меньше 200,000 Java в-runtime использует 2-х мерный String массив, если больше — пишет во временную таблицу с одним полем varchar2(4000).

Итак, в PL/SQL пакете создаем функцию
FUNCTION run_pipe_parallel(pi_Select_Txt VARCHAR2, pi_Proc_Name VARCHAR2, pi_Parallel_Count VARCHAR2, Pi_Password VARCHAR2) RETURN VARCHAR2 AS LANGUAGE JAVA NAME 'com.samtrest.ParallelRunner.run_parallel(java.lang.String, java.lang.String,java.lang.String, java.lang.String) return java.lang.String';
На стороне Java есть функция

  public static String run_parallel(String selectTxt,        String procedureName,        String threadCount,       String password) throws NumberFormatException, SQLException, ClassNotFoundException {     String rc = "OK";     ParallelRunner parallelRunner = new  ParallelRunner(selectTxt,procedureName,Integer.parseInt(threadCount),password);      try {       parallelRunner.runProc();     } catch (SQLException e) {       e.printStackTrace();       rc = e.getMessage();     } catch (ClassNotFoundException e) {       e.printStackTrace();       rc = e.getMessage();     }        return rc;   } 

Получение массива типов данных полей селекта

    res = stm.executeQuery();     ResultSetMetaData meta = res.getMetaData();     columnCount = meta.getColumnCount();     int [] types = new int[columnCount];     for (int k = 0; k < columnCount; k++) {       types[k] = meta.getColumnType(k+1);          }  

Так строим строку вызова:

    while (res.next()){       callStr =  "begin "+procedureName+"("+processSeq+","+(j+1)+","+chunkCount;       for (int k = 0; k < columnCount; k++) {         callStr = callStr+",";         String value = "";         if ( types[k] == java.sql.Types.VARCHAR || types[k] == 1){           value = res.getString(k+1);           if (value == null){             value = "null";           }else{             value = "'"+value+"'";           }         }else if (types[k] == java.sql.Types.NUMERIC){           BigDecimal number  = res.getBigDecimal(k+1);           if (number == null){             value = "null";           }else{             value = number.toString();           }         }else if (types[k] == java.sql.Types.DATE || types[k] == java.sql.Types.TIMESTAMP){           Timestamp date  = res.getTimestamp(k+1);           if (date == null){             value = "null";           }else{             value = "to_date('"+date.toString().substring(0,date.toString().indexOf('.'))+ "','yyyy-mm-dd hh24:mi:ss')";           }         }else{           System.out.println(""+types[k]);         }         callStr = callStr + value;       }        callStr = callStr + "); end;";  

Накапливаем в массиве или таблице

      if (rowCount > CHUNK_LIMIT){         insert.setString(1, callStr);         insert.executeUpdate();       }else{         chunks[i][j] = callStr;       } 

ссылка на оригинал статьи http://habrahabr.ru/post/269693/

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *