Пишем ETL-процесс на Python, часть 2

от автора

Сегодня мы с вами сделаем web-интерфейс для управления запуском ETL-процесса. В прошлой статье мы написали консольный скрипт, который разово запускает выгрузку. Но как это передать заказчику ?!

Примечание: технические подробности для краткости содержания буду упускать. Если что-то захотите узнать подробнее — спрашивайте в комментариях.

Что нам понадобится

  • виртуальное окружение с установленными пакетами django, redis, django_celery_beat, django-celery-results. Подробнее о требуемых зависимостях тут.

  • запущенный redis-server

Примечание: вместо redis-server можно использовать другой брокер сообщений — rabbitmq. В этом случае вам будет нужно указать другой URL брокера в настройках, указанных ниже.

Старый-добрый джанго

Итак, поскольку речь идёт о python и нам нужен web-интерфейс, мы поступим просто и инициализируем джанго проект с приложением в нём:

django-admin startproject config .  # проект создается в текущей папке, имя конфигурационной папки config django-admin startapp etl_app  # приложение создаем для размещения в нем модуля с бизнес-логикой

Джанго мы выбрали из-за того, что в нём есть готовый административный интерфейс.

Согласно мануалам django_celery_beat, django-celery-results добавляем в settings.py нашего проекта новые приложения и некоторые настройки.

config/settings.py:

INSTALLED_APPS = [     ...     'etl_app',     'django_celery_beat',     'django_celery_results', ]"  ...  # CELERY CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0' CELERY_TASK_TRACK_STARTED = True  # Планировщик задач CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'  CELERY_BROKER_TRANSPORT_OPTION = {'visibility_timeout': 3600} CELERY_RESULT_BACKEND = 'django-db' CELERY_ACCEPT_CONTENT = ['application/json'] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json'  CELERY_TASK_DEFAULT_QUEUE = 'default' 

В данном проекте «из песочницы» у нас одна очередь, однако можно настроить несколько очередей, например для распределения задач по приоритетам.

Настраиваем celery

Добавляем в папку настроек джанго проекта модуль celery.py и делаем доступным экземпляр приложения celery_app.

config/celery.py:

import os  from celery import Celery  os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings") app = Celery("etl_project") app.config_from_object("django.conf:settings", namespace="CELERY") app.autodiscover_tasks() 

config/__init__.py:

from .celery import app as celery_app  __all__ = ('celery_app',) 

Регистрируем нашу etl-функцию

В папке приложения etl_app создаем модуль tasks.py и импортируем в него код etl-процесса.

etl_app/tasks.py:

from celery import shared_task  from etl_app import etl   @shared_task(name="Задача ETL")  # регистрируем функцию в воркере def etl_task(*args, **kwargs):     unloads = etl.load()     multiplication = etl.transform(unloads)     etl.extract(multiplication)      return "my result data"  # здесь может быть более полезная информация 

Единственная задача декоратора @shared_task — зарегистрировать нашу функцию в воркере и сделать её доступной для запуска из очереди.

Запускаем проект

Открываем несколько терминалов:

  1. (опционально) redis-server, если он у вас не запущен ранее

  2. celery -A config worker -l info

  3. celery -A config beat -l info

  4. python manage.py runserver

В терминале воркера вы должны увидеть нашу задачу в перечне доступных:

- ** ---------- [config] - ** ---------- .> app:         etl_project:0x7ff955e38490 - ** ---------- .> transport:   redis://127.0.0.1:6379/0 - ** ---------- .> results:      - *** --- * --- .> concurrency: 2 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** -----   -------------- [queues]                 .> default          exchange=default(direct) key=default   [tasks]   . Задача ETL

В терминале beat’а должно быть сообщение о том, что управление расписанием задач доступно:

LocalTime -> 2023-01-22 05:02:23 Configuration ->     . broker -> redis://127.0.0.1:6379/0     . loader -> celery.loaders.app.AppLoader     . scheduler -> django_celery_beat.schedulers.DatabaseScheduler      . logfile -> [stderr]@%INFO     . maxinterval -> 5.00 seconds (5s) [2023-01-22 05:02:23,184: INFO/MainProcess] beat: Starting...

Создаем периодическую задачу

Для начала создаем экземпляр модели Intervals. На скрине ниже мы создаем период «раз в 10 минут»:

Приложение django_celery_beat предлагает большое множество настройки расписания, включая классический cron. Оставляю вам это на самостоятельное изучение по документации.

Затем создаем расписание для нашей задачи в модели Periodic tasks. Если до этого вы всё сделали правильно, то в выпадающем списке задач вы увидите нашу «Задача ETL».

А вот обещанная возможность запустить задачу вне плана (см. Action):

Смотрим результат выполнения

В модели Task results есть возможность посмотреть результат выполнения. Обратите внимание на поля Task State и Result Data. Значение последнего поля берется из return нашего таска (функция, обернутая в @shared_task):

Заключение

Сегодня мы научились на примере etl-процесса создавать, запускать и контролировать выполнение регулярных задач. Использовали джанго по назначению — создали web-интерфейс в «обозначенные» сроки. Результат можно передавать пользователю. Помните о том, что вашему пользователю нужно назначить права в админке на работу с моделями из приложений в django_celery_beat и django_celery_results.

Благодарю за внимание. С удовольствием отвечу на ваши вопросы. Репозиторий кода доступен по ссылке.


ссылка на оригинал статьи https://habr.com/ru/post/711590/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *