Это продолжение цикла статей про asyncio
. Начало здесь.
6. Aiohttp и другие жители асинхронного мира
Продолжаем готовить asyncio. Теперь мы уже знаем достаточно много, чтобы написать модный асинхронный микросервис. Реализуем известный архитектурный паттерн «API-шлюз». Это довольно простая штука. По запросу на свой API-интерфейс приложение собирает данные из других API, обрабатывает и возвращает результат пользователю. При этом пользователь знает только одну точку входа, а все внутренние подробности обработки от него скрыты.
В предыдущей главе мы научились запрашивать погоду у сервиса api.openweathermap.org
. Давайте его слегка импортозаместим. Вернее сказать, русифицируем. Пусть пользователь нашего приложения посылает название города на русском языке в параметрах GET-запроса (если вы не знаете что это такое, бегом читать про протокол HTTP) и получает ответ в виде json опять-таки на великом и могучем.
Мы уже освоили http-клиента библиотеки aiohttp
, с помощью которого можно обращаться к внешним API. Оказывается, в этой же библиотеке есть и все необходимое для создания полноценного http-сервера. Для начала напишем просто зеркальный прокси:
Пример 6.1
import json from aiohttp import ClientSession, web async def get_weather(city): async with ClientSession() as session: url = f'http://api.openweathermap.org/data/2.5/weather' \ f'?q={city}&APPID=2a4ff86f9aaa70041ec8e82db64abf56' async with session.get(url) as response: weather_json = await response.json() try: return weather_json["weather"][0]["main"] except KeyError: return 'Нет данных' async def handle(request): city = request.rel_url.query['city'] weather = await get_weather(city) result = {'city': city, 'weather': weather} return web.Response(text=json.dumps(result, ensure_ascii=False)) if __name__ == '__main__': app = web.Application() app.add_routes([web.get('/weather', handle)]) web.run_app(app)
В асинхронной функции get_weather
ничего нового нет, мы ее лишь слегка «облагородили», чтобы запрос погоды для несуществующего города не приводил к трагическим последствиям для всего нашего приложения. За обработку запроса отвечает функция handle
(«ручка» на сленге бэкендеров). Из запроса извлекается параметр city и передается в get_weather
. Далее формируется результирующий ответ в виде json. Адрес нашего сервиса и тип запроса задается следующим образом: app.add_routes([web.get('/weather', handle)])
.
Стоп! А где же наш старый друг asyncio.run
? Не переживайте. Когда мы имеем дело с асинхронными веб-фреймворками (а aiohttp
— это именно фреймворк, хоть и супер-минималистический), вся работа по созданию и запуску задач asyncio
происходит у фреймворка «под капотом». Где-то глубоко в недрах ‘web.run_app’ создаются задачи, организуется бесконечный цикл и запускается в asyncio.run
. Нам, как разработчикам, теперь нет нужды беспокоится об этих низменных деталях. Приложение мирно спит в бесконечном цикле, пока не придет запрос GET на определенный URL. Как только это произойдет, отработает логика в ручке. И снова баю-бай до следующего запроса. Но если первый запрос еще не успел обработаться, как поступил следующий, фреймворк отработает его в отдельной задаче, не дожидаясь (по возможности) окончания обработки первого. В этом сама суть асинхронности.
Заходим браузером на адрес: localhost:8080/weather?city=Sochi
и получаем симпатичный json:
{"city": "Sochi", "weather": "Clouds"}
Кстати, если вы всерьез решили заняться бэкенд-разработкой, одним браузером вам никак не обойтись. Потребуется инструмент, позволяющий залезать вглубь HTTP. Стандарт де-факто здесь Postman, но в природе есть и альтернативные решения.
Скелет нашего приложения готов, теперь начинаем наращивать на него мышцы. Воспользуемся бесплатным API-переводчиком libretranslate.de
:
Пример 6.2
import json from aiohttp import ClientSession, web async def get_weather(city): async with ClientSession() as session: url = f'http://api.openweathermap.org/data/2.5/weather' \ f'?q={city}&APPID=2a4ff86f9aaa70041ec8e82db64abf56' async with session.get(url) as response: weather_json = await response.json() try: return weather_json["weather"][0]["main"] except KeyError: return 'Нет данных' async def get_translation(text, source, target): async with ClientSession() as session: url = 'https://libretranslate.de/translate' data = {'q': text, 'source': source, 'target': target, 'format': 'text'} async with session.post(url, json=data) as response: translate_json = await response.json() try: return translate_json['translatedText'] except KeyError: return text async def handle(request): city_ru = request.rel_url.query['city'] city_en = await get_translation(city_ru, 'ru', 'en') weather_en = await get_weather(city_en) weather_ru = await get_translation(weather_en, 'en', 'ru') result = {'city': city_ru, 'weather': weather_ru} return web.Response(text=json.dumps(result, ensure_ascii=False)) if __name__ == '__main__': app = web.Application() app.add_routes([web.get('/weather', handle)]) web.run_app(app)
Теперь в ручке дважды вызывается асинхронная функция get_translation
и, вуаля:
localhost:8080/weather?city=Сочи
{"city": "Сочи", "weather": "Облака"}
Что это за микросервис без логгера? Однако использовать в насквозь асинхронном приложении привычную синхронную (а значит блокирующую) библиотеку logging
— это непрофессионально. Воспользуемся асинхронной библиотекой логгирования aiologger
:
Пример 6.3
import json from aiohttp import ClientSession, web from aiologger.loggers.json import JsonLogger logger = JsonLogger.with_default_handlers( level='DEBUG', serializer_kwargs={'ensure_ascii': False}, ) async def get_weather(city): async with ClientSession() as session: url = f'http://api.openweathermap.org/data/2.5/weather' \ f'?q={city}&APPID=2a4ff86f9aaa70041ec8e82db64abf56' async with session.get(url) as response: weather_json = await response.json() try: return weather_json["weather"][0]["main"] except KeyError: logger.error(f'Невозможно получить погоду для города: {city}') return 'Нет данных' async def get_translation(text, source, target): await logger.info(f'Поступил запрос на на перевод слова: {text}') async with ClientSession() as session: url = 'https://libretranslate.de/translate' data = {'q': text, 'source': source, 'target': target, 'format': 'text'} async with session.post(url, json=data) as response: translate_json = await response.json() try: return translate_json['translatedText'] except KeyError: logger.error(f'Невозможно получить перевод для слова: {text}') return text async def handle(request): city_ru = request.rel_url.query['city'] await logger.info(f'Поступил запрос на город: {city_ru}') city_en = await get_translation(city_ru, 'ru', 'en') weather_en = await get_weather(city_en) weather_ru = await get_translation(weather_en, 'en', 'ru') result = {'city': city_ru, 'weather': weather_ru} return web.Response(text=json.dumps(result, ensure_ascii=False)) if __name__ == '__main__': app = web.Application() app.add_routes([web.get('/weather', handle)]) web.run_app(app)
Обратите внимание, асинхронный логгер ни на миллисекунду не задержит работу нашего приложения в целом. Он будет использовать паузы, пока мы ожидаем запроса от пользователя или ответа от внешнего сервиса.
Ну а теперь вишенка на торте — асинхронный доступ к базе данных. Предположим, в процессе работы нашего приложения нам надо что-то писать в БД, ну, например, сохранять поступившие запросы (ничего более умного мне в голову не пришло). Самая простая БД в мире, как известно, — это SQLite. И для нее, к счастью, есть асинхронный драйвер aiosqlite
. Пробуем:
Пример 6.4
import json import aiosqlite import asyncio from aiohttp import ClientSession, web from aiologger.loggers.json import JsonLogger from datetime import datetime logger = JsonLogger.with_default_handlers( level='DEBUG', serializer_kwargs={'ensure_ascii': False}, ) async def create_table(): async with aiosqlite.connect('weather.db') as db: await db.execute('CREATE TABLE IF NOT EXISTS requests ' '(date text, city text, weather text)') await db.commit() async def save_to_db(city, weather): async with aiosqlite.connect('weather.db') as db: sql = f'INSERT INTO requests VALUES ("{datetime.now()}", "{city}", "{weather}")' await db.execute(sql) await db.commit() async def get_weather(city): async with ClientSession() as session: url = f'http://api.openweathermap.org/data/2.5/weather' \ f'?q={city}&APPID=2a4ff86f9aaa70041ec8e82db64abf56' async with session.get(url) as response: weather_json = await response.json() try: return weather_json["weather"][0]["main"] except KeyError: logger.error(f'Невозможно получить погоду для города: {city}') return 'Нет данных' async def get_translation(text, source, target): await logger.info(f'Поступил запрос на на перевод слова: {text}') async with ClientSession() as session: url = 'https://libretranslate.de/translate' data = {'q': text, 'source': source, 'target': target, 'format': 'text'} async with session.post(url, json=data) as response: translate_json = await response.json() try: return translate_json['translatedText'] except KeyError: logger.error(f'Невозможно получить перевод для слова: {text}') return text async def handle(request): city_ru = request.rel_url.query['city'] await logger.info(f'Поступил запрос на город: {city_ru}') city_en = await get_translation(city_ru, 'ru', 'en') weather_en = await get_weather(city_en) weather_ru = await get_translation(weather_en, 'en', 'ru') result = {'city': city_ru, 'weather': weather_ru} await save_to_db(city_ru, weather_ru) return web.Response(text=json.dumps(result, ensure_ascii=False)) if __name__ == '__main__': asyncio.run(create_table()) app = web.Application() app.add_routes([web.get('/weather', handle)]) web.run_app(app)
Можете заглянуть внутрь созданного на лету файла weather.db
(только используйте не текстовый просмотрщик, а какую-нибудь приблуду для работы с БД, например, DBeaver). Для каждого запроса создается соответствующая запись в таблице requests
. И снова никаких блокировок, мы ведь живем в асинхронном мире.
Смотрите-ка! Неожиданно появился наш забытый друг asyncio.run
, к чему бы это? А это к тому, что операция создания таблицы requests
(если таковой еще нет) должна выполняться лишь однажды при старте приложения, еще до запуска бесконечного цикла внутри web.run_app
. Но функция-то create_table
у нас асинхронная, так просто ее не запустишь (попробуйте сами, если не верите). Вот и приходится единожды запускать ее «вручную» при помощи asyncio.run
и лишь затем передавать бразды правления web.run_app
.
В заключение этого раздела хочу вас поздравить. Теперь вы имеете в руках все необходимое для создания собственных асинхронных веб-приложений. Неважно какой фреймфорк вы будете использовать: FastAPI
, Tornado
, Falcon
или какой-нибудь еще. Принцип останется тот же самый как в старом добром aiohttp
: создаем ручку и в ней нанизываем «шашлык» из вызовов асинхронных функций. Главное за чем необходимо следить — это чтобы в эти функции не затесалось что-нибудь блокирующее из скучного пыльного синхронного мира.
На этом мы временно прощаемся.
Продолжение следует…
ссылка на оригинал статьи https://habr.com/ru/post/671798/
Добавить комментарий