Создание собственного API на Python (FastAPI): Авторизация, Аутентификация и роли пользователей

от автора

Друзья, приветствую! К сожалению, не хватает времени, чтобы чаще публиковаться, и надеюсь, что вы ждали этой статьи.

Как вы поняли из названия, сегодня мы поговорим про авторизацию и аутентификацию. Прежде чем вы приступите к прочтению статьи, настоятельно рекомендую вам ознакомиться с прошлым материалом:

Без знаний, описанных там, вам вряд ли удастся разобраться, что к чему. Прежде чем мы приступим к написанию кода, давайте немного теории.

Изначально я планировал рассказать вам о работе с готовыми модулями для авторизации, но в процессе решил, что вам будет интереснее написать свою собственную авторизацию со своими полями, ролями и прочее.

Аутентификация

Аутентификация — это процесс проверки подлинности пользователя. Он подтверждает, что пользователь действительно тот, за кого себя выдает.

Когда вы вводите логин и пароль в форму авторизации, на самом деле происходит процесс аутентификации. Система проверяет, существует ли пользователь, корректный ли его пароль, и только если все условия выполнены, происходит аутентификация.

Авторизация

Авторизация — это процесс предоставления пользователю прав доступа к определенным ресурсам или действиям. Она определяет, что пользователь может и не может делать в системе.

Авторизация может быть и без аутентификации. К примеру, любая свободная доска объявлений: вы, не выполнив аутентификацию, можете читать объявления, а в некоторых случаях забирать контактные данные.

На том же примере: иногда доступ к контактам будет открыт только после входа в систему (после аутентификации), как и размещение объявлений.

Более серьезные темы работы с авторизацией включают доступ к админ-панели. В зависимости от вашего уровня доступа вам будет открыт тот или иной функционал или предоставлена определенная скрытая информация.

Для реализации обычно пользователям присваиваются определенные роли, и в зависимости от роли каждому предоставляются те или иные права. Кто-то может всё, например, админ, а кому-то можно только оставить комментарии (авторизованный пользователь).

Сегодня мы создадим такие роли и я покажу вам, как открывать (давать доступ) к данным или функционалу в зависимости от роли пользователя.

Связанные понятия

Вместе с этими темами в FastAPI тесно связаны следующие понятия:

Dependencies

Dependencies (зависимости) — это механизм FastAPI, который позволяет легко внедрять зависимости в маршруты. Это могут быть как простые значения, так и сложные объекты, такие как базы данных или другие сервисы.

Объясню проще: например, такой зависимостью может быть результат выполнения той или иной функции. К примеру, это проверка, авторизован ли пользователь. Тогда мы передадим аргументом Dependencies вместе с функцией, которая должна выполниться ранее. И только после успешного её выполнения откроется доступ к новой функции.

Примечательно, что зависимостей может быть несколько, и сегодня, на конкретных примерах, вы узнаете, как работать с этим механизмом.

Вместо долгих теоретических объяснений, мы вскоре познакомимся с зависимостями на конкретных примерах и опишем их в коде.

Hashing 

Hashing — процесс преобразования данных (например, пароля) в уникальный код фиксированной длины. В FastAPI часто используется для безопасного хранения паролей.

Сегодня мы будем захватывать пароль пользователя, после чего трансформируем его в hash и запишем в базу данных уже hash-строку. Также мы напишем метод, позволяющий проверять подлинность пароля. Он будет принимать пароль и hash и проверять, соответствуют ли они друг другу.

JWT токены

JWT токены: JSON Web Token (JWT) — это компактный, URL-ориентированный способ представления информации, которую можно использовать для передачи данных между двумя сторонами. В контексте авторизации JWT часто используется для создания токенов доступа, которые могут проверяться сервером. Сегодня мы научимся генерировать такие токены, помещать их в куки и доставать с них данные.

Будет рассмотрена только тема access_token (без refresh). Причин тут 2:

  1. Не хочу перегружать статью, а то снова получится на 30+ минут

  2. Последняя статья вообще не зашла из чего я сделал вывод, что тема FastApi особо вам не интересна, а значит тратить несколько дней на одну статью смысла нет.

Куки

Куки — это небольшие данные, которые сервер отправляет браузеру. Каждый раз, когда браузер запрашивает страницу с того же сервера, он отправляет куку обратно. Это позволяет сохранять состояние, например, оставаться залогиненным.

