Асинхронные задания в Django с Celery

Перевод статьи подготовлен в преддверии старта курса «Web-разработчик на Python».


Если в вашем приложении есть какой-то длительный процесс, вы можете обрабатывать его не в стандартном потоке запросов/ответов, а в фоновом режиме.

К примеру, в вашем приложении пользователь должен отправить картинку-миниатюру (которую, скорее всего, нужно будет отредактировать) и подтвердить адрес электронной почты. Если ваше приложение обрабатывает изображение, а потом отправляет письмо для подтверждения в обработчике запросов, то конечному пользователю придется зачем-то ждать завершения выполнения обеих задач перед тем, как перезагрузить или закрыть страницу. Вместо этого, вы можете передать эти операции в очередь задач и оставить на обработку отдельному процессу, чтобы немедленно отправить пользователю ответ. В таком случае, конечный пользователь сможет заниматься другими делами на стороне клиента во время выполнения обработки в фоновом режиме. Ваше приложение в таком случае также сможет свободно отвечать на запросы других пользователей и клиентов.

Сегодня мы поговорим о процессе настройки и конфигурирования Celery и Redis для обработки длительных процессов в приложении на Django, чтобы решать такие задачи. Также мы воспользуемся Docker и Docker Compose, чтобы связать все части вместе, и рассмотрим, как тестировать задания Celery с помощью модульных и интеграционных тестов.

К концу этого руководства мы научимся:

  • Интегрировать Celery в Django, чтобы создавать фоновые задания.
  • Упаковывать Django, Celery и Redis с помощью Docker.
  • Запускать процессы в фоновом режиме с помощью отдельного рабочего процесса.
  • Сохранять логи Celery в файл.
  • Настраивать Flower для мониторинга и администрирования заданий и воркеров (worker) Celery.
  • Тестировать задания Celery с помощью модульных и интеграционных тестов.

Фоновые задачи

Для улучшения пользовательского опыта, продолжительные процессы должны выполняться в фоновом режиме вне обычного потока HTTP-запросов/ответов.

Например:

  • Отправка писем для подтверждения;
  • Веб-скейпинг и краулинг;
  • Анализ данных;
  • Обработка изображений;
  • Генерация отчетов.

При создании приложения, старайтесь отделять задачи, которые должны выполняться в течение жизненного цикла запроса/ответа, например CRUD-операции, от задач, которые должны выполняться в фоновом режиме.

Рабочий процесс

Наша цель – разработать приложение на Django, которое для обработки продолжительных процессов вне цикла запрос/ответ использует Celery.

  1. Конечный пользователь генерирует новое задание, отправляя POST-запрос на сервер.
  2. В этом представлении задание добавляется в очередь, а id задания отправляется обратно на клиент.
  3. С помощью AJAX клиент продолжает опрашивать сервер, чтобы проверить состояние задания, в том время как само задание выполняется в фоновом режиме.

Создание проекта

Клонируйте проект из репозитория django-celery и выполните checkout по тегу v1 в ветке master:

$ git clone https://github.com/testdrivenio/django-celery --branch v1 --single-branch $ cd django-celery $ git checkout v1 -b master

Поскольку в общей сложности нам нужно работать с тремя процессами (Django, Redis, воркер), мы используем Docker для упрощения работы, соединив их так, чтобы мы могли все запустить одной командой в одном окне терминала.

Из корня проекта создайте образы и запустите Docker-контейнеры:

$ docker-compose up -d --build

Когда сборка завершится, перейдите на localhost:1337:

Убедитесь в том, что тесты проходят успешно:

$ docker-compose exec web python -m pytest  ======================================== test session starts ======================================== platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1 django: settings: core.settings (from ini) rootdir: /usr/src/app, inifile: pytest.ini plugins: django-3.8.0 collected 1 item  tests/test_tasks.py .                                                                         [100%]  ========================================= 1 passed in 0.47s =========================================

Давайте взглянем на структуру проекта перед тем, как двигаться дальше:

├── .gitignore ├── LICENSE ├── README.md ├── docker-compose.yml └── project     ├── Dockerfile     ├── core     │   ├── __init__.py     │   ├── asgi.py     │   ├── settings.py     │   ├── urls.py     │   └── wsgi.py     ├── entrypoint.sh     ├── manage.py     ├── pytest.ini     ├── requirements.txt     ├── static     │   ├── bulma.min.css     │   ├── jquery-3.4.1.min.js     │   ├── main.css     │   └── main.js     ├── tasks     │   ├── __init__.py     │   ├── apps.py     │   ├── migrations     │   │   └── __init__.py     │   ├── templates     │   │   └── home.html     │   └── views.py     └── tests         ├── __init__.py         └── test_tasks.py 

Запуск задания

Обработчик событий в project/static/main.js подписан на нажатие на кнопку. По клику на сервер отправляет AJAX POST-запрос с соответствующим типом задания: 1, 2 или 3.

$('.button').on('click', function() {   $.ajax({     url: '/tasks/',     data: { type: $(this).data('type') },     method: 'POST',   })   .done((res) => {     getStatus(res.task_id);   })   .fail((err) => {     console.log(err);   }); });

На стороне сервера уже настроено представление для обработки запроса в project/tasks/views.py:

def run_task(request):     if request.POST:         task_type = request.POST.get("type")         return JsonResponse({"task_type": task_type}, status=202)

А теперь начинается самое интересное: привязываем Celery!

Настройка Celery

Начнем с того, что добавим Celery и Redis в файл project/requirements.txt:

celery==4.4.1 Django==3.0.4 redis==3.4.1  pytest==5.4.1 pytest-django==3.8.0

Celery использует брокер сообщенийRabbitMQ, Redis или AWS Simple Queue Service (SQS) – чтобы упростить коммуникацию между воркером Celery и веб-приложением. Сообщения направляются к брокеру, а после обрабатываются воркером. После этого результаты отправляются на бэкенд.

Redis будет одновременно и брокером и бэкендом. Добавьте Redis и воркера Celery в файл docker-compose.yml следующим образом:

version: '3.7'  services:   web:     build: ./project     command: python manage.py runserver 0.0.0.0:8000     volumes:       - ./project:/usr/src/app/     ports:       - 1337:8000     environment:       - DEBUG=1       - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m       - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]       - CELERY_BROKER=redis://redis:6379/0       - CELERY_BACKEND=redis://redis:6379/0     depends_on:       - redis    celery:     build: ./project     command: celery worker --app=core --loglevel=info     volumes:       - ./project:/usr/src/app     environment:       - DEBUG=1       - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m       - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]       - CELERY_BROKER=redis://redis:6379/0       - CELERY_BACKEND=redis://redis:6379/0     depends_on:       - web       - redis    redis:     image: redis:5-alpine

Обратите внимание на celery worker --app=core --loglevel=info:

  1. celery worker используется для запуска воркера Celery;
  2. --app=core используется для запуска core приложения Celery (которое мы коротко определим);
  3. --loglevel=info определяет уровень логирования информации.

В модуль настроек проекта добавьте следующее, чтобы Celery использовала Redis в качестве брокера и бэкенда:

CELERY_BROKER_URL = os.environ.get("CELERY_BROKER", "redis://redis:6379/0") CELERY_RESULT_BACKEND = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")

Затем создайте файл sample_tasks.py в project/tasks:

# project/tasks/sample_tasks.py

import time

from celery import shared_task

@shared_task
def create_task(task_type):
time.sleep(int(task_type) * 10)
return True

Здесь, с помощью декоратора shared_task мы определили новую функцию-задание Celery, которая называется create_task.

Помните о том, что само задание не будет выполняться из процесса Django, оно будет выполнено воркером Celery.

А теперь добавьте файл celery.py в "project/core":

import os  from celery import Celery   os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings") app = Celery("core") app.config_from_object("django.conf:settings", namespace="CELERY") app.autodiscover_tasks()

Что тут происходит?

  1. Для начала нужно установить значение по умолчанию для среды DJANGO_SETTINGS_MODULE, чтобы Celery знала, как найти проект Django.
  2. Затем мы создали экземпляр Celery с именем core и поместили в переменную app.
  3. Затем мы загрузили значения конфигурации Celery из объекта настроек из django.conf. Мы использовали namespace=«CELERY» для предотвращения коллизий с другими настройками Django. Таким образом, все настройки конфигурации для Celery должны начинаться с префикса CELERY_.
  4. Наконец, app.autodiscover_tasks() говорит Celery искать задания из приложений, определенных в settings.INSTALLED_APPS.

Измените project/core/__init__.py, чтобы приложение Celery автоматически импортировалось при запуске Django:

from .celery import app as celery_app   __all__ = ("celery_app",)

Запуск задания

Обновите представление, чтобы начать выполнение задания и отправить id:

@csrf_exempt def run_task(request):     if request.POST:         task_type = request.POST.get("type")         task = create_task.delay(int(task_type))         return JsonResponse({"task_id": task.id}, status=202)

Не забудьте импортировать задание:

from tasks.sample_tasks import create_task

Соберите образы и разверните новые контейнеры:

$ docker-compose up -d --build

Для запуска нового задания, выполните:

$ curl -F type=0 http://localhost:1337/tasks/

Вы увидите что-то вроде этого:

{   "task_id": "6f025ed9-09be-4cbb-be10-1dce919797de" }

Статус задания

Вернитесь к обработчику событий на стороне клиента:

$('.button').on('click', function() {   $.ajax({     url: '/tasks/',     data: { type: $(this).data('type') },     method: 'POST',   })   .done((res) => {     getStatus(res.task_id);   })   .fail((err) => {     console.log(err);   }); });

Когда от AJAX-запроса вернется ответ, мы будем слать getStatus() с id задания каждую секунду:

function getStatus(taskID) {   $.ajax({     url: `/tasks/${taskID}/`,     method: 'GET'   })   .done((res) => {     const html = `       <tr>         <td>${res.task_id}</td>         <td>${res.task_status}</td>         <td>${res.task_result}</td>       </tr>`     $('#tasks').prepend(html);      const taskStatus = res.task_status;      if (taskStatus === 'SUCCESS' || taskStatus === 'FAILURE') return false;     setTimeout(function() {       getStatus(res.task_id);     }, 1000);   })   .fail((err) => {     console.log(err)   }); }

Если ответ положительный, то новая строка будет добавлена к таблице DOM. Обновите представление get_status, чтобы вернуть статус:

@csrf_exempt def get_status(request, task_id):     task_result = AsyncResult(task_id)     result = {         "task_id": task_id,         "task_status": task_result.status,         "task_result": task_result.result     }     return JsonResponse(result, status=200)

Импортируйте AsyncResult:

from celery.result import AsyncResult

Обновите контейнеры:

$ docker-compose up -d --build

Запустите новое задание:

$ curl -F type=1 http://localhost:1337/tasks/

Затем извлеките task_id из ответа и вызовите обновленный get_status, чтобы увидеть статус:

$ curl http://localhost:1337/tasks/25278457-0957-4b0b-b1da-2600525f812f/  {     "task_id": "25278457-0957-4b0b-b1da-2600525f812f",     "task_status": "SUCCESS",     "task_result": true }

Ту же информацию вы можете посмотреть в браузере:

Логи Celery

Обновите сервис celery в docker-compose.yml так, чтобы логи Celery отправились в отдельный файл:

celery:   build: ./project   command: celery worker --app=core --loglevel=info --logfile=logs/celery.log   volumes:     - ./project:/usr/src/app   environment:     - DEBUG=1     - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m     - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]     - CELERY_BROKER=redis://redis:6379/0     - CELERY_BACKEND=redis://redis:6379/0   depends_on:     - web     - redis 

Добавьте новую директорию в “project” и назовите ее “logs”. Затем добавьте в этот новый каталог положите файл celery.log.

Обновите:

$ docker-compose up -d --build

Вы должны видеть, как файл с логами локально заполняется после настройки volume:

[2020-03-25 19:42:29,586: INFO/MainProcess] Connected to redis://redis:6379/0 [2020-03-25 19:42:29,599: INFO/MainProcess] mingle: searching for neighbors [2020-03-25 19:42:30,635: INFO/MainProcess] mingle: all alone [2020-03-25 19:42:30,664: WARNING/MainProcess]     /usr/local/lib/python3.8/site-packages/celery/fixups/django.py:202:     UserWarning: Using settings.DEBUG leads to a memory     leak, never use this setting in production environments!     warnings.warn('''Using settings.DEBUG leads to a memory [2020-03-25 19:42:30,667: INFO/MainProcess] celery@6d060151bfeb ready. [2020-03-25 19:43:07,103: INFO/MainProcess]     Received task: tasks.sample_tasks.create_task[632792bb-5030-4f03-a0d8-e91979279729] [2020-03-25 19:43:17,099: INFO/ForkPoolWorker-2]     Task tasks.sample_tasks.create_task[632792bb-5030-4f03-a0d8-e91979279729]     succeeded in 10.027462100006233s: True

Панель мониторинга Flower

Flower – это легкий веб-инструмент для мониторинга Celery в режиме реального времени. Вы можете отслеживать запущенные задания, увеличивать или уменьшать пул воркеров, отображать графики и статистику, например.

Добавьте его в requirements.txt:

celery==4.4.1 Django==3.0.4 flower==0.9.3 redis==3.4.1  pytest==5.4.1 pytest-django==3.8.0

Затем добавьте новый сервис в docker-compose.yml:

dashboard:   build: ./project   command:  flower -A core --port=5555 --broker=redis://redis:6379/0   ports:     - 5555:5555   environment:     - DEBUG=1     - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m     - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]     - CELERY_BROKER=redis://redis:6379/0     - CELERY_BACKEND=redis://redis:6379/0   depends_on:     - web     - redis     - celery

И протестируйте:

$ docker-compose up -d --build

Перейдите на localhost:5555 для просмотра панели мониторинга. Вы должны увидеть одного воркера:

Запустите еще несколько заданий, чтобы протестировать панель мониторинга:

Попробуйте добавить больше воркеров и посмотреть, как это скажется на производительности:

$ docker-compose up -d --build --scale celery=3

Тесты

Давайте начнем с самого простого теста:

def test_task():     assert sample_tasks.create_task.run(1)     assert sample_tasks.create_task.run(2)     assert sample_tasks.create_task.run(3)

Добавьте тест-кейс выше в project/tests/test_tasks.py и допишите следующий импорт:

from tasks import sample_tasks

Запустите этот тест:

$ docker-compose exec web python -m pytest -k "test_task and not test_home"

Выполнение данного теста займет около минуты:

======================================== test session starts ======================================== platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1 django: settings: core.settings (from ini) rootdir: /usr/src/app, inifile: pytest.ini plugins: django-3.8.0, celery-4.4.1 collected 2 items / 1 deselected / 1 selected  tests/test_tasks.py .                                                                         [100%]  ============================ 1 passed, 1 deselected in 62.43s (0:01:02) =============================

Стоит отметить, что в assert’ах выше мы использовали метод .run вместо .delay для непосредственного запуска задач, без использования воркера Celery.
Хотите использовать заглушки(mock), чтобы ускорить процесс?

@patch('tasks.sample_tasks.create_task.run') def test_mock_task(mock_run):     assert sample_tasks.create_task.run(1)     sample_tasks.create_task.run.assert_called_once_with(1)      assert sample_tasks.create_task.run(2)     assert sample_tasks.create_task.run.call_count == 2      assert sample_tasks.create_task.run(3)     assert sample_tasks.create_task.run.call_count == 3

Импортируйте:

from unittest.mock import patch, call

Протестируйте:

$ docker-compose exec web python -m pytest -k "test_mock_task"  ======================================== test session starts ======================================== platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1 django: settings: core.settings (from ini) rootdir: /usr/src/app, inifile: pytest.ini plugins: django-3.8.0, celery-4.4.1 collected 3 items / 2 deselected / 1 selected  tests/test_tasks.py .                                                                         [100%]  ================================== 1 passed, 2 deselected in 1.13s ==================================

Видите? Теперь гораздо быстрее!

Как насчет полноценного интеграционного тестирования?

def test_task_status(client):     response = client.post(reverse("run_task"), {"type": 0})     content = json.loads(response.content)     task_id = content["task_id"]     assert response.status_code == 202     assert task_id      response = client.get(reverse("get_status", args=[task_id]))     content = json.loads(response.content)     assert content == {"task_id": task_id, "task_status": "PENDING", "task_result": None}     assert response.status_code == 200      while content["task_status"] == "PENDING":         response = client.get(reverse("get_status", args=[task_id]))         content = json.loads(response.content)     assert content == {"task_id": task_id, "task_status": "SUCCESS", "task_result": True}

Помните, что этот тест использует того же брокера и бэкенд, что и в разработке. Вы можете создать новый экземпляр приложения Celery для тестирования:

app = celery.Celery('tests', broker=CELERY_TEST_BROKER, backend=CELERY_TEST_BACKEND)

Добавьте импорт:

import json

И убедитесь в том, что тесты прошли успешно.

Заключение

Сегодня мы познакомились с базовой настройкой Celery для выполнения долгосрочных заданий в приложении на Django. Вы должны отправлять в очередь обработки любые процессы, которые могут замедлить работу кода на стороне пользователя.

Также Celery можно использовать для выполнения повторяющихся задач и декомпозиции сложных ресурсоемких задач, чтобы распределить вычислительную нагрузку на несколько машин и уменьшить время выполнения и нагрузку на машину, которая обрабатывает запросы клиента.

Весь код вы можете найти в этом репозитории.


Успеть на курс

ссылка на оригинал статьи https://habr.com/ru/company/otus/blog/503380/

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *