Рецепты PostgreSQL: загрузка Государственного Адресного Реестра

от автора

Для приготовления загрузки Государственного Адресного Реестра в PostgreSQL нам понадобится сам PostgreSQL, bash, sh, curl, wget, xml2csv, jq, или можно воспользоваться готовым образом.

ОСТОРОЖНО! Может потребоваться много дискового пространства! Терабайта должно хватить, может, даже пол-терабайта хватит.

Первым делом создадим файлы

# показываем, что выполняем # выходим при ошибках # отсутствие переменной окружения - это ошибка set -eux  SELF="$$" # запоминаем свой pid touch deltaVersionId.txt # дата последней загруженной дельты touch fullVersionId.txt # дата загрузки полного ГАРа touch state.txt # текущее состояние

Далее, в зависимости от состояния будем делать различную работу

state="$(cat state.txt)" # читаем текущее состояние while # пока     case "$state" in # при состоянии         ... # таком-то делаем то-то     esac     state="$(cat state.txt)" # читаем текущее состояние     test "$state" != "done" # выполняем пока состояние не готово do true; done 2>&1 | tee cron.log # пишем в лог и консоль

Итак, изначально не было никакого состояния (а также в случае состояния готово)

* ) # в остальных случаях echo "done" >state.txt # запишем состояние готово deltaVersionId="$(cat deltaVersionId.txt)" # прочитаем дату дельты fullVersionId="$(cat fullVersionId.txt)" # прочитаем дату полного if [ -z "$fullVersionId" ]; then # если ещё ни разу не загружали полный     echo sql2pg >state.txt # то переходми к состоянию инициализации базы elif [ -z "$deltaVersionId" ]; then # иначе если ни разу не загружали дельту, то # скачиваем информацию о последних доступных выгрузках     wget --output-document=GetLastDownloadFileInfo.json https://fias.nalog.ru/WebServices/Public/GetLastDownloadFileInfo  # извлекаем из неё дату последней версии        lastVersionId="$(jq --raw-output .VersionId <GetLastDownloadFileInfo.json)" # и адрес последней выгрузки         URL="$(jq --raw-output .GarXMLFullURL <GetLastDownloadFileInfo.json)"     # задаём имя скачиваемого файла     ZIP="$lastVersionId.zip"     # скачиваем выгрузку (полная выгрузка скачивается ОЧЕНЬ долго, часов 5)     wget --continue --output-document="$ZIP" "$URL"     echo "$lastVersionId" >deltaVersionId.txt # сохраняем дату последней выгрузки как дельты     echo "$lastVersionId" >fullVersionId.txt # так и полной     echo unzip >state.txt # переходим к состоянию распаковки else # иначе (база проинициализирована и уже скачивали полную выгрузку или дельту) # скачиваем информацию обо всех доступных выгрузках     wget --output-document=GetAllDownloadFileInfo.json https://fias.nalog.ru/WebServices/Public/GetAllDownloadFileInfo # скачиваем все выгрузки после даты последней дельты         jq --raw-output "sort_by(.VersionId) | .[] | select(.VersionId > $deltaVersionId) \     # параллельно в несколько потоков (по количеству доступных процессоров)     | .GarXMLDeltaURL" <GetAllDownloadFileInfo.json | xargs --verbose --no-run-if-empty --max-procs="$(nproc)" --replace=URL bash /usr/local/bin/wget.sh "URL"     test $? -eq 0 || kill -SIGINT "$SELF" # выходим при ошибке     # обновляем соответственно дату последней дельты     jq --raw-output "sort_by(.VersionId) | .[] | select(.VersionId > $deltaVersionId) \     # последовательно (и при успехе переходим к состоянию распаковки)     | .GarXMLDeltaURL" <GetAllDownloadFileInfo.json | xargs --verbose --no-run-if-empty --replace=URL bash /usr/local/bin/version.sh "URL"     test $? -eq 0 || kill -SIGINT "$SELF" # выходим при ошибке fi ;;

файлы скачивания дельты и обновления версии ничего интересного из себя не представляют, кроме разве что в файле скачивания

# вычисляем размер скачиваемого файла SIZE="$(curl -Is "$URL" | grep 'Content-Length' | grep -oP '\d+')" test "$SIZE" -lt 1073741824 # если размер больше 1ГБ, то выходим

далее

"sql2pg" ) # в случае инициализации базы # исполняем все sql-скрипты find /usr/local/sql2pg -type f -name "*.sql" | sort -u \ # последовательно | xargs --verbose --no-run-if-empty --replace=SQL cat "SQL" | psql --variable=ON_ERROR_STOP=1 test $? -eq 0 || kill -SIGINT "$SELF" # выходим при ошибке echo region2pg >state.txt # переходим к состоянию инициализации для регионов ;;

sql-скрипты представляют из себя создание таблиц с комментариями и необходимых в дальнейшем индексов

далее

"region2pg" ) # в случае инициализации базы для регионов # для каждого региона от 01 до 99 (для простоты взят весь диапазон чисел) seq --format "%02.0f" 1 99 \ # последовательно добавляем соответствующую схему в базу | xargs --verbose --no-run-if-empty --replace=REGION echo "CREATE SCHEMA IF NOT EXISTS \"REGION\";" | psql --variable=ON_ERROR_STOP=1 test $? -eq 0 || kill -SIGINT "$SELF" # выходим при ошибке # для всех sh-скриптов find /usr/local/region2pg -type f -name "*.sh" | sort -u | while read -r SH; do     seq --format "%02.0f" 1 99 \ # для каждого региона     # последовательно выполняем скрипт, который генерирует sql-код         # для создания таблиц и индексов     | xargs --verbose --no-run-if-empty --replace=REGION sh "$SH" "REGION" | psql --variable=ON_ERROR_STOP=1     test $? -eq 0 || kill -SIGINT "$SELF" # выходим при ошибке done echo wget >fullVersionId.txt # зануляем полную версию echo wget >state.txt # переходим к состоянию скачивания # но т.к. отдельной обработки такого состояния нет, то оно # обрабатывается дефолтным, описанным выше самым первым ;;

sh-скрипты представляют из себя генерацию sql-кода создания таблиц и необходимых в дальнейшем индексов для каждого заданного региона

далее

"unzip" ) # в случае распаковки find . -type f -name "*.zip" | sort -u \ # все архивы # распаковываем параллельно      # (делая названия файлов маленькими буквами)     # и при успехе удаляя архив | xargs --verbose --no-run-if-empty --max-procs="$(nproc)" --replace=ZIP bash /usr/local/bin/unzip.sh "ZIP" test $? -eq 0 || kill -SIGINT "$SELF" # выходим при ошибке echo xml2csv >state.txt # переходим к состоянию конвертации ;;

далее

"xml2csv" ) # в случае конвертации # для всех sh-скриптов find /usr/local/xml2csv -type f -name "*.sh" | sort -u | while read -r SH; do     TABLE="$(basename -- "${SH%.*}")" # извлекаем название таблицы     # для всех соответствующих xml-файлов     find . -type f -name "as_${TABLE}_2*.xml" | sort -u \     # параллельно конвертируем их в csv-файлы         # и при успехе удаляем xml-файл     | xargs --verbose --no-run-if-empty --max-procs="$(nproc)" --replace=XML bash "$SH" "XML"     test $? -eq 0 || kill -SIGINT "$SELF" # выходим при ошибке done deltaVersionId="$(cat deltaVersionId.txt)" # читаем сохранённую дату дельты fullVersionId="$(cat fullVersionId.txt)" # и полной выгрузки # если они разные if [ "$deltaVersionId" != "$fullVersionId" ]; then     echo delta2pg >state.txt # то переходим к состоянию загрузки дельты else # иначе (если они одинаковые)     echo full2pg >state.txt # то переходим к состоянию загрузки полной fi ;;

sh-скрипты представляют из себя команды для конвертации

сначала происходит

"full2pg" ) # в случае загрузки полной выгрузки # для всех sh-скриптов find /usr/local/full2pg -type f -name "*.sh" | sort -u | while read -r SH; do     TABLE="$(basename -- "${SH%.*}")" # извлекаем название таблицы     # для всех csv-файлов     find . -type f -name "as_${TABLE}_2*.csv" | sort -u \     # параллельно загружаем их в базу         # и при успехе удаляем csv-файл     | xargs --verbose --no-run-if-empty --max-procs="$(nproc)" --replace=CSV bash "$SH" "CSV"     test $? -eq 0 || kill -SIGINT "$SELF" # выходим при ошибке done echo update >state.txt # переходим к состоянию обновления ;;

sh-скрипты представляют из себя генерацию sql-кода для загрузки соответствующей таблицы в базу и выполнение загрузки с помощью оператора COPY только актуальных данных

или же (во все следующие разы) происходит

"delta2pg" ) # в случае загрузки дельты # для всех sh-скриптов find /usr/local/delta2pg -type f -name "*.sh" | sort -u | while read -r SH; do     TABLE="$(basename -- "${SH%.*}")" # извлекаем название таблицы     # для всех скачанных дельт (если их было несколько с предыдущего раза)     find -mindepth 1 -maxdepth 1 -type d | sort -u | while read -r DIR; do     # для всех csv-файлов         find "$DIR" -type f -name "as_${TABLE}_2*.csv" | sort -u \         # параллельно загружаем их в базу             # и успехе удаляем csv-файл         | xargs --verbose --no-run-if-empty --max-procs="$(nproc)" --replace=CSV bash "$SH" "CSV"         test $? -eq 0 || kill -SIGINT "$SELF" # выходим при ошибке         rmdir "$DIR"/* "$DIR" # удаляем директорию дельты     done done echo update >state.txt # переходим к состоянию обновления ;;

sh-скрипты представляют из себя генерацию sql-кода для загрузки соответствующей таблицы в базу и выполнение загрузки с помощью оператора COPY во временную таблицу всех данных, а затем вставку из временной таблицы в основную с обновлением при кофликте, а в конце неактуальные данные удаляются из основной таблицы

таким образом, запуская главный скрипт в докере по крону каждый день получаем актуальное состояние Государственного Адресного Реестра в PostgreSQL

Про состояние обновления и дальнейшее использование Государственного Адресного Реестра будет в следующей статье.


ссылка на оригинал статьи https://habr.com/ru/articles/584724/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *