Приветствую, дорогие коллеги и энтузиасты мира разработки!
Сегодня я рад представить вам новый увлекательный проект, который, несомненно, заинтересует как начинающих, так и опытных разработчиков. Речь пойдет о создании API-сервиса на базе FastAPI для мониторинга актуальных курсов валют в банках России.
Концепция проекта
Мы создадим надежный и быстрый инструмент для отслеживания валютных курсов, который найдет применение как в личных финансах, так и в бизнес-сфере. Для реализации проекта будет использован следующий стек технологий:
-
FastAPI — для создания высокопроизводительного API.
-
Aiohttp — для асинхронных HTTP-запросов.
-
BeautifulSoup4 (BS4) — для эффективного парсинга данных.
-
APScheduler — для выполнения задач по расписанию.
-
SQLAlchemy с Aiosqlite — для асинхронной работы с базой данных.
Архитектура и ключевые компоненты
Проект состоит из нескольких ключевых модулей:
-
Асинхронный парсер агрегатора курсов валют (Aiohttp + BS4).
-
Планировщик задач для регулярного обновления данных, интегрированный в жизненный цикл FastAPI приложения (lifespan).
-
Система аутентификации с разграничением прав доступа.
-
API-методы для взаимодействия с данными о курсах валют.
Функциональность банковского API
Сервис предоставит пользователям следующие возможности:
-
Получение актуальных курсов USD и EUR по всем банкам РФ.
-
Фильтрация курсов по диапазонам цен.
-
Определение банков с наиболее выгодными курсами.
-
Анализ минимальных и максимальных цен на валюту (продажа и покупка).
Процесс разработки и деплой
В ходе статьи мы подробно разберем:
-
Каждый этап разработки.
-
Ключевые решения и возможные трудности.
-
Особенности асинхронного программирования и эффективной работы с данными.
Финальный этап:
Мы выполним быстрый деплой нашего API на Amvera Cloud, что обеспечит:
-
Бесплатный HTTPS-домен.
-
Простоту интеграции с различными платформами (Telegram-бот, веб-сайт или мобильное приложение).
Дисклеймер и контекст
Прежде чем погрузиться в тему, хочу сделать небольшое предисловие. В предыдущих статьях я уже подробно рассказывал о работе с SQLAlchemy, Alembic и FastAPI. Чтобы не повторяться, сегодня я буду часто ссылаться на эти материалы. Если потребуется более глубокое понимание, рекомендую ознакомиться с соответствующими статьями.
О «болванке» для API
Сегодняшний API мы будем строить на основе моей «болванки» — базового шаблона, который я активно использую в рабочих проектах. Подробную презентацию шаблона можно найти в моей предыдущей статье.
Для тех, кто хочет первым получать эксклюзивный контент, не публикуемый на Хабре, приглашаю в мой Telegram-канал «Легкий путь в Python«, в частности, там вы найдете полный исходный код сегодняшнего проекта.
Краткий обзор шаблона
Моя «болванка» — это готовый шаблон для разработки масштабируемых веб-приложений на базе FastAPI. Ключевые особенности:
-
Полная поддержка аутентификации и авторизации.
-
Модульная архитектура.
-
Гибкое логирование с использованием Loguru.
-
Асинхронное взаимодействие с базой данных через SQLAlchemy.
-
Удобная система миграций на базе Alembic.
-
Универсальный класс для работы с БД.
-
Модули для кастомной авторизации, регистрации и аутентификации.
Этот шаблон значительно упрощает и ускоряет подготовку инфраструктуры для новых проектов.
Если найдете этот проект полезным, буду признателен за вашу поддержку в виде звездочки на GitHub! Также буду рад узнать в комментариях, как он помог в вашей работе.
Ссылка на проект: FastAPI Template with Auth
Подготовка проекта
Как упоминалось ранее, для работы мы воспользуемся готовым шаблоном, который будет расширен дополнительными API-методами и моделями таблиц для SQLAlchemy. Из «коробки» вы получите проект с базовыми API-методами и моделями таблиц, обеспечивающими функциональность регистрации, авторизации и аутентификации пользователей с поддержкой ролей.
После входа в систему пользователь будет получать JWT-токен, который будет использоваться для проверки прав доступа к методам.
Подробнее про JWT-токены и общую логику аутентификации в FastAPI вы можете прочитать в моей статье:
Важные материалы по SQLAlchemy
В рамках проекта основное внимание будет уделено работе с SQLAlchemy. Рекомендую ознакомиться с моими статьями, чтобы углубиться в тему:
Настройка проекта
Шаг 1. Клонирование репозитория
Откройте терминал и выполните следующую команду, чтобы скопировать проект на свой компьютер:
git clone https://github.com/Yakvenalex/FastApiWithAuthSample
Эта команда создаст новую папку с проектом, содержащую полную копию репозитория.
Шаг 2. Переход в директорию проекта
Перейдите в папку с проектом:
cd FastApiWithAuthSampl
Теперь проект готов для установки зависимостей и дальнейших шагов по настройке. Откройте его в удобной IDE, например, PyCharm.
Расширение файла requirements.txt
Добавьте следующие модули в файл requirements.txt
:
aiohttp==3.11.2 bs4==0.0.2 apscheduler==3.10.4
-
Aiohttp: библиотека для выполнения асинхронных GET-запросов к агрегатору https://ru.myfin.by.
-
BS4: инструмент для парсинга HTML и извлечения нужных данных.
-
APSCHEDULER: будет использоваться для запуска асинхронного парсера по расписанию (каждые 10 минут).
На выходе файл requirements.txt
должен выглядеть так:
fastapi[all]==0.115.0 pydantic==2.9.2 pydantic[email] uvicorn==0.31.0 jinja2==3.1.4 pydantic_settings==2.5.2 aiosqlite==0.20.0 alembic==1.13.3 SQLAlchemy==2.0.35 bcrypt==4.0.1 passlib[bcrypt]==1.7.4 python-jose==3.3.0 loguru==0.7.2 aiohttp==3.11.2 bs4==0.0.2 apscheduler==3.10.4
Установка зависимостей
Выполните команду для установки всех зависимостей:
pip install -r requirements.txt
Текущая реализация AUTH
Модуль авторизации, аутентификации и ролей в этом проекте уже реализован. Если вы клонировали мой репозиторий, то база данных также была скопирована, поэтому необходимость в выполнении миграций через Alembic отсутствует.
Для проверки запустите приложение FastAPI. Выполните в корне проекта:
uvicorn app.main:app --reload
Если порт 8000 занят, укажите другой, например:
uvicorn app.main:app --port 8900 --reload
Проверка работы
После запуска приложения в терминале вы увидите вывод, подтверждающий его успешную работу. Теперь откройте в браузере адрес:
http://127.0.0.1:8900/docs
На этой странице вы сможете протестировать методы API, проверить их корректность и убедиться в работе всей системы.
После проверки всех методов можно переходить к следующему этапу разработки.
Подготовка к созданию парсера
Теперь мы детально сосредоточимся на написании асинхронного парсера данных. Но сначала давайте в корневой папке app
создадим папку api
и добавим в неё файл schemas.py
.
Если вы читали мои предыдущие статьи по FastAPI, то знаете, что я придерживаюсь подхода, при котором каждый отдельный блок API размещается в отдельной папке. Обычно такая папка содержит следующие файлы:
-
dao.py
: файл с методами SQLAlchemy, предназначенными для работы с конкретной сущностью API. В нашем случае — это методы для работы с «банковским API». -
models.py
: содержит модели SQLAlchemy для сущности API (или микросервиса). Сегодня мы создадим таблицу для хранения курсов валют по банкам. -
schemas.py
: Pydantic-модели для валидации данных конкретного микросервиса. Эти модели будут использоваться в парсере. Если вы не знакомы с Pydantic, рекомендую мою статью Pydantic 2: Полное руководство для Python-разработчиков. -
router.py
: файл с основной функциональностью микросервиса.
Создание схем
Добавим в файл app/api/schemas.py
следующую модель:
from pydantic import BaseModel, ConfigDict class BankNameSchema(BaseModel): bank_en: str class CurrencyRateSchema(BankNameSchema): bank_name: str link: str usd_buy: float usd_sell: float eur_buy: float eur_sell: float update_time: str model_config = ConfigDict(from_attributes=True)
CurrencyRateSchema
будет использоваться в парсере для валидации собранных данных перед их добавлением в таблицу с банками.
Подготовка модели таблицы
Создадим модель таблицы для хранения данных о курсах валют. В файле app/api/models.py
добавим следующий код:
from sqlalchemy.orm import Mapped from app.dao.database import Base, str_uniq, float_col class CurrencyRate(Base): # Название банка (на русском) bank_name: Mapped[str_uniq] # Название банка (на английском, для поиска) bank_en: Mapped[str_uniq] # Ссылка на страницу с курсами валют link: Mapped[str_uniq] # Курсы валют: покупка и продажа USD usd_buy: Mapped[float_col] usd_sell: Mapped[float_col] # Курсы валют: покупка и продажа EUR eur_buy: Mapped[float_col] eur_sell: Mapped[float_col] # Время последнего обновления update_time: Mapped[str]
Поля
id
,updated_at
иcreated_at
добавляются автоматически, так как они наследуются из базового классаBase
.
Миграция таблицы
Выполним миграцию новой таблицы с помощью Alembic. Для начала зарегистрируем таблицу в файле migrations/env.py
. Найдите строку:
from app.auth.models import Role, User
И добавьте под ней:
from app.api.models import CurrencyRate
Теперь выполните две команды:
-
Создание файла с инструкциями для миграции:
alembic revision --autogenerate -m "add currency table"
-
Применение миграции:
alembic upgrade head
После этого в базе данных появится новая таблица.
Создание DAO
Для работы с таблицей создадим класс доступа к данным (DAO). В файле app/api/dao.py
добавим следующий код:
from app.api.models import CurrencyRate from app.dao.base import BaseDAO class CurrencyRateDAO(BaseDAO): model = CurrencyRate
Этот класс наследуется от BaseDAO
, предоставляя универсальные методы для работы с базой данных. Однако нам понадобится написать собственные методы, включая метод для массового обновления записей.
Массовое обновление данных
Метод массового обновления курсов валют будет учитывать, что в источнике данных (сайт-донор) отсутствует поле ID, сопоставимое с нашим. Вместо этого мы используем поле bank_en
— уникальное название банка на английском языке.
Добавим следующий метод в CurrencyRateDAO
:
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.sql.expression import update from typing import List from pydantic import BaseModel import logging logger = logging.getLogger(__name__) class CurrencyRateDAO(BaseDAO): model = CurrencyRate @classmethod async def bulk_update_currency(cls, session: AsyncSession, records: List[BaseModel]) -> int: """Массовое обновление валютных курсов.""" try: updated_count = 0 for record in records: record_dict = record.model_dump(exclude_unset=True) if not (bank_en := record_dict.get('bank_en')): logger.warning("Пропуск записи: отсутствует bank_en") continue update_data = {k: v for k, v in record_dict.items() if k != 'bank_en'} if not update_data: logger.warning(f"Пропуск записи: нет данных для обновления банка {bank_en}") continue stmt = update(cls.model).where(cls.model.bank_en == bank_en).values(**update_data) result = await session.execute(stmt) updated_count += result.rowcount await session.commit() logger.info(f"Обновлено записей: {updated_count}") return updated_count except SQLAlchemyError as e: await session.rollback() logger.error(f"Ошибка массового обновления: {e}") raise
Объяснение метода
-
Валидация входных данных: Проверяется наличие
bank_en
и данных для обновления. -
Формирование SQL-запроса: Используется
update
для массового обновления записей. -
Обработка исключений: При ошибке выполнения откатываются изменения, а ошибка логируется.
На этом этапе мы подготовили основные компоненты для работы с таблицей валютных курсов и реализовали метод для массового обновления. В дальнейшем мы вернёмся к этому классу, чтобы добавить методы для работы с «банковским API».
Следующий шаг — создание асинхронного парсера.
Пишем асинхронный парсер
Для создания парсера в папке app
создадим новую папку scheduler
. В этой папке разместим два файла:
-
parser.py
: асинхронный парсер данных. -
scheduler.py
: скрипты для запуска задач по расписанию.
Настройка scheduler/parser.py
Импорты
import aiohttp import asyncio from loguru import logger from bs4 import BeautifulSoup from aiohttp import ClientSession, ClientTimeout, ClientError from typing import List, Optional from pydantic import BaseModel from app.api.schemas import CurrencyRateSchema
Объяснение импортов:
-
aiohttp
иasyncio
: для работы с асинхронным кодом и HTTP-запросами. -
loguru.logger
: удобная библиотека для логирования. -
BeautifulSoup
: для парсинга HTML. -
aiohttp.ClientSession
,ClientTimeout
,ClientError
: работа с HTTP-сессиями, таймаутами и обработка ошибок. -
typing.List
,typing.Optional
: аннотации типов для списков и опциональных значений. -
pydantic.BaseModel
: валидация данных. -
app.api.schemas.CurrencyRateSchema
: используемая нами модель данных.
Асинхронное получение HTML с обработкой ошибок
async def fetch_html(url: str, session: ClientSession, retries: int = 3) -> Optional[str]: attempt = 0 while attempt < retries: try: async with session.get(url) as response: response.raise_for_status() # Вызывает исключение при ошибке HTTP return await response.text() except (ClientError, asyncio.TimeoutError) as e: logger.error(f"Ошибка при запросе {url}: {e}") attempt += 1 if attempt == retries: logger.critical(f"Не удалось получить данные с {url} после {retries} попыток.") return None await asyncio.sleep(2 ** attempt) # Экспоненциальная задержка except Exception as e: logger.error(f"Неизвестная ошибка при запросе {url}: {e}") return None return None
Эта функция отправляет HTTP-запрос, повторяет попытки при неудаче и логирует ошибки.
Парсинг HTML-кода
def parse_currency_table(html: str) -> List[BaseModel]: soup = BeautifulSoup(html, 'html.parser') try: table = soup.find('table', class_='content_table').find('tbody') rows = table.find_all('tr') currencies = [] for row in rows: bank_name = row.find('td', class_='bank_name').get_text(strip=True) link = row.find('a') try: usd_buy = float(row.find_all('td', class_='USD')[0].get_text(strip=True).replace(',', '.')) usd_sell = float(row.find_all('td', class_='USD')[1].get_text(strip=True).replace(',', '.')) eur_buy = float(row.find_all('td', class_='EUR')[0].get_text(strip=True).replace(',', '.')) eur_sell = float(row.find_all('td', class_='EUR')[1].get_text(strip=True).replace(',', '.')) except (ValueError, IndexError) as e: logger.warning(f"Ошибка при парсинге курсов валют для {bank_name}: {e}") continue update_time = row.find('time').get_text(strip=True) link_info = get_link_info(link) currencies.append(CurrencyRateSchema(**{ 'bank_name': bank_name, 'bank_en': link_info[1], 'link': link_info[0], 'usd_buy': usd_buy, 'usd_sell': usd_sell, 'eur_buy': eur_buy, 'eur_sell': eur_sell, 'update_time': update_time, })) return currencies except Exception as e: logger.error(f"Ошибка при парсинге HTML: {e}") return []
Эта функция парсит HTML-страницу, полученную после GET-запроса к агрегатору, и извлекает данные о банках и курсах валют, возвращая список объектов CurrencyRateSchema
.
Объединение логики
async def fetch_page_data(url: str, session: ClientSession) -> List[BaseModel]: html = await fetch_html(url, session) if html: return parse_currency_table(html) return []
Главная функция парсера
async def fetch_all_currencies() -> List[BaseModel]: all_currencies = [] base_url = 'https://ru.myfin.by/currency?page=' timeout = ClientTimeout(total=10, connect=5) async with aiohttp.ClientSession(timeout=timeout) as session: tasks = [fetch_page_data(f'{base_url}{page}', session) for page in range(1, 5)] results = await asyncio.gather(*tasks) for currencies in results: all_currencies.extend(currencies) return all_currencies
Здесь используется asyncio.gather
для параллельной обработки запросов. Это значительно ускоряет сбор данных.
Настройка scheduler.py
from app.api.dao import CurrencyRateDAO from app.dao.session_maker import session_manager from app.scheduler.parser import fetch_all_currencies @session_manager.connection(commit=True) async def add_data_to_db(session): rez = await fetch_all_currencies() await CurrencyRateDAO.add_many(session=session, instances=rez) @session_manager.connection(commit=True) async def upd_data_to_db(session): rez = await fetch_all_currencies() await CurrencyRateDAO.bulk_update_currency(session=session, records=rez)
Объяснение функций:
-
add_data_to_db
: первичное добавление данных. -
upd_data_to_db
: обновление данных.
Тут я продемонстрировал гибкость подхода в использовании моего универсального класса для генерации сессий. В данном случае мы импортировали session_manager. Эксклюзивное пояснение того, как это все работает я давал в своем телеграмм канале, но, думаю, вы и сами разберетесь с этим.
Первый метод (add_data_to_db) выполняет первичное добавление информации по банкам в базу данных. На этом этапе выполняется, как сам парсер, так и добавление полученных данных в базу данных.
Для добавления используется универсальный метод add_many базового класса BaseDao.
И с ним есть важный момент. В реальной практике данный метод нужно усилить и внедрять его в расписание на выполнение, например, один раз при запуске приложения. Но, так как проект учебный, я предлагаю его выполнить всего 1 раз прямо с файла в котором написан этот код.
Одноразовый запуск добавления данных
import asyncio asyncio.run(add_data_to_db())
После выполнения можно удалить эти строки кода.
Проверим собраны ли данные.
Теперь, когда у нас есть что обновлять мы можем включить метод для обновления данных в расписание FastApi. Предлагаю сделать, чтоб парсер запускался 1 раз в 10 минут. Не вижу смысла делать это чаще.
Включаем задачу в расписание
Чтобы включить задачу в расписание, нам нужно внести изменения в файл main.py
. Основная задача — описать жизненный цикл приложения FastAPI.
Добавление импортов
from contextlib import asynccontextmanager from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.interval import IntervalTrigger from app.scheduler.scheduller import upd_data_to_db
Разберем подробнее:
-
from contextlib import asynccontextmanager
Позволяет создавать асинхронные контекстные менеджеры для управления ресурсами в асинхронном коде. -
from apscheduler.schedulers.asyncio import AsyncIOScheduler
Асинхронный планировщик задач, который работает с циклом событий asyncio. -
from apscheduler.triggers.interval import IntervalTrigger
Используется для создания триггеров, которые запускают задачи с заданным интервалом. -
from app.scheduler.scheduller import upd_data_to_db
Импорт функции, которая будет обновлять данные в базе данных по расписанию.
Создание планировщика
scheduler = AsyncIOScheduler()
Объект scheduler
будет отвечать за планирование и выполнение задач.
Функция управления жизненным циклом приложения
@asynccontextmanager async def lifespan(app: FastAPI): """ Управляет жизненным циклом планировщика приложения. Args: app (FastAPI): Экземпляр приложения FastAPI. """ try: # Настройка и запуск планировщика scheduler.add_job( upd_data_to_db, trigger=IntervalTrigger(minutes=10), id='currency_update_job', replace_existing=True ) scheduler.start() logger.info("Планировщик обновления курсов валют запущен") yield except Exception as e: logger.error(f"Ошибка инициализации планировщика: {e}") finally: # Завершение работы планировщика scheduler.shutdown() logger.info("Планировщик обновления курсов валют остановлен")
Описание кода:
-
Декоратор
@asynccontextmanager
:
Позволяет функцииlifespan
работать как асинхронный контекстный менеджер. -
Блок
try
:-
Добавляет задачу
upd_data_to_db
в планировщик с 10-минутным интервалом. -
Запускает планировщик.
-
Логирует успешный запуск.
-
-
yield
:
Позволяет приложению продолжить выполнение после запуска планировщика. -
Блок
except
:
Логирует ошибки инициализации. -
Блок
finally
:
Останавливает планировщик при завершении работы приложения и логирует это.
Интеграция жизненного цикла в приложение
app = FastAPI(lifespan=lifespan)
Что мы сделали:
-
Создали экземпляр приложения FastAPI.
-
Добавили асинхронный контекстный менеджер
lifespan
для управления жизненным циклом приложения. -
Интеграция позволяет автоматически запускать и завершать работу планировщика.
Остальная часть кода в main.py
остается без изменений.
Проверка работы
Теперь достаточно перезапустить приложение. Планировщик начнет автоматически обновлять данные раз в 10 минут, и это будет отражено в логах.
Следующий шаг
Когда задачи, связанные с расписанием, завершены, можно переходить к созданию API-методов для работы с «банковским API». Приступим!
Создание банковского API
Мы уже создали папку app/api
и продолжим с ней дальнейшую работу.
На этом этапе нам необходимо реализовать дополнительные методы для работы с данными от агрегатора, создать сопутствующие схемы Pydantic и, наконец, реализовать сами эндпоинты (API-методы).
Напишем несколько простых эндпоинтов, которые не потребуют создания дополнительных методов для работы с таблицами. Мы сможем ограничиться универсальными методами из BaseDao
. Реализуем следующие API-методы:
-
Метод, который возвращает актуальные курсы валют всех банков для авторизованных пользователей.
-
Метод, который возвращает курсы валют конкретного банка по его английскому названию для авторизованных пользователей.
-
Метод, который возвращает расширенную информацию о курсах валют (только для администраторов).
Работаем с файлом app/api/router.py
.
Импорты
Начнем с импортов:
from typing import List from fastapi import APIRouter, Depends, HTTPException from sqlalchemy.ext.asyncio import AsyncSession from app.api.utils import validate_range, validate_currency_type, get_currency_ranges from app.auth.dependencies import get_current_user, get_current_admin_user from app.auth.models import User from app.config import settings from app.dao.session_maker import SessionDep from app.api.dao import CurrencyRateDAO from app.api.schemas import ( CurrencyRateSchema, BankNameSchema, CurrencyRangeFilterSchema, AdminCurrencySchema, CurrencySaleRangeFilterSchema, BestRateResponse )
Обратите внимание, что на блок со схемами пока не нужно заострять внимание, так как недостающие схемы мы заполним чуть позже.
Важные детали
-
Импорт
SessionDep
: Этот объект из пакетаdao
— специальный генератор асинхронной сессии, созданный под FastAPI. Его особенность в том, что он не выполняет коммит в базе данных после операций. -
Импорты из
utils
: Здесь подключены функции, вынесенные в файлapi/utils.py,
который мы скоро создадим. -
Настройки: Для работы с переменными окружения и настройками используется модуль
config
.
Давайте реализуем недостающий функционал. Начнем с настроек.
Изменение файла app/config.py
Изменим файл следующим образом:
import os from pydantic_settings import BaseSettings, SettingsConfigDict class Settings(BaseSettings): BASE_DIR: str = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) DB_URL: str = f"sqlite+aiosqlite:///{BASE_DIR}/data/db.sqlite3" SECRET_KEY: str ALGORITHM: str VALID_CURRENCIES: list = ["usd", "eur"] ERROR_MESSAGES: dict = { "currency_type": "Некорректный тип валюты. Используйте 'usd' или 'eur'.", "range": "Неверно задан диапазон.", "not_found": "Не найдены курсы валют.", "bank_not_found": "Банк не найден." } CURRENCY_FIELDS: dict = { 'usd': {'buy': 'usd_buy', 'sell': 'usd_sell'}, 'eur': {'buy': 'eur_buy', 'sell': 'eur_sell'} } model_config = SettingsConfigDict(env_file=f"{BASE_DIR}/.env") # Получаем параметры для загрузки переменных среды settings = Settings() database_url = settings.DB_URL
Что добавлено?
-
VALID_CURRENCIES
: Список поддерживаемых валют для API. -
ERROR_MESSAGES
: Словарь с заготовленными сообщениями об ошибках для различных ситуаций. -
CURRENCY_FIELDS
: Словарь с ключами, которые будут использоваться в методах SQLAlchemy.
Эти переменные введены для оптимизации и упрощения кода. Хотя можно было не выносить их в класс Settings
, такой подход обеспечивает чистоту и повторное использование.
Теперь у нас есть доступ ко всем настройкам через объект класса Settings
, включая переменные из файла .env
и добавленные вручную. Например, чтобы обратиться к переменной, достаточно использовать точечный синтаксис:
-
settings.VALID_CURRENCIES
.
Создание файла app/api/utils.py
В следующем шаге мы создадим файл utils.py
в папке app/api
, чтобы вынести часть кода для оптимизации и экономии времени.
Приложу полный код и после прокомментирую:
from typing import Tuple from fastapi import HTTPException from sqlalchemy.ext.asyncio import AsyncSession from app.api.dao import CurrencyRateDAO from app.config import settings def validate_currency_type(currency_type: str) -> str: """Проверяет корректность типа валюты.""" if currency_type.lower() not in settings.VALID_CURRENCIES: raise HTTPException(status_code=400, detail=settings.ERROR_MESSAGES["currency_type"]) return currency_type.lower() def validate_range(min_val: float, max_val: float) -> None: """Проверяет корректность диапазона значений.""" if min_val > max_val: raise HTTPException(status_code=400, detail=settings.ERROR_MESSAGES["range"]) async def get_currency_ranges( currency_type: str, operation: str, session: AsyncSession ) -> Tuple[Tuple[float, float], Tuple[float, float]]: """Получает диапазоны для основной и альтернативной валюты.""" other_currency = 'eur' if currency_type == 'usd' else 'usd' requested_range = await CurrencyRateDAO.get_currency_range( currency=currency_type, operation=operation, session=session ) other_range = await CurrencyRateDAO.get_currency_range( currency=other_currency, operation=operation, session=session ) return requested_range, other_range
Краткое описание функций
-
validate_currency_type(currency_type: str) -> str
:-
Проверяет, что указанный тип валюты (например,
"USD"
или"EUR"
) допустим. -
Если тип валюты недопустим, вызывает исключение
HTTPException
с кодом 400 и сообщением об ошибке. -
Возвращает тип валюты в нижнем регистре.
-
-
validate_range(min_val: float, max_val: float) -> None
:-
Проверяет корректность диапазона (например, минимальное значение не должно быть больше максимального).
-
Если диапазон некорректен, вызывает исключение
HTTPException
с кодом 400.
-
-
get_currency_ranges(currency_type: str, operation: str, session: AsyncSession) -> Tuple[Tuple[float, float], Tuple[float, float]]
:-
Асинхронно запрашивает диапазоны курсов для двух валют:
-
Основная валюта (заданная
currency_type
). -
Альтернативная валюта (определяется как
"USD"
, если основная —"EUR"
, и наоборот).
-
-
Использует DAO (Data Access Object) для получения данных из базы через сессию
AsyncSession
. -
Возвращает два диапазона в виде кортежей чисел.
-
Когда мы включим этот код в наши API-методы, станет понятнее, зачем они вынесены в отдельный файл.
Назначение роутера в файле app/api/router.py
Определяем роутер:
router = APIRouter(prefix='/api', tags=['API'])
Первый эндпоинт: получение всех курсов валют
@router.get("/all_currency/") async def get_all_currency( user_data: User = Depends(get_current_user), session: AsyncSession = SessionDep ) -> List[CurrencyRateSchema]: """Возвращает актуальные курсы валют всех банков.""" return await CurrencyRateDAO.find_all(session=session, filters=None)
Разбор эндпоинта
-
Путь: GET-запрос на
/api/all_currency
(префикс/api
добавлен черезAPIRouter
). -
Зависимости:
-
get_current_user
: Проверяет авторизацию пользователя. -
SessionDep
: Генерирует асинхронную сессию для работы с базой данных.
-
-
Логика:
-
Используется универсальный метод
find_all
изBaseDao
для получения всех записей без фильтров. -
Сессия передается первым аргументом для обеспечения корректной работы.
-
Зависимости в FastAPI
— get_current_user
Эта зависимость у вас уже есть, если вы использовали мой шаблон. Сама она из микросервиса auth
и она выполняет проверки:
-
Есть ли в cookies JWT-токен.
-
Не истек ли срок действия токена.
-
Удается ли получить информацию о пользователе.
SessionDep
Генератор сессий, который автоматически управляет жизненным циклом соединений с базой данных. Зависимость выполняется до основной логики эндпоинта. Тоже у вас есть если работаете с моим шаблоном.
Совет: Всегда явно передавайте сессию как именованный аргумент, чтобы избежать потенциальных проблем и повысить читаемость кода.
Эндпоинт: курсы валют конкретного банка
@router.get("/currency_by_bank/{bank_en}") async def get_currency_by_bank( bank_en: str, user_data: User = Depends(get_current_user), session: AsyncSession = SessionDep ) -> CurrencyRateSchema | None: """Возвращает курсы валют конкретного банка по его английскому названию.""" currencies = await CurrencyRateDAO.find_one_or_none(session=session, filters=BankNameSchema(bank_en=bank_en)) if not currencies: raise HTTPException(status_code=404, detail=settings.ERROR_MESSAGES["bank_not_found"]) return currencies
Разбор эндпоинта
-
Путь: GET-запрос на
/api/currency_by_bank/{bank_en}
. -
Логика:
-
Метод принимает параметр
bank_en
— английское название банка. -
Используется метод
find_one_or_none
изBaseDao
для поиска записи. -
Если банк не найден, вызывается исключение
HTTPException
с кодом 404, используя сообщение изsettings.ERROR_MESSAGES
.
-
Эндпоинт: расширенная информация о курсах валют (для администраторов)
@router.get("/all_currency_admin/") async def get_all_currency_admin( user_data: User = Depends(get_current_admin_user), session: AsyncSession = SessionDep ) -> List[AdminCurrencySchema]: """Возвращает расширенную информацию о курсах валют (только для админов).""" return await CurrencyRateDAO.find_all(session=session, filters=None)
Новая схема Pydantic
Для работы этого эндпоинта создадим схему:
class AdminCurrencySchema(CurrencyRateSchema): id: int created_at: datetime updated_at: datetime
Эта схема наследуется от CurrencyRateSchema
и добавляет три поля: id
, created_at
и updated_at
.
В новом эндпоинте используется зависимость get_current_admin_user:
async def get_current_admin_user(current_user: User = Depends(get_current_user)): if current_user.role.id in [3, 4]: return current_user raise ForbiddenException
Эта зависимость проверяет, что пользователь имеет административную роль. Она расширяет функциональность get_current_user
, добавляя дополнительную проверку.
Дополнение для взаимодействия с базой данных
Для создания новых эндпоинтов (API-методов) нужно написать дополнительные методы в CurrencyRateDAO
, которые используют SQLAlchemy.
Примечание: Для понимания работы с
CurrencyRateDAO
рекомендуется иметь базовые знания SQLAlchemy. Если нужно освежить знания, можно обратиться к моим статьям на Хабре, где я подробно разбираю основы SQLAlchemy 2.0.
Пишем новые DAO методы
Сейчас я продемонстрирую полный код класса CurrencyRateDAO со всеми новыми методами и после коротко разберу каждый метод, чтоб не занимать много вашего и своего времени. В целом, должно быть все понятно.
from pydantic import BaseModel from sqlalchemy import select, update, desc from sqlalchemy.exc import SQLAlchemyError from typing import List, Optional, Tuple, Dict, Any from sqlalchemy.ext.asyncio import AsyncSession from app.api.models import CurrencyRate from app.api.schemas import CurrencyRateSchema, BestRateResponse from app.config import settings from app.dao.base import BaseDAO from loguru import logger class CurrencyRateDAO(BaseDAO): model = CurrencyRate @classmethod async def _get_value_range(cls, session: AsyncSession, field: str) -> Tuple[float, float]: """Получает минимальное и максимальное значение для указанного поля.""" try: result = await session.execute(select(getattr(cls.model, field))) values = result.scalars().all() return (min(values), max(values)) if values else (0.0, 0.0) except SQLAlchemyError as e: logger.error(f"Ошибка при получении диапазона для {field}: {e}") raise @classmethod async def _find_by_range( cls, field_name: str, min_value: float, max_value: float, session: AsyncSession ) -> List[CurrencyRateSchema]: """Поиск валютных курсов по диапазону для указанного поля.""" try: query = select(cls.model).filter( getattr(cls.model, field_name).between(min_value, max_value) ) result = await session.execute(query) records = result.scalars().all() return [CurrencyRateSchema.from_orm(record) for record in records] except SQLAlchemyError as e: logger.error(f"Ошибка при поиске по диапазону {field_name}: {e}") raise @classmethod async def find_by_range_multi( cls, ranges: Dict[str, Tuple[float, float]], session: AsyncSession ) -> List[CurrencyRateSchema]: """Поиск валютных курсов по нескольким диапазонам.""" results = [] for field, (min_val, max_val) in ranges.items(): results.extend(await cls._find_by_range(field, min_val, max_val, session)) return results @classmethod async def find_by_purchase_range( cls, usd_buy_min: float, usd_buy_max: float, eur_buy_min: float, eur_buy_max: float, session: AsyncSession ) -> List[CurrencyRateSchema]: """Поиск курсов по диапазону покупки USD/EUR.""" ranges = { 'usd_buy': (usd_buy_min, usd_buy_max), 'eur_buy': (eur_buy_min, eur_buy_max) } return await cls.find_by_range_multi(ranges, session) @classmethod async def find_by_sale_range( cls, usd_sell_min: float, usd_sell_max: float, eur_sell_min: float, eur_sell_max: float, session: AsyncSession ) -> List[CurrencyRateSchema]: """Поиск курсов по диапазону продажи USD/EUR.""" ranges = { 'usd_sell': (usd_sell_min, usd_sell_max), 'eur_sell': (eur_sell_min, eur_sell_max) } return await cls.find_by_range_multi(ranges, session) @classmethod async def get_currency_range(cls, currency: str, operation: str, session: AsyncSession) -> Tuple[float, float]: """Получает диапазон цен для указанной валюты и операции.""" field = settings.CURRENCY_FIELDS[currency][operation] return await cls._get_value_range(session, field) @classmethod async def bulk_update_currency(cls, session: AsyncSession, records: List[BaseModel]) -> int: """Массовое обновление валютных курсов.""" разбирали ранее @classmethod async def _find_best_rate( cls, currency_type: str, operation: str, session: AsyncSession ) -> Optional[BestRateResponse]: """Находит лучший курс для указанной валюты и операции.""" try: field = settings.CURRENCY_FIELDS[currency_type][operation] order_by = desc(field) if operation == 'sell' else field query = select(cls.model).order_by(order_by) result = await session.execute(query) rates = result.scalars().all() if not rates: return None best_value = getattr(rates[0], field) best_banks = [ bank.bank_name for bank in rates if getattr(bank, field) == best_value ] return BestRateResponse(rate=best_value, banks=best_banks) except SQLAlchemyError as e: logger.error(f"Ошибка поиска лучшего курса: {e}") raise @classmethod async def find_best_purchase_rate(cls, currency_type: str, session: AsyncSession) -> Optional[BestRateResponse]: """Находит лучший курс покупки для указанной валюты.""" return await cls._find_best_rate(currency_type, 'buy', session) @classmethod async def find_best_sale_rate(cls, currency_type: str, session: AsyncSession) -> Optional[BestRateResponse]: """Находит лучший курс продажи для указанной валюты.""" return await cls._find_best_rate(currency_type, 'sell', session)
Разбор новых методов
1. _get_value_range
Что делает?
Метод ищет минимальное и максимальное значение для указанного поля (например, usd_buy
или eur_sell
) в таблице.
Как работает?
-
Выполняет запрос для получения всех значений из базы данных по указанному полю.
-
Если данные есть, возвращает минимальное и максимальное значения.
-
Если данных нет, возвращает
(0.0, 0.0)
.
Пример:
Если мы ищем минимальный и максимальный курс покупки доллара, метод вернёт, например, (74.5, 75.8)
.
2. _find_by_range
Что делает?
Ищет записи в базе данных, где значения указанного поля находятся в заданном диапазоне (от min_value
до max_value
).
Как работает?
-
Фильтрует записи таблицы с помощью метода
between
(значение «между»). -
Преобразует найденные записи в формат, описанный в схеме
CurrencyRateSchema
.
Пример:
Если задать диапазон для usd_buy
от 74.0
до 75.0
, метод найдёт все записи с курсами покупки доллара в этом диапазоне.
3. find_by_range_multi
Что делает?
Ищет записи, попадающие в несколько диапазонов одновременно, для разных полей.
Как работает?
-
Проходит по всем переданным диапазонам (например, для
usd_buy
иeur_sell
). -
Использует метод
_find_by_range
для поиска по каждому полю. -
Собирает результаты в общий список.
Пример:
Можно передать диапазоны для usd_buy
(74.0–75.0
) и eur_sell
(84.0–85.0
). Метод вернёт записи, удовлетворяющие хотя бы одному из условий.
4. find_by_purchase_range
Что делает?
Ищет курсы покупки (доллара и евро), попадающие в заданные диапазоны.
Как работает?
-
Принимает минимальные и максимальные значения для
usd_buy
иeur_buy
. -
Формирует диапазоны и вызывает метод
find_by_range_multi
.
Пример:
Задайте диапазон для курса покупки доллара (74.0–75.0
) и евро (84.0–85.0
), чтобы получить записи с подходящими курсами.
5. find_by_sale_range
Что делает?
Похож на метод find_by_purchase_range
, но работает с диапазонами для курсов продажи (usd_sell
и eur_sell
).
6. get_currency_range
Что делает?
Определяет диапазон значений для конкретной валюты (usd
или eur
) и операции (buy
или sell
).
Как работает?
-
Использует поле из настроек приложения (
settings.CURRENCY_FIELDS
), соответствующее валюте и операции. -
Вызывает метод
_get_value_range
для поиска минимума и максимума по этому полю.
Пример:
Для курса покупки доллара вызов метода вернёт диапазон, например, (74.5, 75.8)
.
7. _find_best_rate
Что делает?
Ищет лучший курс (минимальный для покупки или максимальный для продажи) для указанной валюты.
Как работает?
-
Выбирает поле, связанное с валютой и операцией.
-
Сортирует записи по этому полю:
-
По убыванию (для продажи).
-
По возрастанию (для покупки).
-
-
Возвращает объект с лучшим курсом и списком банков, предлагающих этот курс.
Пример:
Для курса продажи евро может вернуть:
{'rate': 85.5, 'banks': ['Bank1', 'Bank2']}
.
8. find_best_purchase_rate
Что делает?
Находит лучший курс покупки для указанной валюты.
Как работает?
-
Вызывает метод
_find_best_rate
, передавая тип валюты (usd
илиeur
) и операциюbuy
.
9. find_best_sale_rate
Что делает?
Находит лучший курс продажи для указанной валюты.
Как работает?
Похож на find_best_purchase_rate
, но передаёт операцию sell
.
Надеюсь, что с этим все понятно. Если что-то осталось непонятным, задавайте вопросы в комментариях к статье или присоединяйтесь к нашему телеграм-сообществу «Легкий путь в Python» (там уже более 1400 участников).
Создание новых API-методов
На основе новых методов SQLAlchemy создадим API-методы для взаимодействия с данными.
Метод: Получение курсов валют в диапазоне цен покупки для USD и EUR
@router.post("/currency_in_purchase_range/") async def get_currency_in_purchase_range( filter_data: CurrencyRangeFilterSchema, user_data: User = Depends(get_current_user), session: AsyncSession = SessionDep ) -> List[CurrencyRateSchema]: """ Возвращает курсы валют, находящиеся в заданном диапазоне цен покупки для USD и EUR. """ validate_range(filter_data.usd_min, filter_data.usd_max) validate_range(filter_data.eur_min, filter_data.eur_max) currencies = await CurrencyRateDAO.find_by_purchase_range( usd_buy_min=filter_data.usd_min, usd_buy_max=filter_data.usd_max, eur_buy_min=filter_data.eur_min, eur_buy_max=filter_data.eur_max, session=session ) if not currencies: raise HTTPException(status_code=404, detail=settings.ERROR_MESSAGES["not_found"]) return currencies
Хотя мы получаем данные, реализация выполнена через POST-запрос для удобства тестирования. В этом методе используются две схемы Pydantic:
-
CurrencyRangeFilterSchema
— проверяет входные данные:class CurrencyRangeFilterSchema(BaseModel): usd_min: float | None = 0 usd_max: float | None = 0 eur_min: float | None = 0 eur_max: float | None = 0
-
CurrencyRateSchema
— проверяет корректность возвращаемых данных (обсуждалось ранее).
После валидации входных данных вызывается метод find_by_purchase_range
, реализованный ранее.
Метод: Получение курсов валют в диапазоне цен продажи для USD и EUR
@router.post("/currency_in_sale_range/") async def get_currency_in_sale_range( filter_data: CurrencySaleRangeFilterSchema, user_data: User = Depends(get_current_user), session: AsyncSession = SessionDep ) -> List[CurrencyRateSchema]: """ Возвращает курсы валют, находящиеся в заданном диапазоне цен продажи для USD и EUR. """ validate_range(filter_data.usd_sale_min, filter_data.usd_sale_max) validate_range(filter_data.eur_sale_min, filter_data.eur_sale_max) currencies = await CurrencyRateDAO.find_by_sale_range( usd_sell_min=filter_data.usd_sale_min, usd_sell_max=filter_data.usd_sale_max, eur_sell_min=filter_data.eur_sale_min, eur_sell_max=filter_data.eur_sale_max, session=session ) if not currencies: raise HTTPException(status_code=404, detail=settings.ERROR_MESSAGES["not_found"]) return currencies
Используется схема CurrencySaleRangeFilterSchema
для валидации входных данных:
class CurrencySaleRangeFilterSchema(BaseModel): usd_sale_min: float | None = 0 usd_sale_max: float | None = 0 eur_sale_min: float | None = 0 eur_sale_max: float | None = 0
Остальная логика идентична предыдущему методу.
Метод: Получение информации о банках с лучшим курсом покупки для выбранной валюты
@router.get("/best_purchase_rate/{currency_type}") async def get_best_purchase_rate( currency_type: str, user_data: User = Depends(get_current_user), session: AsyncSession = SessionDep ) -> BestRateResponse: """ Возвращает информацию о банках с лучшим курсом покупки для выбранной валюты. """ currency_type = validate_currency_type(currency_type) result = await CurrencyRateDAO.find_best_purchase_rate(currency_type=currency_type, session=session) if not result or not result.banks: raise HTTPException(status_code=404, detail=settings.ERROR_MESSAGES["not_found"]) return result
На вход подается строка, представляющая валюту (например, «usd» или «eur»). Это позволяет избежать избыточности в реализации.
Метод: Получение информации о банках с лучшим курсом продажи для выбранной валюты
@router.get("/best_sale_rate/{currency_type}") async def get_best_sale_rate( currency_type: str, user_data: User = Depends(get_current_user), session: AsyncSession = SessionDep ) -> BestRateResponse: """ Возвращает информацию о банках с лучшим курсом продажи для выбранной валюты. """ currency_type = validate_currency_type(currency_type) result = await CurrencyRateDAO.find_best_sale_rate(currency_type=currency_type, session=session) if not result or not result.banks: raise HTTPException(status_code=404, detail=settings.ERROR_MESSAGES["not_found"]) return result
Метод идентичен предыдущему, за исключением используемого метода DAO.
Метод: Получение минимальных и максимальных цен покупки для обеих валют
@router.get("/currency_purchase_range/{currency_type}") async def get_currency_purchase_range( currency_type: str, user_data: User = Depends(get_current_user), session: AsyncSession = SessionDep ) -> CurrencyRangeFilterSchema: """ Возвращает минимальные и максимальные цены покупки для обеих валют. """ currency_type = validate_currency_type(currency_type) requested_range, other_range = await get_currency_ranges(currency_type, 'buy', session) return CurrencyRangeFilterSchema( usd_min=requested_range[0] if currency_type == 'usd' else other_range[0], usd_max=requested_range[1] if currency_type == 'usd' else other_range[1], eur_min=other_range[0] if currency_type == 'usd' else requested_range[0], eur_max=other_range[1] if currency_type == 'usd' else requested_range[1] )
Эндпоинт принимает тип валюты (usd
или eur
) и возвращает минимальные и максимальные цены покупки для обеих валют.
Метод: Получение минимальных и максимальных цен продажи для обеих валют
@router.get("/currency_sale_range/{currency_type}") async def get_currency_sale_range( currency_type: str, user_data: User = Depends(get_current_user), session: AsyncSession = SessionDep ) -> CurrencySaleRangeFilterSchema: """ Возвращает минимальные и максимальные цены продажи для обеих валют. """ currency_type = validate_currency_type(currency_type) requested_range, other_range = await get_currency_ranges(currency_type, 'sell', session) return CurrencySaleRangeFilterSchema( usd_sale_min=requested_range[0] if currency_type == 'usd' else other_range[0], usd_sale_max=requested_range[1] if currency_type == 'usd' else other_range[1], eur_sale_min=other_range[0] if currency_type == 'usd' else requested_range[0], eur_sale_max=other_range[1] if currency_type == 'usd' else requested_range[1] )
Логика похожая и большого внимания не заслуживает.
Теперь я предлагаю вам сделать следующее: перезапустите сервис и попробуйте использовать созданные методы API в автоматически созданной документации FastApi. Помните, что все методы будут требовать либо прав администратора, либо простой авторизации в системе.
Если все тесты прошли успешно, то следующим шагом станет размещение нашего приложения FastApi на сервисе Amvera Cloud.
После того как мы завершим этот процесс, у нас будет возможность интегрировать наш API в различные системы, такие как сайты, мобильные приложения, телеграм-боты и другие, а также проводить тестирование в реальных условиях, отправляя асинхронные запросы через aiohttp.
Деплой API на Amvera Cloud
Я упоминал, что деплой займет всего 5 минут. Сейчас докажу это на практике.
Шаг 1: Создание файла конфигурации amvera.yml
На одном уровне с файлами .env
и requirements.txt
создайте файл amvera.yml
. Этот файл содержит настройки, которые помогут Amvera Cloud собрать и запустить ваш проект. Пример настроек:
meta: environment: python toolchain: name: pip version: 3.12 build: requirementsPath: requirements.txt run: persistenceMount: /data containerPort: 8000 command: uvicorn app.main:app --host 0.0.0.0 --port 8000
Описание настроек:
-
meta
: Определяет среду выполнения (Python) и инструмент для управления зависимостями (pip). -
build
: Указывает путь к файлуrequirements.txt
для установки зависимостей. -
run
: Настраивает путь для сохранения данных (папка/data
), порт контейнера (8000), и команду запуска приложения черезuvicorn
.
Шаг 2: Регистрация и создание проекта на Amvera Cloud
-
Регистрация: Зайдите на сайт Amvera Cloud и зарегистрируйтесь, если у вас еще нет аккаунта. Новые пользователи получают 111 рублей на основной баланс.
-
Создание проекта: Нажмите кнопку «Создать проект».
-
Настройка проекта:
-
Укажите имя проекта.
-
Выберите тарифный план. Для текущего проекта подойдет тариф «Начальный». Нажмите «Далее».
-
-
Загрузка файлов:
-
На следующем шаге выберите способ доставки файлов: через GIT или загрузку вручную.
-
Для простоты используйте интерфейс загрузки: перетащите все файлы проекта «как есть». Убедитесь, что добавлены файлы
amvera.yml
иrequirements.txt
. Нажмите «Далее».
-
-
Проверка конфигурации: На следующем экране проверьте корректность настроек и подтвердите их.
Шаг 3: Настройка домена
-
Войдите в созданный проект и перейдите на вкладку «Настройки».
-
Нажмите «Добавить домен»:
-
Выберите HTTPS.
-
Укажите бесплатный домен Amvera или добавьте собственный домен (например, с REG.RU), если у вас уже есть зарегистрированный домен.
-
Шаг 4: Ожидание деплоя
Через 2–3 минуты проект будет развернут. Вы увидите что-то похожее на это:
Пример ссылки на документацию проекта:
-
https://bankiru-yakvenalex.amvera.io/docs (в вашем случае адрес будет уникальным).
-
Полный исходный код проекта можно найти в моем бесплатном телеграм-канале.
Перейдите по выделенной ссылке, чтобы убедиться, что всё работает корректно.
Если всё сделано правильно, API готов к использованию в боевом режиме!
Тестируем API
Вы, вероятно, ожидаете увидеть здесь Pytest, но я решил не усложнять и написать несколько асинхронных функций на Aiohttp для тестирования нашего API. Такой подход будет понятен как новичкам, так и опытным Python-разработчикам.
Для этого в корне проекта я создам файл api_test.py
.
Сначала выполним необходимые импорты и назначим переменные, которые позволят нам работать с запросами:
import aiohttp from loguru import logger from asyncio import run BASE_SITE = 'https://bankiru-yakvenalex.amvera.io/' TAG_AUTH = 'auth/' TAG_API = 'api/' headers = { "accept": "application/json", "Content-Type": "application/json", }
Через aiohttp мы будем выполнять асинхронные запросы к нашему API, через logger будем выводить информацию после запросов в консоль, а при помощи run будем запускать асинхронный код.
Теперь напишем первую функцию для регистрации пользователя в системе:
async def register_user( email: str, phone_number: str, first_name: str, last_name: str, password: str, confirm_password: str ): """Выполняет POST-запрос на регистрацию пользователя.""" url = f"{BASE_SITE}/{TAG_AUTH}/register/" payload = { "email": email, "phone_number": phone_number, "first_name": first_name, "last_name": last_name, "password": password, "confirm_password": confirm_password, } async with aiohttp.ClientSession() as session: try: async with session.post(url, headers=headers, json=payload) as response: response_data = await response.json() logger.info(response_data) return response_data except aiohttp.ClientError as e: logger.error(f"Ошибка запроса: {e}") return None
Проверим:
run(register_user( email="alex@rambler.ru", phone_number="+79001234567", first_name="Alex", last_name="Ivanov", password="securePassword123", confirm_password="securePassword123" ))
Теперь напишем функцию для авторизации в системе:
async def login_user(email: str, password: str): """Выполняет POST-запрос для авторизации пользователя.""" url = f"{BASE_SITE}/{TAG_AUTH}/login/" payload = { "email": email, "password": password, } async with aiohttp.ClientSession() as session: try: async with session.post(url, headers=headers, json=payload) as response: response_data = await response.json() if response_data: if response_data.get("ok"): logger.success(f"Access token: {response_data['access_token']}") else: logger.warning(f"Ошибка входа: {response_data.get('message')}") return response_data except aiohttp.ClientError as e: logger.error(f"Ошибка запроса: {e}") return None
Выполним с некорректными данными:
run(login_user( email="alex@rambler.ru", password="securePassword1232" ))
Ответ:
2024-11-20 12:13:22.114 | WARNING | __main__:login_user:66 - Ошибка входа
Теперь с корректными данными. Ответ:
2024-11-20 12:13:58.531 | SUCCESS | __main__:login_user:64 - Access token: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiI3IiwiZXhwIjoxNzM0Njg2MDM4fQ.Tl_wF1cnTDxLslNkk5VJm_3-2xLcKUEIUX_odldKims
Давайте сохраним этот токен в переменную USER_TOKEN
. Теперь предлагаю выполнить вход с учетной записи, где есть права админа, и сохранить значение токена в переменную ADMIN_TOKEN
. Эти две переменные нам будут нужны на этапе тестирования методов, требующих авторизацию в системе.
Теперь напишем метод для получения информации по валютам от всех банков. Сейчас покажу вам, как можно использовать JWT-токен извне, передавая его в куки. По этой технологии работает большинство современных API (Bearer):
async def get_all_currency(access_token: str, method_name='all_currency'): """Выполняет запрос для получения всех валют.""" url = f"{BASE_SITE}/{TAG_API}/{method_name}/" headers['cookie'] = f"users_access_token={access_token}" async with aiohttp.ClientSession() as session: try: async with session.get(url, headers=headers) as response: response_data = await response.json() logger.info(f"Статус: {response.status}") return response_data except aiohttp.ClientError as e: logger.error(f"HTTP request failed: {e}") return None
Данная функция принимает на вход токен пользователя и имя метода. Аргумент с именем метода я ввел по причине того, что получение информации по всем курсам банков и для пользователя, и для админа работает по одним правилам. Меняется только имя метода и токен.
Тут мы в словарь headers
добавляем ключ cookie
, а в его значение подставляем f"users_access_token={access_token}"
.
При проблемах с доступом (например, некорректный токен) система выдаст ошибку 401.
Давайте выполним запрос с корректным токеном, а полученный результат уже прогоним через цикл for:
rez = run(get_all_currency(access_token=USER_TOKEN)) for i in rez: logger.info(i)
Таким образом, нам удалось получить актуальные данные с нашего API, который сейчас размещен на серверах Amvera.
Теперь попробуем выполнить запрос для получения информации от админа, но с токеном пользователя:
rez = run(get_all_currency(access_token=USER_TOKEN, method_name='all_currency_admin'))
Тут я получаю сообщение «нет прав»:
{'detail': 'Недостаточно прав!'}
При замене на ADMIN_TOKEN
получаю нужную информацию.
По остальным методам не вижу смысла писать функции, так как общая логика должна быть понятна.
Заключение
В этой статье мы создали простой, но функциональный API с набором универсальных методов. Моя цель состояла не в том, чтобы разработать сложный API с уникальными функциями, а в том, чтобы помочь вам закрепить практические навыки работы с FastAPI, полученные в теоретических статьях.
Что мы изучили
-
Интеграцию APScheduler в проекты на FastAPI — инструмент, который часто применяется в реальных проектах
-
Практическое применение теоретических знаний FastAPI
-
Базовые принципы построения API
Что дальше?
В более сложных проектах вместо APScheduler обычно применяется Celery. Если будет достаточно интереса и отклика, в следующих статьях я планирую обсудить технологию Celery и Redis в контексте проектов FastApi.
Полезные ресурсы
Если вы хотите получать больше эксклюзивного контента, приглашаю вас в мой бесплатный Telegram-канал «Легкий путь в Python», где уже более 1400 единомышленников!
Если статья оказалась полезной, буду признателен за лайк и комментарий.
До новых встреч!
ссылка на оригинал статьи https://habr.com/ru/articles/859990/
Добавить комментарий