Попробуем подключить Celery/RabbitMQ к нашему проекту. В качестве основы возьмем проект с Flask’ом. Celery займется вычислением случайного числа.
Установка проекта
Клонируем проект на свой компьютер:
git clone https://github.com/nomhoi/tic-tac-toe-part6.git
Запускаем контейнеры:
cd tic-tac-toe-part6 docker-compose up -d
Выполняем сборку веб-приложения:
cd front npm install npm run-script build
Открываем броузер по адресу http://localhost.
Docker контейнеры
Сервис nginx остался без изменений. В сервис flask добавили установку пакета Celery в файл requirements.txt и смонтировали папку с исходником проекта Celery:
volumes: - ./flask:/code - ./celery/app/proj:/code/proj
Добавились новые сервисы celery и rabbit.
celery: container_name: celery build: context: celery/ dockerfile: Dockerfile volumes: - ./celery/app:/app depends_on: - rabbit networks: - backend rabbit: container_name: rabbit hostname: rabbit image: rabbitmq:3.7.15-alpine environment: - RABBITMQ_DEFAULT_USER=admin - RABBITMQ_DEFAULT_PASS=CT2gNABH8eJ9yVh ports: - "5672:5672" networks: - backend
Сервис celery
Сервис celery выполнен на базе этого туториала. Кто не знаком с Celery, имеет смысл тут-же пройтись по этому туториалу:
$ docker exec -it celery python Python 3.7.3 (default, May 11 2019, 02:00:41) [GCC 8.3.0] on linux Type "help", "copyright", "credits" or "license" for more information. >>> from proj.tasks import add >>> add.delay(2, 2) <AsyncResult: 43662174-657f-4dd3-ab1a-22f5950c8794> >>>
Как видим, наш проект в Celery оформлен в виде пакета proj. В задачи Celery в файл tasks.py добавлена наша задача getnumber:
@app.task def getnumber(): return randrange(9)
Сервис flask
Напомню, что в этот сервис мы добавили пакет Celery и смонтировали папку с проектом proj. Исходный код этого проекта теперь присутствует в двух сервисах flask и celery.
from flask import Flask, jsonify from proj.tasks import getnumber from proj.celery import app as celery app = Flask(__name__) @app.route('/number') def number(): task = getnumber.delay() return task.id @app.route('/result/<task_id>') def result(task_id): task = getnumber.AsyncResult(task_id) result = task.get(timeout = 3) response = { 'state': task.state, 'number': result, } return jsonify(response)
В обработчике number мы вызываем задачу getnumber, которая выполняется в воркере celery и возвращаем идентификатор задачи. В обработчике result мы получаем результат выполненной задачи по идентификатору и возвращаем ответ в JSON формате фронтенду.
Фронтенд
В диспетчере нашей игры по нажатию кнопки Get random number сначала отправляем запрос бэкенду по адресу /number и получаем от него идентификатор задачи Celery. После этого в функции getResult периодически отправляем запрос бэкенду на получение результата по адресу /result/<task_id>.
async function getResult(task_id) { var i = 1; var timerId = setTimeout(async function go() { console.log("Result request: " + i); console.log("Task Id: " + task_id) const res = await fetch(`result/` + task_id); const response = await res.text(); if (res.ok) { let result = JSON.parse(response); console.log(result) if (result.state === 'SUCCESS') { let i = parseInt(result.number); if ($status === 1 || $history.state.squares[i]) { promise_number = result.number + ' - busy'; return; } promise_number = i; history.push(new Command($history.state, i)); return; } } if (i < 5) setTimeout(go, 500); i++; }, 500); }
Изменили вывод результатов запросов к бэкенду:
{#await promise} <p>...подождите</p> {:then taskid} <p>Task Id: {taskid}</p> {:catch error} <p style="color: red">{error.message}</p> {/await} {#await promise_number} <p>...подождите</p> {:then number} <p>Number: {number}</p> {:catch error} <p style="color: red">{error.message}</p> {/await}
Домашнее задание
На самом деле сейчас результат приходит сразу после первого запроса. Попробуйте нашего интеллектуального агента живущего в celery сделать немного задумчивым, чтобы не сразу выдавал ответ.
Время от времени начинает приходить ошибка от flask‘a «500 (INTERNAL SERVER ERROR)», это в celery поднимается исключение «celery.exceptions.TimeoutError: The operation timed out.». Помогает только перезагрузка сервисов. Пока не копал в чем дело, пожалуйста, посмотрите.
В getResult обрабатывается ответ только с состоянием SUCCESS, в остальных случаях выполняется повторный запрос. Можно добавить обработку ответов с FAILURE и PENDING. Формирование ответа в обработчике result также может зависеть от состояния задачи.
Вместо брокера сообщений RabbitMQ можно попробовать подключить Redis.
Можно попробовать выполнять запросы из приложения через брокеры сообщений.
А также попробовать выполнить это на базе примера с Boost.Beast.
Репозиторий на GitHub
ссылка на оригинал статьи https://habr.com/ru/post/461531/
Добавить комментарий