Сегодня я покажу вам как в куки поместить JWT acess_token и как его оттуда достать.

Теперь, когда у нас есть общее представление о теории, давайте приступим к написанию кода.

Подготовка

Для начала создадим отдельный роутер (папку) и назовем его users. В самой папке необходимо будет добавить несколько файлов. Структура должна получиться такой:

my_fastapi_project/  ├── tests/ │   └── (тут мы будем добавлять функции для Pytest) ├── app/ │   ├── database.py │   ├── config.py │   ├── main.py │   └── students/ │      ├── router.py │      ├── schemas.py │      ├── dao.py │      ├── rb.py │   └── users/ │      ├── router.py │      ├── dependencies.py │      ├── auth.py │      ├── schemas.py │      ├── dao.py │   └── dao/ │       └── base.py │   └── migration/ │       └── (файлы миграций Alembic) ├── alembic.ini ├── .env └── requirements.txt

То есть, новый роутер должен выглядеть так:

│   └── users/ │      ├── router.py │      ├── dependencies.py │      ├── auth.py │      ├── schemas.py │      ├── dao.py

Из нового у нас 2 файла: auth.py и dependencies.py. Для начала рассмотрим  auth.py:

В этот файл нам необходимо будет прописать следующие методы: 

  • метод для создания JWT access_token

  • метод для трансформации пароля в hash строку

  • метод для проверки соответствия пароля и hash-строки

  • метод который будет принимать Email (логин) и пароль от пользователя. Суть в том чтоб выполнить аутинтефикацию.

Давайте по порядку.

Необходимо установить следующие библиотеки:

python-jose bcrypt==4.0.1 passlib[bcrypt]

Важно, чтоб версия bcrypt была именно 4.0.1

pip install python-jose bcrypt==4.0.1 passlib[bcrypt]

python-jose:

Это библиотека для работы с JSON Web Tokens (JWT). Она поддерживает различные алгоритмы шифрования и подписания для создания и проверки JWT.

Используется для генерации, подписания и верификации токенов, которые часто используются для аутентификации и передачи информации между клиентом и сервером.

bcrypt==4.0.1:

Это библиотека для хэширования паролей с использованием алгоритма bcrypt.

Важно установить именно версию 4.0.1, чтобы избежать возможных несовместимостей или изменений в API в других версиях.

bcrypt считается одним из наиболее безопасных методов хэширования паролей благодаря использованию соли и многократного хэширования.

passlib[bcrypt]:

Это универсальная библиотека для хэширования паролей, которая поддерживает различные алгоритмы хэширования, включая bcrypt.

Passlib упрощает работу с хэшированием паролей, предоставляя удобные функции для создания и проверки хэшей.

Использование passlib[bcrypt] означает, что вы устанавливаете Passlib с поддержкой алгоритма bcrypt, обеспечивая дополнительную гибкость и функциональность.

Давайте напишем логику для работы с паролями

from passlib.context import CryptContext   pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")   def get_password_hash(password: str) -> str:     return pwd_context.hash(password)   def verify_password(plain_password: str, hashed_password: str) -> bool:     return pwd_context.verify(plain_password, hashed_password)

Импорт библиотеки:

from passlib.context import CryptContext

Импортируется CryptContext из библиотеки Passlib, который будет использоваться для настройки и управления хэшированием паролей.

Создание контекста для хэширования паролей:

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

CryptContext настраивается для использования алгоритма bcrypt.

Параметр deprecated=»auto» указывает использовать рекомендованные схемы хэширования и автоматически обновлять устаревшие.

Функция для создания хэша пароля:

def get_password_hash(password: str) -> str:     return pwd_context.hash(password)

Функция принимает пароль в виде строки и возвращает его безопасный хэш.

Функция для проверки пароля:

def verify_password(plain_password: str, hashed_password: str) -> bool:     return pwd_context.verify(plain_password, hashed_password)

Функция принимает обычный пароль и его хэш, возвращая True, если пароль соответствует хэшу, и False в противном случае.

Думаю что в этой части все понятно. Теперь напишем функцию для генерации JWT токена.

from jose import jwt from datetime import datetime, timedelta, timezone from app.config import get_auth_data   def create_access_token(data: dict) -> str:     to_encode = data.copy()     expire = datetime.now(timezone.utc) + timedelta(days=30)     to_encode.update({"exp": expire})     auth_data = get_auth_data()     encode_jwt = jwt.encode(to_encode, auth_data['secret_key'], algorithm=auth_data['algorithm'])     return encode_jwt 

Давайте разберемся с get_auth_data. Тут мы просто должны добавить некоторые параметры в наш .env файл и после поместить эти данные в настройки.

.env

DB_HOST=localhost DB_PORT=5433 DB_NAME=fast_api DB_USER=amin DB_PASSWORD=my_super_password SECRET_KEY=gV64m9aIzFG4qpgVphvQbPQrtAO0nM-7YwwOvu0XPt5KJOjAy4AfgLkqJXYEt ALGORITHM=HS256

 Обратите внимание, что у нас появился SECRET и ALGORITHM.

 После правок файл с настройками будет выглядеть так:

import os from pydantic_settings import BaseSettings, SettingsConfigDict   class Settings(BaseSettings):     DB_HOST: str     DB_PORT: int     DB_NAME: str     DB_USER: str     DB_PASSWORD: str     SECRET_KEY: str     ALGORITHM: str      model_config = SettingsConfigDict(         env_file=os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", ".env")     )   settings = Settings()   def get_db_url():     return (         f"postgresql+asyncpg://{settings.DB_USER}:{settings.DB_PASSWORD}@"         f"{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}"     )       def get_auth_data():     return {"secret_key": settings.SECRET_KEY, "algorithm": settings.ALGORITHM} 

Тут мы добавили метод, который будет возвращать эти два параметра. О том как оно все тут работает — я писал в прошлых статьях подробно. Теперь разберем сам код:

def create_access_token(data: dict) -> str:     to_encode = data.copy()     expire = datetime.now(timezone.utc) + timedelta(days=30)     to_encode.update({"exp": expire})     auth_data = get_auth_data()     encode_jwt = jwt.encode(to_encode, auth_data['secret_key'], algorithm=auth_data['algorithm'])     return encode_jwt 

Функция create_access_token создает JSON Web Token (JWT) для аутентификации пользователей. Она принимает словарь с данными, добавляет время истечения токена (по умолчанию 30 дней), и затем кодирует эти данные в JWT с использованием секретного ключа и алгоритма шифрования, заданных в конфигурации приложения.

Далее, мы сможем поместить данный токен в куки, после чего начать его считывать. Поместить в него можно и другие данные. 

В результате считывания мы сможем понимать следующее:

  1. Истек ли токен. Если да, то мы можем автоматически «выбить» пользователя из системы

  2. Принимаем ID пользователя или прочие данные о нем для дальнейшей работы. Например, для предоставления пользователю того или иного функционала (данных).

В серьезных приложениях для того чтоб пользователя не выкидывало из системы реализуется функционал refresh_token. Если коротко, то refresh_token позволяет автоматически обновить срок жизни токена, тем самым, не заставляя пользователя повторно входить в систему.

Я понимаю, что сейчас не все ясно, но, совсем скоро, все станет на свои места.

Таблица пользователей.

Данные о пользователях, как и их права, нам нужно где-то хранить. Так давайте создадим соответствующую модель в файле users/models.py:

from sqlalchemy import text from sqlalchemy.orm import Mapped, mapped_column from app.database import Base, str_uniq, int_pk   class User(Base):     id: Mapped[int_pk]     phone_number: Mapped[str_uniq]     first_name: Mapped[str]     last_name: Mapped[str]     email: Mapped[str_uniq]     password: Mapped[str]      is_user: Mapped[bool] = mapped_column(default=True, server_default=text('true'), nullable=False)     is_student: Mapped[bool] = mapped_column(default=False, server_default=text('false'), nullable=False)     is_teacher: Mapped[bool] = mapped_column(default=False, server_default=text('false'), nullable=False)     is_admin: Mapped[bool] = mapped_column(default=False, server_default=text('false'), nullable=False)     is_super_admin: Mapped[bool] = mapped_column(default=False, server_default=text('false'), nullable=False)      extend_existing = True      def __repr__(self):         return f"{self.__class__.__name__}(id={self.id})" 

В статье «Создание собственного API на Python (FastAPI): Router и асинхронные запросы в PostgreSQL (SQLAlchemy)» я подробно рассматривал тему создания таблиц (моделей) и работу с ними.

Тут мы описали нашу будущую таблицу. Из «необычного» тут есть следующие поля: is_user, is_student, is_teacher, is_admin, is_super_admin. Как вы понимаете, это ни что иное как роли пользователей.

По умолчанию мы присвоим каждому авторизованному пользователю роль is_user, но, в последствии, вы сможете изменить роль пользователю, тем самым, открывая ему доступ к новому функционалу или к особым данным.

Не забываем привязать новую таблицу в migration/env.py

from app.users.models import User

Выполняем миграцию (ревизию)

alembic revision --autogenerate -m "create users table"

Выполняем upgrade

alembic upgrade head

В результате у вас должна появиться таблица такого вида

Отлично! Теперь можем приступить к описанию самого роутера.

Нам необходим создать Router и напишем метод для регистрации пользователя.

from fastapi import APIRouter, HTTPException, status from app.users.auth import get_password_hash from app.users.dao import UsersDAO from app.users.schemas import SUserRegister   router = APIRouter(prefix='/auth', tags=['Auth'])   @router.post("/register/") async def register_user(user_data: SUserRegister) -> dict:     user = await UsersDAO.find_one_or_none(email=user_data.email)     if user:         raise HTTPException(             status_code=status.HTTP_409_CONFLICT,             detail='Пользователь уже существует'         )     user_dict = user_data.dict()     user_dict['password'] = get_password_hash(user_data.password)     await UsersDAO.add(**user_dict)     return {'message': 'Вы успешно зарегистрированы!'} 

Давайте разбираться.

Сразу мы видим, что я импортировал схему SuserRegister и новый класс UsersDAO. Если вы читали прошлые статьи по теме FastApi, то знаете, что SuserRegister, это, скорее всего Pydantic модель, описывающая тело post запроса, а UsersDAO — это класс для взаимодействия с таблицей пользователей.

Схема  SuserRegister:

from pydantic import BaseModel, EmailStr, Field, validator import re   class SUserRegister(BaseModel):     email: EmailStr = Field(..., description="Электронная почта")     password: str = Field(..., min_length=5, max_length=50, description="Пароль, от 5 до 50 знаков")     phone_number: str = Field(..., description="Номер телефона в международном формате, начинающийся с '+'")     first_name: str = Field(..., min_length=3, max_length=50, description="Имя, от 3 до 50 символов")     last_name: str = Field(..., min_length=3, max_length=50, description="Фамилия, от 3 до 50 символов")      @validator("phone_number")     @classmethod     def validate_phone_number(cls, value: str) -> str:         if not re.match(r'^\+\d{5,15}$', value):             raise ValueError('Номер телефона должен начинаться с "+" и содержать от 5 до 15 цифр')         return value 

Подобное мы уже писали. В данной теме мы сообщаем, что ждем мы email, password, phone_number, first_name и last_name. Каждое из полей обязательно для заполнения.

Файл users/dao.py

from app.dao.base import BaseDAO from app.users.models import User    class UsersDAO(BaseDAO):     model = User

Я решил не усложнять и просто наследовался от базового класса BaseDAO. То есть использовать мы будем только универсальные методы. Подробно об этом писал в прошлой статье.

Вернемся к обработчику POST запроса:

@router.post("/register/") async def register_user(user_data: SUserRegister) -> dict:     user = await UsersDAO.find_one_or_none(email=user_data.email)     if user:         raise HTTPException(status_code=status.HTTP_409_CONFLICT,                             detail='Пользователь уже существует')     user_dict = user_data.dict()     user_dict['password'] = get_password_hash(user_data.password)     await UsersDAO.add(**user_dict)     return {'message': f'Вы успешно зарегистрированы!'} 

Тут мы принимаем данные от пользователя после регистрации, затем делаем проверку на то существует ли он в базе данных. Если не существует и никаких ошибок валидации нет, то мы записываем пользователя в базу дынных.

По ошибкам (исключениям) я подготовлю в своем телеграмм канале небольшой эксклюзивный гайд. Покажу как вынести исключения в отдельный файл, какие исключения бывают и как с ними проще работать. Так же я расскажу и о том, зачим импортировать status из FastApi.

Предварительно, перед записью, мы трансформируем пароль в hash-строку. Вроде бы не сильно сложно, правда?

Регистрируем роутер в файле main.py:

from fastapi import FastAPI from app.students.router import router as router_students from app.majors.router import router as router_majors from app.users.router import router as router_users   app = FastAPI()   @app.get("/") def home_page():     return {"message": "Привет, Хабр!"}     app.include_router(router_users) app.include_router(router_students) app.include_router(router_majors) 

Давайте тестировать. Запускаем из корня проекта:

uvicorn app.main:app

 Попробуем передать данные о пользователе.

{   "email": "user@example.com",   "password": "super_password",   "phone_number": "+874493831",   "first_name": "Алексей",   "last_name": "Яковенко" }

Видим, что мы зарегистрированы. Проверим что там у нас в базе данных.

Видим что данные записаны, а пароль трансформирован в hash-строку. Неплохое начало!

Логично что следующий шаг это аутинтификация пользователя в базе данных.

И тут нам необходимо будет провернуть один трюк. Наша задача в том, чтоб после успешного входа в систему (аутинтефикация) мы сгенирировали JWT токен и поместили его в куки.

Для удобства давайте в файле auth.py напишем функцию, которая будет принимать Email и пароль, а дальше просто будет проверять есть ли такой пользователь и с таким паролем в базе данных. Без всяких там токенов и куки.

async def authenticate_user(email: EmailStr, password: str):     user = await UsersDAO.find_one_or_none(email=email)     if not user or verify_password(plain_password=password, hashed_password=user.password) is False:         return None     return user 

Тут мы попытались получить данные о пользователе по email. Затем, если пользователь с таким email получен, мы проверяем соответствует ли тот пароль что передал пользователь — hash-строке.

Если все условия выполнены — вернем данные о пользователе. Иначе вернем None.

Кроме того, давайте напишем Pydantic модель для авторизации пользователя:

class SUserAuth(BaseModel):     email: EmailStr = Field(..., description="Электронная почта")     password: str = Field(..., min_length=5, max_length=50, description="Пароль, от 5 до 50 знаков") 

Тем самым мы укажем, что в теле POST запроса мы ожидаем email и password.

Теперь можем написать и сам обработчик POST запроса:

@router.post("/login/") async def auth_user(response: Response, user_data: SUserAuth):     check = await authenticate_user(email=user_data.email, password=user_data.password)     if check is None:         raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,                             detail='Неверная почта или пароль')     access_token = create_access_token({"sub": str(check.id)})     response.set_cookie(key="users_access_token", value=access_token, httponly=True)     return {'access_token': access_token, 'refresh_token': None} 

Мы использовали тут новый параметр response.

response здесь представляет объект Response, который используется для управления HTTP-ответом, отправляемым клиенту. Он позволяет установить заголовки ответа, установить куки и так далее.

Далее мы просто выполняем проверку: получили мы пользователя или нет.

Если не получили, то вернем ошибку:

HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Неверная почта или пароль')

Тут я специально явно не указывал ошибка в почте или в пароле для безопасности и защиты от подборов. Злоумышленник не будет знать существует ли пользователь с указанным логином или нет.

Если данные о пользователе получены, то мы генерируем JWT токен, а затем записываем его в куку.

response.set_cookie(key="users_access_token", value=access_token, httponly=True)

Флаг httponly=True, установленный при установке куки с помощью метода response.set_cookie, указывает браузеру, что куки должны быть доступны только через HTTP или HTTPS, и не могут быть доступны скриптам JavaScript на стороне клиента.

Это повышает безопасность приложения, так как куки, содержащие чувствительные данные, такие как токены аутентификации (access_token), не могут быть скомпрометированы через атаки XSS (межсайтовый скриптинг).

Таким образом, флаг httponly=True помогает защитить данные пользователя от несанкционированного доступа и использования.

В целом все просто и сейчас, после авторизации, я вам покажу как будет выгядеть запись в куке.

После мы возвращаем {'access_token': access_token, 'refresh_token': None}.

Тестируем.

Намеренно допустим ошибку

Теперь укажем данные корректно:

Видим что получили ответ. Что там по кукам?

Для входа жмем F12. Переходим на вкладку "Aplications" (приложение) и смотрим на куки

Для входа жмем F12. Переходим на вкладку «Aplications» (приложение) и смотрим на куки

Мы видим, что в текущих куках появился ключ users_access_token с созданным токеном в значение. Отлично, ведь это то что нам было нужно!

Далее нам необходимо написать такой механизм, который позволит нам смотреть в куки и проверять, есть ли вообще ключ users_access_token, валиден ли этот ключ и так далее и тут начинается более сложная тема.

Dependencies на практике

В начале статьи я уже говорил о том что такое dependencies (зависимости), сейчас же мы познакомимся с ними на практике, но, перед этим, давайте напишем несколько сопутствующих функций.

Напишем функцию, которая позволит достать токен:

from fastapi import Request, HTTPException, status, Depends   def get_token(request: Request):     token = request.cookies.get('users_access_token')     if not token:         raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Token not found')     return token 

Смысл данного кода в том, чтоб достать значение ключа users_access_token из куки. Дополнительных проверок пока нет. Мы или получим некоторое значение или вызовем исключение, мол токен не найден.

А теперь давайте напишем функцию, которая будет зависеть от функции  get_token. Смысл зависимости тут будет сводится к тому, чтоб или выполнить функцию, если мы получим токен или сразу вызвать исключение.

Отправляю полный код:

from fastapi import Request, HTTPException, status, Depends from jose import jwt, JWTError from datetime import datetime, timezone from app.config import get_auth_data from app.exceptions import TokenExpiredException, NoJwtException, NoUserIdException, ForbiddenException from app.users.dao import UsersDAO   def get_token(request: Request):     token = request.cookies.get('users_access_token')     if not token:         raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Token not found')     return token     async def get_current_user(token: str = Depends(get_token)):     try:         auth_data = get_auth_data()         payload = jwt.decode(token, auth_data['secret_key'], algorithms=[auth_data['algorithm']])     except JWTError:         raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Токен не валидный!')      expire = payload.get('exp')     expire_time = datetime.fromtimestamp(int(expire), tz=timezone.utc)     if (not expire) or (expire_time < datetime.now(timezone.utc)):         raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Токен истек')      user_id = payload.get('sub')     if not user_id:         raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Не найден ID пользователя')      user = await UsersDAO.find_one_or_none_by_id(int(user_id))     if not user:         raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='User not found')      return user 

Декодер:

try:     auth_data = get_auth_data()     payload = jwt.decode(token, auth_data['secret_key'], algorithms=[auth_data['algorithm']]) except JWTError:     raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Токен не валидный!')

В этом месте мы создали свой декодер. Его смысл в том, чтоб получить из токена данные с которыми можно будет работать. В нашем случае это exp и sub.

Теперь проверим срок токена (истек или не истек)

expire = payload.get('exp') expire_time = datetime.fromtimestamp(int(expire), tz=timezone.utc) if (not expire) or (expire_time < datetime.now(timezone.utc)):     raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Токен истек') 

Тут нам необходимо привести данные по дате завершения срока жизни токена в питонвский формат.

Забираем значение:

expire: str = payload.get('exp')

Трансформируем в нужный формат:

expire_time = datetime.fromtimestamp(int(expire), tz=timezone.utc)

Пишем простое условие:

if (not expire) or (expire_time < datetime.now(timezone.utc)):     raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Токен истек')

Тут смысл в том, чтоб проверить есть ли параметр срока истечения жизни токена и не является ли эта дата больше текущей. Если одно из двух условий верное, то вызывается исключение.

Далее мы проверяем есть ли параметр ID пользователя:

user_id: str = payload.get('sub') if not user_id:     raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Не найден ID пользователя')

Далее если параметр есть, то мы стараемся получить данные о пользователе:

user = await UsersDAO.find_one_or_none_by_id(int(user_id)) if not user:     raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='User not found') return user

Важно не забыть трансформировать  user_id в integer.

Вот и все сложности. Завершив проверку мы или исключение (ошибку) впоймаем или получим данные о пользователе.

Как вы понимаете, все сводится к тому, чтоб выйти из функции без исключения. Если это произошло — значит все у нас получилось.

Теперь напишем обработчик GET запроса, который будет возвращать данные о пользователе авторизованному пользователю и возвращать исключение, если пользователь не авторизован.

@router.get("/me/") async def get_me(user_data: User = Depends(get_current_user)):     return user_data

Обратите внимание, что тут мы указали зависимость от функции  get_current_user. В свою очередь  get_current_user зависит от get_token. На этом простом примере вы видите как можно выстраивать многоуровневые зависимости.

И, прежде чем мы приступим к тестам, давайте напишем простой обработчик, который будет выбивать пользователя из системы. Как вы понимаете, для этого нам достаточно будет удалить JWT токен из куки.

@router.post("/logout/") async def logout_user(response: Response):     response.delete_cookie(key="users_access_token")     return {'message': 'Пользователь успешно вышел из системы'}

Тестируем.

Выходим из системы

Пробуем получить о себе данные:

Получаем ожидаемую ошибку. Выполним повторную аутинтификацию.

Повторно пробуем получить о себе данные.

Видим что данные успешно получены.

А теперь давайте напишем метод, который будет доступен только для пользователя с ролью администратора (is_admin = True). Для этого в файле dependensies.py напишем новую функцию.

async def get_current_admin_user(current_user: User = Depends(get_current_user)):     if current_user.is_admin:         return current_user     raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail='Недостаточно прав!')

И теперь, если вы внимательно читали, то вы понимаете общую логику работы данного метода.

Напоминаю, что функция get_current_user, при успешном выполнении, возвращает данные о пользователе. В функции же get_current_admin_user мы проверяем наличие флага True в значении is_admin и только в том случае, если is_admin = True мы даем необходимые права пользователю на доступ к функционалу.

Напишем обработчик GET запроса, который вернет список из всех пользователей только администратору:

@router.get("/all_users/") async def get_all_users(user_data: User = Depends(get_current_admin_user)):     return await UsersDAO.find_all()

Так как у меня сейчас уровень доступа is_user – я получаю следующую ошибку:

Сейчас я «руками» изменю уровень доступа, а вам советую написать обработчик POST запроса, который позволит выполнить обновление роли и доступ дайте только администратору.

Повторно выполним запрос:

Отлично, данные получены!

Проблемы и новые вызовы

Начнем с того, что не совсем правильно руками заходить в базу данных и ставить True или False под той или иной ролью.

Дальше, роли могут появляться или исчезать.

Для решения данной проблемы я специально не писал код, а хочу порекомендовать вам сделать это самостоятельно. Задача будет выглядеть так:

  1. Создать эндпоинт для добавления / удаления ролей и сделать его доступным только админу. На входе метод пусть принимает название роли. Далее опишите DELETE и POST эндпоинты под конкретные задачи

  2. Создать эндпоинт для изменения роли пользователя. На входе он должен принимать роль и ее новое значение. Для тех кто прям хорошо разобрался в теме, предлагаю написать специальный метод в классе UserDao который будет скидывать все флажки на False и будет устанавливать флаг True той роли, которую вы назначили пользователю. Естественно, доступ к функционалу должен быть только у администратора

В телеграмм у меня, так же, есть собществ «Легкий путь в Python — сообщество» там мы обсуждаем, в том числе, проблемы с кодингом и вместе решаем их. Присоеденяйтесь!

И есть у нас одна новая, глобальная проблема — неудобно пользоваться обычным юзерам. Сейчас поясню.

Мы с вами программисты и работа через чистое API наша стихия, но вашим продуктом должны пользоваться обычные пользователи, бизнес, а для того чтоб это было комфортно — нужен приличный фронт (визуализация апи).

Нужны формы, анимации, кнопки и прочее и, как вы уже догадались, прикручивать фронт мы будем уже в следующей статье.

Заключение

Сегодня мы рассмотрели достаточно сложную тему и, несмотря, на то что я пытался описать тут все максимально подробно — без конкретной практики усвоить данный материал будет невозможно или очень сложно. Практикуйтесь!

Далее, хочу напомнить, что полный исходник данного кода вы найдете в моем телеграмм канале «Легкий путь в Python». Там же, я выложу отдельный гайд по работе с исключениями. Я покажу в нем, как их сохранять в переменные и переиспользовать в рамках одного или нескольких проектов.

На этом пока все. До скорых встреч!

Только зарегистрированные пользователи могут участвовать в опросе. Войдите, пожалуйста.

Поговорим о том как привязать фронт к FastApi без фроненд фреймворков?

42.86% Конечно3
0% Возможно0
57.14% Нет, не интересно!4

Проголосовали 7 пользователей. Воздержался 1 пользователь.

ссылка на оригинал статьи https://habr.com/ru/articles/829742/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *