Создание анонимного чата в Telegram: Бот с MiniApp интерфейсом. Часть 1 — Бэкенд на FastAPI, Aiogram, Redis и Centrifugo

от автора

Друзья, приветствую!

Давно анонсировал большой проект, но из-за загруженности не доходили руки оформить все в статью. Как видите, руки наконец дошли.

Напомню, что из анонса следовала разработка телеграм-бота с MiniApp в формате случайного чата со случайным пользователем. Основная идея этой серии — рассказать и на конкретном, понятном примере показать, как работает связка таких технологий, как Centrifugo и Redis для реализации Real-time приложения.

Сейчас тема дейтинг-приложений и чат-апп набирает популярность, так почему бы не разобраться, как разработать подобное решение, которое может стать трендовым для рынка ТМА?

Информации получилось много, поэтому решил разбить описание данного проекта на две статьи:

  1. Бэкенд (логика приложения). Эту статью вы сейчас читаете.

  2. Фронтенд (MiniApp). Надеюсь, что эта статья выйдет не позже чем через неделю.

В сегодняшней статье, как следует из названия, мы сосредоточимся на разработке бэкенда — создании основной логики нашего приложения.

В следующей статье мы переключим внимание с бэкенда на фронтенд, а именно на разработку MiniApp с использованием VueJS 3.

Кроме того, в продолжениив я планирую рассмотреть вопрос монетизации ботов с MiniApp через решение от RichAds. Это позволит начать зарабатывать на своем приложении уже после запуска, благодаря активности пользователей.

Решение, предлагаемое сервисом RichAds, дает возможность интегрировать рекламные механики, которые не только обеспечивают доход, но и сохраняют удобство использования приложения для пользователей. Я подробно рассмотрю, как настроить монетизацию и оптимизировать ее для максимальной эффективности для данного проекта.

Описание проекта

Прежде чем начать рассматривать стек используемых технологий и подробно разбирать общий план работы, давайте напомню, какой проект мы разрабатываем.

Мы создаем телеграм-бота с MiniApp. Со стороны клиента проект будет выглядеть следующим образом:

  1. Клик в боте на кнопку «Старт»

  2. Ответ на пару вопросов от бота: ник в системе, возраст, пол

  3. После этого бот предоставляет клавиатуру с кнопками, открывающими возможности:

    • Просмотреть свой профиль (с возможностью внести правки)

    • Посмотреть окно «О нас»

    • Перейти в MiniApp при клике на соответствующую кнопку

Эти три пункта мы с вами сегодня реализуем полностью, а вопрос с MiniApp перенесем уже на следующую статью.

В самом MiniApp, на стартовой странице, у пользователя будет возможность задать интересующие его параметры для фильтрации, в частности, поиск в рамках диапазона возраста и пола, если требуется.

После будет запущена логика с поиском собеседника и затем, когда собеседник будет найден, откроется чат «Тет-а-тет».

Архитектура и технологии

Несмотря на кажущуюся простоту задачи, в проекте будет задействовано много интересных технологий, которые я постараюсь подробно и доступно объяснить в этой серии публикаций.

Фронтенд (MiniApp) и бэкенд (API, логика) в этом проекте будут реализованы как два независимых приложения:

  • Бэкенд: Язык программирования Python с использованием FastAPI, Aiogram, Aiogram dialog, SQLAlchemy и прочие инструменты

  • Фронтенд: JavaScript + HTML + CSS с фреймворком VueJS3

Монетизация проекта

Помимо технической части, в рамках данного проекта я хочу продемонстрировать вам эффективный способ монетизации своих приложений. В этом нам поможет сервис RichAds — современное решение для разработчиков Telegram-приложений.

RichAds предлагает действительно удобный инструмент, который позволяет не только эффективно монетизировать свои Telegram MiniApp, но и бюджетно привлекать целевую аудиторию в свои проекты. В следующей части я отдельно и более детально остановлюсь на вопросе интеграции сервиса RichAds в наше MiniApp, чтобы вы могли применить этот опыт в своих проектах.

План работы по бэкенду

Задача сегодняшней статьи заключается в том, чтобы уже к следующей части мы могли полностью сосредоточиться на вопросе фронтенда. В рамках сегодняшней статьи мы:

  • Поднимем Redis и Centrifugo (и разберемся, зачем нам эти технологии)

  • Опишем все необходимые API-методы

  • Опишем логику взаимодействия с базами данных (Redis и SQLite)

  • Разработаем логику бота (первые 3 пункта, описанные выше)

  • Выполним деплой готового бэкенда на сервис Amvera Cloud.

Стек используемых технологий

Сегодня мы будем писать на Python и задействуем следующие инструменты:

  • aiogram==3.18.0 — основной фреймворк для разработки телеграм-ботов на Python

  • aiogram_dialog==2.3.1 — вспомогательный фреймворк для удобной работы с машиной состояний

  • fastapi==0.115.0 — мощный фреймворк для разработки бэкенда на Python

  • SQLAlchemy==2.0.35 — ORM для работы с реляционными базами данных

  • redis==5.2.1 — официальная библиотека для работы с базой данных Redis

  • httpx==0.28.1 — для отправки внешних запросов

А также ряд дополнительных библиотек, о которых поговорим по мере необходимости.

Пошаговый план реализации

  1. Знакомство с Redis и Centrifugo: Коротко ознакомимся с тем, зачем нам в этом проекте нужны эти технологии

  2. Развертывание технологий: Я покажу вам как развернуть Redis и Centrifugo на локальной машине и расскажу о том, как эти технологии можно поднять буквально в пару кликов мышки на сервисе Amvera Cloud. Использование Amvera особенно удобно не только из-за простоты настройки, но и потому что Redis и Centrifugo становятся доступными извне — вы сможете с любой машины подключиться к этим сервисам.

  3. Подготовка проекта: Создадим токен бота в BotFather и подготовим базовую структуру проекта

  4. Работа с базами данных: Опишем полную логику взаимодействия с табличной базой данных через SQLAlchemy и с базой данных Redis через официальный клиент

  5. API-методы: Реализуем необходимые API-методы с помощью FastAPI (подробно разберем далее)

  6. Настройка вебхуков: Настроим телеграм-бота для работы через технологию вебхуков (в этом нам поможет FastAPI)

  7. Логика бота: Реализуем логику бота (пока без MiniApp): сценарий анкетирования, личный профиль пользователя, клавиатуры и прочее

  8. Тестирование и деплой: Протестируем и выполним деплой бэкенда. Для удобного деплоя снова воспользуемся сервисом Amvera Cloud. Этот сервис позволяет осуществить удаленный запуск бэкенда буквально за пару минут: перетягиваем файлы, прямо на сервисе заполняем настройки проекта и уже через пару минут бэкенд не просто доступен в сети, но и к нему привязано бесплатное доменное имя с HTTPS, которое не нужно будет отдельно настраивать.

Теоретическая часть

В следующем разделе мы рассмотрим немного теории перед тем, как перейти к практике. Мы разберем, почему именно Redis и Centrifugo — идеальные инструменты для нашего проекта, и каким образом они помогут нам создать быстрое и надежное real-time приложение.

Зачем нам Centrifugo

Centrifugo — это сервер для отправки событий реального времени клиентам через WebSocket, HTTP-streaming, SSE и другие механизмы. Он помогает разработчикам легко добавлять push-уведомления, чаты, обновления данных в реальном времени и другие динамические функции в приложения.

В своей статье «Centrifugo v6 + FastAPI + Python: разрабатываем веб-опросник с обновлениями в реальном времени» я более детально рассматривал, где используется данная технология. Сейчас же сосредоточимся на том, в чем технология будет полезна именно нам и как мы будем ее использовать.

Почему Centrifugo нужен для нашего проекта

Мы реализуем чат — это и есть «RealTime» приложение. То есть должна быть техническая возможность отправить сообщение и другая возможность получить это сообщение на той стороне для конечного получателя (чат).

Для реализации этой задачи есть два основных подхода:

1. Поллинг

В данном случае все бы работало так:

  • Пользователь пишет сообщение и нажимает на кнопку «отправить»

  • Запускается API-метод, который сохраняет это сообщение в базе данных

  • На стороне клиента должно происходить обновление для того, чтобы он увидел новые сообщения, адресованные ему. Обновление может быть как ручным (например, когда это часто бывает на сайтах — нажали на обновить страницу — увидели новую информацию), так и автоматическим (Ajax).

В данном контексте поллинг — это когда со стороны фронтенда отправляется запрос на получение данных с определенной периодичностью (например, каждые 5 секунд). Это создает высокую нагрузку на сервер, так как каждый клиент будет постоянно спрашивать: «Есть ли новые сообщения для меня?», независимо от того, появились новые сообщения или нет.

И представьте, если у нас будет не 10 пользователей, а 1000 или 10000? Все они будут постоянно отправлять запросы, и большинство из этих запросов будут «пустыми» (без новых данных).

2. RealTime (WebSocket)

В данном контексте необходимости обновлять страницу постоянно нет. Здесь все работает следующим образом:

  • Существует определенный канал, к которому подключаются пользователи (в нашем случае к одному каналу могут подключиться максимум 2 человека)

  • После подключения (подписки) любое отправленное сообщение пользователем публикуется в этот канал, и пользователь на обратной стороне (второй участник чата) получит сообщение автоматически.

Для реализации этого подхода нам и нужен Centrifugo.

Преимущества использования Centrifugo

Centrifugo имеет целый ряд преимуществ, которые делают его идеальным выбором для нашего проекта:

  1. Автоматическое управление каналами — Centrifugo умеет самостоятельно генерировать каналы по принципу: если ко мне постучались, то я сразу канал создам, а если никого нет в канале, то и канала больше не существует.

  2. Простота интеграции — Centrifugo предоставляет клиентские библиотеки для JavaScript, которые мы будем использовать в нашем MiniApp.

  3. Масштабируемость — Centrifugo может обрабатывать тысячи подключений одновременно, что делает его подходящим для проектов любого размера.

  4. Безопасность — Centrifugo поддерживает механизмы авторизации и аутентификации, что позволяет обеспечить защиту передаваемых данных.

  5. Поддержка разных транспортов — Помимо WebSocket, Centrifugo поддерживает HTTP-streaming, SSE и другие механизмы передачи данных, что обеспечивает совместимость с различными клиентами.

Как мы будем использовать Centrifugo в нашем проекте

В нашем проекте Centrifugo будет использоваться следующим образом:

  1. Когда пользователи находят друг друга через систему поиска, мы будем создавать для них уникальный канал в Centrifugo.

  2. Оба пользователя подписываются на этот канал через JavaScript-библиотеку Centrifugo в нашем MiniApp.

  3. Когда один из пользователей отправляет сообщение, оно публикуется в этот канал через наш бэкенд.

  4. Centrifugo доставляет сообщение всем подписчикам канала (в нашем случае — второму пользователю) в режиме реального времени.

  5. Когда один из пользователей покидает чат, мы можем отписать его от канала, и если в канале не останется подписчиков, Centrifugo автоматически удалит канал.

Таким образом, Centrifugo решает для нас сразу несколько задач:

  • Обеспечивает мгновенную доставку сообщений

  • Снижает нагрузку на наш бэкенд, так как не требуется постоянного опроса сервера

  • Упрощает управление каналами связи между пользователями

Теперь разберемся зачем нам Redis.

Зачем нам Redis

Redis – это не реляционная (не табличная) база данных. Хранит она, по своей сути, питоновские словари в формате ключ-значение и в качестве механизма хранения использует операционную память.

За счет формата хранения данных и за счет того, что информация хранится в оперативной памяти — работает эта база данных очень быстро и, как правило, используется для временного хранения данных. Вот несколько примеров где применяется эта база данных:

  • Хранение кэша — ускорение доступа к часто запрашиваемым данным

  • Хранение токенов авторизации, паролей, сессий пользователей

  • Хранение определенных очередей задач (в качестве брокера сообщений, например, как в APScheduler)

  • Реализация счетчиков и рейтингов в реальном времени (например, для отслеживания просмотров или лайков)

  • Геопространственные индексы для поиска ближайших объектов (например, в приложениях типа Uber)

  • Управление лимитами запросов (rate limiting) для API

  • Распределенные блокировки для синхронизации доступа к ресурсам

  • Аналитика реального времени для отслеживания активности пользователей

Как мы будем использовать Redis в нашем проекте

Несмотря на то, что Centrifugo может выполнить очень много работы, частично логику нам предстоит написать самостоятельно. В частности, такая задача:

Нам необходимо каким-то образом объединить двух человек в одном чате (в одном канале) и не просто объединить, а сделать это нужно с учетом того, что существуют фильтры по пользователям. То есть мы должны собрать данные для фильтрации (пол и диапазон возраста), а после уже запускать в канал только тех пользователей, которые соответствуют критериям фильтрации с учетом собственного пола и возраста.

Как раз с этим нам и поможет Redis.

Создание очереди ожидания

Первую задачу, которую мы закроем при помощи этой библиотеки будет — это создание очереди. То есть, условно, нам необходимо будет создать комнату на основании данных о пользователе и его запроса по фильтру.

То есть, в базе данных Redis должен будет появится ключ, значением которого будет информация о созданной комнате и информация по первому участнику, который находится в комнате и ждет, когда к нему кто-то появится.

Проверка существующих комнат

Следующая задача — это проверка. Нужно ли создавать новую комнату (канал) или уже есть тот, кто ждет. Redis нам поможет определить это и на основании данного определения либо будет сгенерирован новый ключ (новая комната), либо в существующую комнату присоединится второй пользователь (мы внесем изменения в значение существующего ключа).

Управление доступом к комнатам

Отсюда следует и автоматическое определение того, можно ли к комнате подключиться. Если в значении ключа уже есть два пользователя, то в комнату мы не пустим, даже если фильтры будут удовлетворять.

Дополнительные функции

И далее вытекают следующие простые задачи:

  • Проверка статуса комнаты (заполнена или ожидает подключение)

  • Удаление комнаты при завершении чата

  • Обновление информации о пользователях в комнате

  • Установка таймаута для комнат, чтобы они не висели вечно, если никто не подключается

Практическая часть

В рамках практической части по Redis я поделюсь с вами своими наработками, так что даже если вы раньше не работали с этой технологией, следуя коду и пояснениям, вы не только научитесь работать с этой базой данных, но и поймете, что это даже проще, чем работать с традиционными табличными базами данных.

Примерная структура данных в Redis для нашего проекта может выглядеть так:

room:1234 -> {     "status": "waiting",  # waiting или active     "created_at": 1645678901,     "expire_at": 1645679201,  # время жизни комнаты, если никто не подключится     "participants": {         "123456789": {  # id пользователя             "gender": "male",             "age": 25,             "filters": {                 "gender": "female",                 "age_min": 20,                 "age_max": 30             }         }     } } 

В рамках сегодняшней статьи я поделюсь только частью методов и опишу общий подход, а в моем телеграмм-канале «Легкий путь в Python» вы найдете полный класс со всеми методами и с полным подходом. Там же вы найдете полный исходный код фронтенда и бэкенда данного проекта, который, кстати, лежит там уже как пару недель.

Преимущества использования Redis в нашем проекте

  1. Скорость работы — Redis работает в оперативной памяти, что обеспечивает очень быстрый доступ к данным

  2. Атомарные операции — Redis поддерживает атомарные операции, что позволяет избежать проблем с конкурентным доступом

  3. Легкость масштабирования — Redis легко масштабируется при росте нагрузки

  4. Поддержка различных типов данных — Redis поддерживает не только строки, но и списки, множества, хеш-таблицы и другие типы данных

  5. Автоматическое истечение срока действия — Redis позволяет устанавливать время жизни для ключей, что идеально подходит для временных данных, таких как наши комнаты ожидания

Теперь, когда мы разобрались с тем, что такое Redis и Centrifugo и зачем нам они нужны — поднимем их.

Поднимаем Redis

В этом разделе мы рассмотрим, как настроить Redis для нашего проекта двумя способами: локально с помощью Docker и на удаленном сервере с использованием Amvera Cloud. Я рекомендую использовать второй вариант для продакшена, поскольку он обеспечивает доступность вашей базы данных извне и не требует сложной настройки.

Вариант 1: Локальная установка Redis через Docker

Для начала работы с Redis локально нам потребуется Docker. Это самый простой и универсальный способ, который работает на любой операционной системе.

Шаг 1: Установка Docker

Если у вас еще не установлен Docker, скачайте и установите Docker Desktop для вашей операционной системы:

  • Для Windows и Mac: Docker Desktop

  • Для Linux: выполните установку через пакетный менеджер вашего дистрибутива

Шаг 2: Запуск Redis-контейнера

После установки Docker выполните следующую команду в терминале:

docker run --name redis -d -p 6379:6379 redis 

Эта команда:

  • —name redis — дает имя «redis» нашему контейнеру

  • -d — запускает контейнер в фоновом режиме

  • -p 6379:6379 — перенаправляет порт 6379 контейнера на порт 6379 вашей машины

  • redis — использует официальный образ Redis из Docker Hub

Шаг 3: Проверка работы Redis

Чтобы убедиться, что Redis запущен, выполните:

docker ps 

Вы должны увидеть контейнер с именем «redis» в списке запущенных контейнеров.

Вариант 2: Разворачиваем Redis на Amvera Cloud

Для нашего проекта я выбрал именно этот вариант, так как он дает возможность доступа к Redis из любой точки мира и не требует настройки сетевой инфраструктуры.

Шаг 1: Регистрация на Amvera Cloud

Если у вас еще нет аккаунта, зарегистрируйтесь на Amvera Cloud. Новые пользователи получают бонус в размере 111 рублей на баланс аккаунта, что позволяет начать работу без первоначальных вложений.

Шаг 2: Создание проекта базы данных

После авторизации:

  1. Нажмите на кнопку «Создать проект»

  2. Укажите имя для вашей базы данных (например, «redis-chat»)

  3. Выберите тип проекта «База данных»

  4. Выберите тариф «Начальный» (для нашего проекта этого достаточно)

  5. Нажмите «Далее»

Шаг 3: Настройка Redis

На следующем экране:

  1. Выберите Redis из списка доступных баз данных

  2. Если вам нужна защита паролем, добавьте опцию —requirepass ВАШ_ПАРОЛЬ

  3. Нажмите «Завершить»

Шаг 4: Открытие внешнего доступа

Для доступа к Redis извне:

  • Перейдите в созданный проект

  • Откройте вкладку «Настройки»

  • В разделе «Домены» добавьте доменное имя

  • Сохраните настройки

После этого вы получите URL для подключения к вашей базе данных, который будет выглядеть примерно так: redisdb-yourname.db-msk0.amvera.tech

Проверка подключения к Redis

Давайте убедимся, что наша база данных доступна и работает корректно. Для этого напишем простой скрипт на Python:

import redis  # Параметры подключения REDIS_HOST = "redisdb-yourname.db-msk0.amvera.tech"  # Замените на ваш хост REDIS_PORT = 6379 REDIS_PASSWORD = "your_password"  # Укажите ваш пароль, если он установлен  import redis   def init_redis(     host: str = REDIS_HOST,     port: int = REDIS_PORT,     password: str = REDIS_PASSWORD,     ssl: bool = True, ) -> redis.Redis:     """Инициализация подключения к Redis"""     return redis.Redis(         host=host,         port=port,         password=password,         ssl=ssl,         ssl_cert_reqs="none",     )   def test_redis_connection(client: redis.Redis) -> str:     """Проверка подключения к Redis"""     try:         if client.ping():             client.set("test_key", "hello_world")             value = client.get("test_key")             return f"Подключение успешно! Тест: {value.decode('utf-8')}"         return "Не удалось подключиться к Redis"     except Exception as e:         return f"Ошибка подключения: {e}"   redis_client = init_redis() print(test_redis_connection(redis_client)) 

Установка библиотеки Redis для Python

Перед запуском скрипта установите библиотеку redis:

pip install redis 

Особенности подключения через SSL

При подключении к удаленному серверу Redis через Amvera Cloud необходимо использовать SSL-соединение. Однако могут возникнуть проблемы с проверкой сертификатов. Для отладки можно отключить проверку сертификатов (ssl_cert_reqs=»none»), но для продакшена рекомендуется настроить сертификаты корректно.

Установка и запуск Centrifugo V6

Далее рассмотрим процесс развертывания Centrifugo V6 с использованием Docker. Мы рассмотрим два варианта установки: на локальном компьютере и в облачном сервисе Amvera Cloud.

Локальная установка Centrifugo

Создаем директорию для проекта, в которой разместим два ключевых файла:

  1. config.json — конфигурационный файл Centrifugo

  2. Dockerfile — инструкции для сборки Docker-образа

Создание config.json

Сначала подготовим конфигурационный файл со следующим содержимым:

{   "client": {     "token": {       "hmac_secret_key": "super_client_key"     },     "allowed_origins": ["*"]   },   "http_api": {     "key": "super_api_key"   },   "channel": {     "without_namespace": {       "allow_subscribe_for_client": true     }   },   "admin": {     "enabled": true,     "password": "super_admin_password",     "secret": "super_admin_secret_key"   } } 

Принцип работы Centrifugo

Для понимания конфигурации полезно разобраться в основах работы Centrifugo:

  • Centrifugo функционирует на основе каналов, к которым клиенты могут подключаться и получать сообщения

  • Для подписки на каналы используется JWT-авторизация (токен генерируется с помощью hmac_secret_key)

  • Публикация сообщений осуществляется через API-запросы с передачей ключа из параметра http_api.key

  • Параметр allowed_origins: [«*»] разрешает подключения с любых доменов/IP (в производственной среде рекомендуется указывать конкретные доверенные источники)

  • Настройка allow_subscribe_for_client: true позволяет любому клиенту подписываться на любые каналы

  • Блок admin отвечает за настройку и активацию административной панели

Дополнительные параметры и настройки можно найти в официальной документации Centrifugo, а более подробный разбор самого Centrifugo вы найдете в моей статье «Centrifugo v6 + FastAPI + Python: разрабатываем веб-опросник с обновлениями в реальном времени» там, кроме всего прочего, я рассматривал более простой пример использования технологии Centrifugo, чем тот, который мы будем рассматривать в рамках этой и следующей статьи.

Создание Dockerfile

В той же директории создаем файл Dockerfile:

FROM centrifugo/centrifugo:v6  # Устанавливаем рабочую директорию WORKDIR /centrifugo  # Копируем конфигурационный файл COPY config.json ./config.json  # Открываем порт 8000 EXPOSE 8000  # Запускаем Centrifugo с указанной конфигурацией CMD ["centrifugo", "--config", "config.json"] 

Сборка и запуск контейнера

Выполняем сборку Docker-образа:

docker build -t my-centrifugo . 

Запускаем контейнер:

docker run -d -p 8000:8000 --name centrifugo my-centrifugo 

При успешном запуске админ-панель будет доступна по адресу http://localhost:8000/. Для входа используйте пароль, указанный в конфигурации.

При локальной разработке используйте следующие адреса:

  • URL для API-запросов: http://localhost:8000/api

  • URL для WebSocket подключений: ws://localhost:8000/connection/websocket

Развертывание Centrifugo в Amvera Cloud

Для облачного развертывания потребуются те же файлы config.json и Dockerfile. Процесс настройки:

  1. Зарегистрируйтесь в Amvera Cloud

  2. Нажмите «Создать проект»

  3. Выберите «Приложение»

  4. Укажите имя приложения и выберите тарифный план (рекомендуется не ниже «Начальный»)

  5. На вкладке загрузки данных выберите удобный способ (Git или через интерфейс)

  6. В настройках конфигурации выберите тип приложения Docker и укажите containerPort 8000

  7. После создания проекта перейдите на вкладку «Домены» и добавьте бесплатный домен Amvera

При успешном развертывании проект будет доступен по выделенному URL. Для облачной версии адреса имеют вид:

  • URL для API-запросов: https://ваш_домен/api

  • URL для WebSocket подключений: wss://ваш_домен/connection/websocket

Обратите внимание, что для защищенного соединения используется протокол wss вместо ws.

Для более подробной информации о настройках и использовании Centrifugo с FastAPI и Python рекомендую ознакомиться с моей недавней статьей «Centrifugo v6 + FastAPI + Python: разрабатываем веб-опросник с обновлениями в реальном времени».

Подготовка проекта

Начнем с подготовки проекта. Для этого откроем любимый IDE, создадим новый проект и активируем виртуальное окружение:

python -m venv venv source venv/bin/activate  # Для Linux и Mac venv\Scripts\activate    # Для Windows 

Установка зависимостей

Создадим файл requirements.txt и добавим в него список необходимых зависимостей:

aiogram==3.18.0 aiogram_dialog==2.3.1 fastapi==0.115.0 pydantic==2.9.2 uvicorn==0.31.0 pydantic_settings==2.5.2 aiosqlite==0.20.0 alembic==1.13.3 loguru==0.7.2 SQLAlchemy==2.0.35 pyjwt==2.10.1 redis==5.2.1 httpx==0.28.1 

Далее установим зависимости командой:

pip install -r requirements.txt 

Получение токена бота через BotFather

Для работы с Telegram-ботом потребуется токен. Получить его можно через @BotFather:

  1. Откройте Telegram и найдите бота @BotFather.

  2. Отправьте команду /newbot.

  3. Следуйте инструкциям: задайте имя бота и его username.

  4. После успешного создания BotFather выдаст вам токен. Сохраните его в .env файле.

Создание файла с переменными окружения

Создадим файл .env и добавим в него основные переменные окружения:

BOT_TOKEN=your_tg_bot_token ADMIN_IDS=[tg_id1, tg_id2] BASE_URL=backend_url FRONT_URL=frontend_url REDIS_PORT=6379 REDIS_PASSWORD=my_super_pass REDIS_SSL=1 REDIS_HOST=tetatetredis-yakvenalex.db-msk0.amvera.tech SECRET_KEY=secret_key CENTRIFUGO_API_KEY=centrifugo_api_key CENTRIFUGO_URL=centrifugo_url SOCKET_URL=socket_url 

Пример заполненного .env файла:

BOT_TOKEN=7855368652:ABH1n8N-BJonYfm0LIOGGMtRwrs1NwPzMKs ADMIN_IDS=[12334566,1234123] BASE_URL=https://87g03r-85-175-194-59.ru.tuna.am FRONT_URL=https://7i602e-85-175-194-59.ru.tuna.am REDIS_PORT=6379 REDIS_PASSWORD=my_super_pass REDIS_SSL=1 REDIS_HOST=tetatetredis-yakvenalex.db-msk0.amvera.tech SECRET_KEY=bbe7d157-a253-4094-9759-06a8236543f9 CENTRIFUGO_API_KEY=d7627bb6-2292-4911-82e1-615c0ed3eebb CENTRIFUGO_URL=https://mycentrifugo-yakvenalex.amvera.io/api SOCKET_URL=wss:///mycentrifugo-yakvenalex.amvera.io/connection/websocket 

Вот краткое описание переменных окружения:

  • BOT_TOKEN – токен вашего Telegram-бота, полученный через BotFather.

  • ADMIN_IDS – список ID администраторов бота (указываются через запятую).

  • BASE_URL – URL бэкенда вашего проекта (для этапа разработки можно запустить сервис Ngrok или Tuna на том же порту что и FastApi приложение, это необходимо для корректной работы вебхуков).

  • FRONT_URL – URL фронтенда (сейчас можно указать, например, http://127.0.0.1:8001, пока это ни на что не влияет).

  • REDIS_PORT – порт, на котором работает сервер Redis.

  • REDIS_PASSWORD – пароль для подключения к Redis.

  • REDIS_SSL – флаг использования SSL при подключении к Redis (1 – включено, 0 – выключено).

  • REDIS_HOST – хост (адрес) сервера Redis.

  • SECRET_KEY – секретный ключ для подписи данных (например, JWT-токенов).

  • CENTRIFUGO_API_KEY – API-ключ для взаимодействия с сервером Centrifugo (используется для WebSocket-подключений).

  • CENTRIFUGO_URL – URL API сервера Centrifugo.

  • SOCKET_URL – WebSocket URL для подключения к Centrifugo.

Эти переменные используются для конфигурации бота, взаимодействия с базами данных, API и внешними сервисами.

Краткий обзор используемых библиотек

  • aiogram — асинхронная библиотека для работы с Telegram Bot API.

  • aiogram_dialog — библиотека для упрощенного создания диалогов в aiogram.

  • fastapi — фреймворк для создания API с высокой производительностью.

  • pydantic — инструмент для работы с валидацией данных.

  • uvicorn — ASGI-сервер для запуска FastAPI-приложений.

  • SQLAlchemy — ORM для работы с базами данных.

  • redis — клиент для работы с Redis.

  • loguru — удобный логгер для Python.

  • httpx — клиент для асинхронных HTTP-запросов.

Теперь у нас подготовлена среда разработки, установлены все зависимости, получен токен бота и настроены переменные окружения. Следующим шагом можно приступать к реализации бота и API.

Организация проекта

Теперь давайте разработаем удобную структуру файлов и папок в нашем проекте для комфортной разработки.

На данный момент в корневой папке проекта у вас должны лежать файлы .env и requirements.txt. Теперь на одном уровне с этими файлами создаем папку app и сразу помещаем в нее файл config.py. Заполним его следующим образом:

import os from typing import List from loguru import logger from pydantic_settings import BaseSettings, SettingsConfigDict  class Settings(BaseSettings):     BOT_TOKEN: str     ADMIN_IDS: List[int]     FORMAT_LOG: str = "{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}"     LOG_ROTATION: str = "10 MB"     DB_URL: str = "sqlite+aiosqlite:///data/db.sqlite3"     DB_PATH: str = os.path.join(         os.path.dirname(os.path.abspath(__file__)), "..", "data", "db.sqlite3"     )     BASE_URL: str     REDIS_PORT: int     REDIS_PASSWORD: str     REDIS_HOST: str     FRONT_URL: str     SECRET_KEY: str     CENTRIFUGO_API_KEY: str     CENTRIFUGO_URL: str     SOCKET_URL: str     REDIS_SSL: bool      @property     def hook_url(self) -> str:         """Возвращает URL вебхука"""         return f"{self.BASE_URL}/webhook"      model_config = SettingsConfigDict(         env_file=os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", ".env")     )  # Получаем параметры для загрузки переменных среды settings = Settings()  log_file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "log.txt") logger.add(     log_file_path,     format=settings.FORMAT_LOG,     level="INFO",     rotation=settings.LOG_ROTATION, ) 

Используем библиотеку pydantic-settings для удобной работы с переменными окружения. Этот инструмент помогает загружать переменные из .env-файла и обеспечивает строгую типизацию параметров. Это особенно полезно для централизованного управления конфигурацией и уменьшения ошибок, связанных с отсутствующими или некорректными значениями.

Структура проекта

Теперь в корневой папке app создаем следующие директории:

  • api/ – здесь будут описаны API-методы бэкенда.

  • bot/ – логика работы Telegram-бота.

  • dao/ – взаимодействие с табличной базой данных.

  • redis_dao/ – логика работы с Redis.

Кроме того, создаем файл main.py в корне app. Это будет главный файл приложения, с которого будет осуществляться сборка и запуск FastAPI-приложения.

Теперь мы готовы к описанию логики для каждого нашего микросервиса. Уже в следующем разделе мы опишем Python-код взаимодействия с базой данных Redis.

Пишем логику взаимодействия с Redis

Работать мы будем в папке app/redis_dao. Сразу заполним структуру пустыми файлами:

  • custom_redis.py – Класс-наследник Redis, в котором мы расширим стандартные методы и добавим свою логику.

  • redis_client.py – Класс для управления подключением к Redis с поддержкой явного и автоматического управления соединением.

  • manager.py – Файл, где мы создаем объект клиента и описываем функцию-зависимость для FastAPI. В полном коде, который доступен в моем Telegram-канале, также будет декоратор для кэширования.

Описание класса CustomRedis

Начнем с файла custom_redis.py. Полный код этого класса можно найти в источнике, который я указал выше, а здесь рассмотрим ключевые моменты.

Импорты:

import json from redis.asyncio import Redis from loguru import logger from typing import Any, Callable, Awaitable 

Здесь важно отметить, что мы используем асинхронный клиент Redis из официальной библиотеки.

Реализация класса:

class CustomRedis(Redis):     """Расширенный класс Redis с дополнительными методами"""      async def delete_key(self, key: str):         """Удаляет ключ из Redis."""         await self.delete(key)         logger.info(f"Ключ {key} удален")      async def delete_keys_by_prefix(self, prefix: str):         """Удаляет все ключи, начинающиеся с указанного префикса."""         keys = await self.keys(prefix + '*')         if keys:             await self.delete(*keys)             logger.info(f"Удалены ключи, начинающиеся с {prefix}")      async def delete_all_keys(self):         """Очищает текущую базу данных Redis."""         await self.flushdb()         logger.info("Все ключи в текущей базе данных удалены") 

Этот подход позволяет нам использовать стандартные методы Redis, а также добавлять собственные. Таким образом, клиент остается гибким, но не теряет совместимости с оригинальным API Redis.

Управление подключением к Redis

Теперь создадим клиента для управления подключением. Этот код будет в redis_client.py.

from loguru import logger from typing import Optional from app.redis_dao.custom_redis import CustomRedis  class RedisClient:     """Класс для управления подключением к Redis с поддержкой явного и автоматического управления."""      def __init__(         self,         host: str,         port: int,         ssl_flag: bool,         ssl_cert_reqs: str = "none",         password: Optional[str] = None,         user: str = "default",     ):         self.host = host         self.port = port         self.password = password         self.ssl_flag = ssl_flag         self.user = user         self.ssl_cert_reqs = ssl_cert_reqs         self._client: Optional[CustomRedis] = None      async def connect(self):         """Создает и сохраняет подключение к Redis."""         if self._client is None:             try:                 self._client = CustomRedis(                     host=self.host,                     port=self.port,                     password=self.password,                     ssl=self.ssl_flag,                     username=self.user,                     ssl_cert_reqs=self.ssl_cert_reqs,                     retry_on_timeout=True,                     health_check_interval=30,                 )                 await self._client.ping()  # Проверяем подключение                 logger.info("Redis подключен успешно")             except Exception as e:                 logger.error(f"Ошибка подключения к Redis: {e}")                 raise      async def close(self):         """Закрывает подключение к Redis."""         if self._client:             await self._client.close()             self._client = None             logger.info("Redis соединение закрыто")      def get_client(self) -> CustomRedis:         """Возвращает объект клиента Redis."""         if self._client is None:             raise RuntimeError("Redis клиент не инициализирован. Проверьте lifespan.")         return self._client      async def __aenter__(self):         """Поддержка асинхронного контекстного менеджера."""         await self.connect()         return self      async def __aexit__(self, exc_type, exc_val, exc_tb):         """Автоматически закрывает подключение при выходе из контекста."""         await self.close() 

Разбираем код подробнее

Обычно при работе с базой данных мы подключаемся, выполняем операции, затем закрываем соединение.

Но в случае с Redis нам выгоднее открывать соединение один раз при старте приложения и закрывать его при завершении.

Если же требуется управлять соединением вручную (например, в скриптах или фоновых задачах), этот класс поддерживает async with, что позволяет автоматически открывать и закрывать соединение в нужный момент:

async with RedisClient(host="localhost", port=6379, ssl_flag=False) as redis:     client = redis.get_client()     await client.set("test", "value") 

Инициализация клиента в manager.py

Теперь создадим глобальный объект RedisClient, чтобы использовать его в FastAPI.

Импорты

from app.config import settings from app.redis_dao.redis_client import RedisClient from app.redis_dao.custom_redis import CustomRedis from functools import wraps from typing import Callable, Awaitable, Any from loguru import logger 

Инициализация клиента

redis_manager = RedisClient(     host=settings.REDIS_HOST,     port=settings.REDIS_PORT,     password=settings.REDIS_PASSWORD,     ssl_flag=settings.REDIS_SSL, ) 

Зависимость для FastAPI

async def get_redis() -> CustomRedis:     """Функция зависимости для получения клиента Redis"""     return redis_manager.get_client() 

Важно: Этот код подразумевает, что клиент уже запущен.
В жизненном цикле FastAPI при старте приложения нужно вызывать:

await redis_manager.connect() 

А при завершении:

await redis_manager.close() 

Это все, что нам нужно для настройки клиента Redis! Далее мы рассмотрим работу с SQLite и SQLAlchemy.

Логика взаимодействия с SQLite

Теперь нам нужно завершить работу с табличной базой данных SQLite. В данном проекте я решил не усложнять эту часть и ограничился всего одной таблицей в базе данных — таблицей пользователей.

Мы не будем сохранять сообщения, которые пользователи отправляют друг другу в чатах. Этим я не только упрощаю задачу, но и хочу наглядно продемонстрировать мощь Centrifugo, который полностью берет на себя работу с сообщениями. В реальных проектах, конечно, сообщения стоит сохранять.

Кроме того, в этом проекте я решил не использовать Alembic. Задачу по созданию таблиц в базе данных возьмет на себя чистый aiosqlite. В коде будет реализована простая функция, которая при запуске приложения FastAPI создаст базу данных с необходимой структурой. В других своих статьях я показывал, как эта задача решается через Alembic и SQLAlchemy, а сегодня, для разнообразия, покажу вам ещё один способ.

Организация проекта

В папке app/dao создадим следующие файлы:

  • base.py: базовый класс с универсальными методами взаимодействия с табличными базами данных (описываю его почти в каждой своей статье, поэтому сегодня пробежимся по нему максимально кратко).

  • create_db.py: метод для создания базы данных с таблицей.

  • dao.py: дочерний класс от base.py, который позволяет расширять методы класса base.py.

  • database_middleware.py: middleware для управления сессиями в рамках Telegram-бота.

  • database.py: инициализация движка и фабрики асинхронных сессий (этот код также переходит из статьи в статью).

  • fast_api_dep.py: две зависимости для FastAPI.

  • models.py: описание таблицы (модели).

Пусть вас не пугает обилие файлов. Кода в каждом (за исключением base.py) будет немного, но я решил не объединять всё в кучу для тех, кто будет развивать данный проект. Так удобнее масштабировать.

Начнем с файла database.py:

from sqlalchemy.orm import DeclarativeBase from sqlalchemy.ext.asyncio import AsyncAttrs, async_sessionmaker, create_async_engine, AsyncSession from app.config import settings  engine = create_async_engine(url=settings.DB_URL) async_session_maker = async_sessionmaker(engine, class_=AsyncSession)  class Base(AsyncAttrs, DeclarativeBase):     __abstract__ = True 

От класса Base будут наследоваться все таблицы (модели) SQLAlchemy. async_session_maker будет использоваться для создания и управления сессиями с базой данных (в отличие от Redis, мы будем открывать и закрывать соединение каждый раз, что связано с особенностями реляционных баз данных — это помогает избежать конфликтов).

Файл models.py

В этом файле опишем структуру нашей будущей таблицы:

from sqlalchemy import BigInteger from app.dao.database import Base from sqlalchemy.orm import Mapped, mapped_column  class User(Base):     __tablename__ = "users"      id: Mapped[int] = mapped_column(BigInteger, primary_key=True)     username: Mapped[str | None]     first_name: Mapped[str | None]     last_name: Mapped[str | None]     nickname: Mapped[str]     gender: Mapped[str]     age: Mapped[int] 

Это простая таблица с несколькими колонками, описанная в декларативном стиле SQLAlchemy 2.

Файл create_db.py

Опишем функцию, которая создаст базу данных с таблицей users. Таблица будет соответствовать описанной выше модели.

import aiosqlite from app.config import settings   async def create_users_table():     async with aiosqlite.connect(settings.DB_PATH) as db:         await db.execute(             """         CREATE TABLE IF NOT EXISTS users (             id INTEGER PRIMARY KEY,             username TEXT,             first_name TEXT,             last_name TEXT,             nickname TEXT NOT NULL,             gender TEXT NOT NULL,             age INTEGER NOT NULL         );         """         )         await db.commit()

Это простая функция, которая создаст базу данных и указанную таблицу, если они не существовали. Её можно смело включать в жизненный цикл FastAPI приложения при запуске.

Файл database_middleware.py

Этот файл я подробно разбирал в прошлых статьях, поэтому приведу только код для понимания:

from typing import Callable, Dict, Any, Awaitable from aiogram import BaseMiddleware from aiogram.types import Message, CallbackQuery from app.dao.database import async_session_maker   class BaseDatabaseMiddleware(BaseMiddleware):     async def __call__(         self,         handler: Callable[[Message | CallbackQuery, Dict[str, Any]], Awaitable[Any]],         event: Message | CallbackQuery,         data: Dict[str, Any],     ) -> Any:         async with async_session_maker() as session:             self.set_session(data, session)             try:                 result = await handler(event, data)                 await self.after_handler(session)                 return result             except Exception as e:                 await session.rollback()                 raise e             finally:                 await session.close()      def set_session(self, data: Dict[str, Any], session) -> None:         """Метод для установки сессии в словарь данных."""         raise NotImplementedError("Этот метод должен быть реализован в подклассах.")      async def after_handler(self, session) -> None:         """Метод для выполнения действий после вызова хендлера (например, коммит)."""         pass  class DatabaseMiddlewareWithoutCommit(BaseDatabaseMiddleware):     def set_session(self, data: Dict[str, Any], session) -> None:         data["session_without_commit"] = session  class DatabaseMiddlewareWithCommit(BaseDatabaseMiddleware):     def set_session(self, data: Dict[str, Any], session) -> None:         data["session_with_commit"] = session      async def after_handler(self, session) -> None:         await session.commit() 

Если коротко, то этот код работает как зависимость в FastAPI, но в контексте Telegram-бота. Благодаря этому мы сможем удобно создавать сессии с базой данных и автоматически их закрывать (с фиксацией изменений или без, в зависимости от вызванного middleware).

Чтобы это работало, middleware необходимо предварительно зарегистрировать, чем мы займемся далее.

Файл fastapi_dao_dep.py

Подобную логику, но для FastAPI, мы опишем в этом файле. Вот полный код:

from typing import AsyncGenerator from sqlalchemy.ext.asyncio import AsyncSession from app.dao.database import async_session_maker   async def get_session_with_commit() -> AsyncGenerator[AsyncSession, None]:     """Асинхронная сессия с автоматическим коммитом."""     async with async_session_maker() as session:         try:             yield session             await session.commit()         except Exception:             await session.rollback()             raise         finally:             await session.close()               async def get_session_without_commit() -> AsyncGenerator[AsyncSession, None]:     """Асинхронная сессия без автоматического коммита."""     async with async_session_maker() as session:         try:             yield session         except Exception:             await session.rollback()             raise         finally:             await session.close() 

Файл base.py

Теперь нам осталось реализовать методы для взаимодействия с базой данных. Все методы, которые нас будут интересовать, описаны в базовом классе BaseDAO. Приведу только тот код, который актуален для текущей статьи. Полный список методов вы найдете в исходном коде проекта.

from typing import List, TypeVar, Generic, Type from pydantic import BaseModel from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.future import select from sqlalchemy import update as sqlalchemy_update, delete as sqlalchemy_delete, func from loguru import logger from sqlalchemy.ext.asyncio import AsyncSession  from app.dao.database import Base  T = TypeVar("T", bound=Base)  class BaseDAO(Generic[T]):     model: Type[T] = None      def __init__(self, session: AsyncSession):         self._session = session         if self.model is None:             raise ValueError("Модель должна быть указана в дочернем классе")      async def find_one_or_none_by_id(self, data_id: int):         try:             query = select(self.model).filter_by(id=data_id)             result = await self._session.execute(query)             record = result.scalar_one_or_none()             log_message = f"Запись {self.model.__name__} с ID {data_id} {'найдена' if record else 'не найдена'}."             logger.info(log_message)             return record         except SQLAlchemyError as e:             logger.error(f"Ошибка при поиске записи с ID {data_id}: {e}")             raise 

Файл dao.py

Далее мы создаем в файле dao.py дочерний класс, в который сразу пробросим модель:

from app.dao.base import BaseDAO from app.dao.models import User  class UserDAO(BaseDAO[User]):     model = User 

При создании экземпляра UserDAO(session) вы получаете доступ ко всем методам базового класса BaseDAO. Обратите внимание, что сессию необходимо передавать корректно на глобальном уровне (через middleware или зависимость). Это связано с тем, что базовый класс не управляет транзакциями самостоятельно — все автоматические коммиты происходят на уровне сессии.

На этом мы завершили реализацию взаимодействия с табличной базой данных. Впереди нас ждут два крупных блока разработки:

API-методы

В этом разделе мы создадим набор эндпоинтов (API-endpoints), через которые фронтенд будет взаимодействовать с нашими сервисами:

  • Centrifugo

  • Redis

  • SQLite

Логика Telegram-бота

Здесь мы реализуем:

  • Пользовательский диалог в Telegram-боте

  • Сбор информации о пользователе (никнейм, пол, возраст)

  • Дополнительную бизнес-логику

Примечание: Разработка самого MiniApp на VueJS3 будет описана в следующей части.

Давайте приступим к реализации API-методов.

Разработка API-методов

Сразу подготовим структуру под этот блок. В папке app создаем папку api внутри которой сделаем 3 файла:

  • router.py: тут мы будем описывать наши эндпоинты

  • schemas.py: опишем Pydantic-схемы для преобразования и валидации данных

  • utils.py: опишем необходимые утилиты, которые позволят нам работать с Centrifugo и Redis

Описание схем данных

Сразу опишем схемы (модели) Pydantic:

from pydantic import BaseModel  class SPartner(BaseModel):     id: int     age_from: int = 0     age_to: int = 999     gender: str = "any"  class SMessge(BaseModel):     sender: str     user_id: int     message: str 

Простые модели, которые позволят валидировать сообщения пользователей при публикации и информацию по партнеру.

Утилиты для работы с Centrifugo и Redis

Теперь перейдем к блоку с утилитами (файл api/utils.py). Там остановимся подробнее.

Импорты

Начнем с импортов:

import json  # Для работы с JSON-данными import time  # Для работы со временем и таймстампами import uuid  # Для генерации уникальных идентификаторов комнат from datetime import datetime  # Для работы с датой и временем from typing import List, Dict, Any  # Типизация данных from fastapi import HTTPException  # Обработка HTTP-ошибок from loguru import logger  # Логирование import jwt  # Работа с JWT-токенами import httpx  # HTTP-клиент для асинхронных запросов (будем отправлять запросы к Centrifugo) from app.config import settings  # Настройки приложения from app.dao.dao import UserDAO  # Доступ к БД пользователей from app.redis_dao.custom_redis import CustomRedis  # Работа с Redis 

Эти импорты предоставляют все необходимые инструменты для работы с данными, аутентификацией, HTTP-запросами и базами данных в нашем API.

Утилиты для взаимодействия с Centrifugo

Начнем с двух простых утилит для взаимодействия с Centrifugo. Первая утилита:

async def generate_client_token(user_id, secret_key):     # Устанавливаем время жизни токена (например, 60 минут)     exp = int(time.time()) + 60 * 60  # Время истечения в секундах      # Создаем полезную нагрузку токена     payload = {         "sub": str(user_id),  # Идентификатор пользователя         "exp": exp,  # Время истечения     }      # Генерируем токен с использованием HMAC SHA-256     return jwt.encode(payload, secret_key, algorithm="HS256") 

С помощью данной утилиты мы будем генерировать уникальный клиентский JWT-токен. Этот токен будет необходим клиенту для того, чтоб у него была возможность подписаться на канал Centrifugo.

Если коротко, то для того чтоб стать участником определенного канала недостаточно только указазывать идентификатор этого канала. Дополнительно необходимо передавать сгенерированный JWT-токен.

Сам токен генерируется на основании secret_key, который мы указывали ранее на этапе создания настроек для Centrifugo (client.token.hmac_secret_key). Также для генерации этого токена необходимо заполнить payload с обязательными параметрами sub и exp. В качестве sub (передаваемых данных для кодирования в токене) мы будем использовать телеграмм айди пользователя (обратите внимание, обязательно передавать значение строкой). Дата времени истечения передается целым числом.

Теперь опишем метод для публикации сообщений в канал Centrifugo. Метод будет простым и универсальным:

async def send_msg(data: dict, channel_name: str) -> bool:     # Сериализуем данные в JSON     json_data = json.dumps(data)     payload = {         "method": "publish",         "params": {"channel": channel_name, "data": json_data},     }     headers = {"X-API-Key": settings.CENTRIFUGO_API_KEY}     async with httpx.AsyncClient(timeout=90) as client:         response = await client.post(             url=settings.CENTRIFUGO_URL, json=payload, headers=headers         )         return response.status_code == 200 

Данный метод на вход принимает название канала (идентификатор) и питоновский словарь с некоторыми данными. Напоминаю, что если канал не существует, то он будет создан Centrifugo автоматически, как только к нему подключится пользователь, следовательно, на момент публикации канал уже будет существовать.

Метод специально упростил. В боевых проектах вместо питоновского словаря должна была идти модель Pydantic, а внутренняя логика метода может быть сложнее. Например, можно поставить фильтрацию на нецензурную лексику и прочее.

Данный метод делает следующее:

  1. Принимает некие данные и название канала

  2. Добавляет информацию в указанный канал

  3. Далее Centrifugo принимает эту публикацию и автоматически рассылает ее всем участникам канала

То есть, это действительно удобная и интуитивно понятная технология.

Утилиты для работы с Redis

Теперь мы можем приступить к описанию утилит для взаимодействия с Redis. Кода будет много, но он достаточно простой.

Начнем с метода, который будет генерировать комнату (канал):

async def create_new_room(     user_id: int,     user_nickname: str,     user_gender: str,     user_age: int,     find_gender: str,     age_from: int,     age_to: int,     redis_client: CustomRedis, ):     new_room_key = f"{find_gender}_{uuid.uuid4().hex[:10]}"     user_token = await generate_client_token(user_id, settings.SECRET_KEY)      new_room_data = {         "partners": [             {                 "id": user_id,                 "nickname": user_nickname,                 "gender": user_gender,                 "age": user_age,                 "find_gender": find_gender,                 "age_from": age_from,                 "age_to": age_to,                 "token": user_token,             }         ],         "created_at": datetime.now().isoformat(),         "room_key": new_room_key,     }      await redis_client.set(new_room_key, json.dumps(new_room_data))     return {         "status": "waiting",         "room_key": new_room_key,         "message": "Ожидаем подходящего партнера",         "token": user_token,         "sender": user_nickname,         "user_id": user_id,     } 

На вход он принимает: айди пользователя, его никнейм в системе, пол, возраст и его параметры для фильтрации. Далее, на основании этих данных, метод делает запись в базе данных Redis или, другими словами, мы создаем очередь.

Теперь, когда очередь уже создана, нам достаточно будет к ней присоединить другого пользователя. Сама работа не особо будет отличаться от работы с вложенным питоновским словарем.

Следующий логический метод, который вытекает из предыдущего — это добавление нового пользователя в существующую комнату:

async def add_user_to_room(     room,     user_id,     user_nickname,     user_gender,     user_age,     find_gender,     age_from,     age_to,     redis_client, ):     partners = room.get("partners", [])     new_user_token = await generate_client_token(user_id, settings.SECRET_KEY)     # Добавляем текущего пользователя в комнату     new_partner = {         "id": user_id,         "nickname": user_nickname,         "gender": user_gender,         "age": user_age,         "find_gender": find_gender,         "age_from": age_from,         "age_to": age_to,         "token": new_user_token,     }     partners.append(new_partner)      # Обновляем данные комнаты в Redis     room_key = room.get("room_key")     await redis_client.set(room_key, json.dumps(room))      # Возвращаем статус "matched"     return {         "status": "matched",         "room_key": room_key,         "message": "Партнер найден",         "token": new_user_token,         "sender": user_nickname,         "user_id": user_id,     } 

Тут важная строка — это статус, который возвращает данный метод. На статусе у нас будут завязаны проверки со стороны фронтенда.

Может быть ситуация, при которой пользователь попытается подключиться к комнате, в которой он уже есть, повторно. В связи с этим может быть полезным обновление токена пользователя и возврат специального статуса, который позволит ему вернуться в комнату:

async def refund_partner(     room_key, user_id, user_nickname, status="matched", message="Партнер найден" ):     new_user_token = await generate_client_token(user_id, settings.SECRET_KEY)     return {         "status": status,         "room_key": room_key,         "message": message,         "token": new_user_token,         "sender": user_nickname,         "user_id": user_id,     } 

Этот метод возвращает информацию о пользователе в чат-комнате. Он генерирует новый токен для пользователя и формирует ответ с данными о статусе комнаты, ключе комнаты, сообщении, токене и информации об отправителе. Используется для обновления состояния пользователя в системе чата. Применение метода будет более понятно далее, на этапе реализации API-метода.

Отдельно вынес метод для получения информации о пользователе, просто чтобы не загромождать код:

async def get_user_info(session, user_id):     full_user_data = await UserDAO(session).find_one_or_none_by_id(user_id)     if not full_user_data:         raise HTTPException(status_code=404, detail="Пользователь не найден")     return {         "nickname": full_user_data.nickname,         "gender": full_user_data.gender,         "age": full_user_data.age,     } 

Тут вы можете увидеть пример использования метода из класса BaseDAO. Метод на вход принимает айди пользователя и возвращает питоновский словарь только с необходимыми данными. Можно для этих целей было описать отдельный метод в классе UserDAO.

Опишем метод, который возвращает все данные по ключам. То есть он позволит получить всю информацию с Redis. Метод тоже учебный. В боевых проектах необходимо было бы добавить специальный префикс в пространство имен, который позволил бы отобразить исключительно значения ключей, имеющих отношение к комнатам:

async def get_all_rooms_gender(redis_client: CustomRedis) -> List[Dict[str, Any]]:     """     Возвращает все данные по ключам.      :param redis_client: Клиент Redis.     :return: Список словарей с данными комнат.     """     all_keys = await redis_client.keys()     rooms_data = []      if all_keys:         # Используем mget для получения значений по всем ключам одновременно         values = await redis_client.mget(all_keys)          for key, value in zip(all_keys, values):             if value:                 try:                     room_dict = json.loads(value)                     rooms_data.append(room_dict)                 except json.JSONDecodeError:                     logger.error(f"Ошибка декодирования JSON для ключа {key}")      return rooms_data 

Опишем метод, который проверяет, подходят ли пользователь и партнер друг другу по полу и возрасту:

def is_match(     user_gender: str,     user_find_gender: str,     user_age: int,     user_age_from: int,     user_age_to: int,     partner_gender: str,     partner_find_gender: str,     partner_age: int,     partner_age_from: int,     partner_age_to: int, ) -> bool:     """     Проверяет, подходят ли пользователь и партнер друг другу по полу и возрасту.      :param user_gender: Пол текущего пользователя.     :param user_find_gender: Пол, который ищет текущий пользователь.     :param user_age: Возраст текущего пользователя.     :param user_age_from: Минимальный возраст, который ищет текущий пользователь.     :param user_age_to: Максимальный возраст, который ищет текущий пользователь.     :param partner_gender: Пол партнера.     :param partner_find_gender: Пол, который ищет партнер.     :param partner_age: Возраст партнера.     :param partner_age_from: Минимальный возраст, который ищет партнер.     :param partner_age_to: Максимальный возраст, который ищет партнер.     :return: True, если пользователь и партнер подходят друг другу, иначе False.     """     # Проверка по полу     is_gender_match = (         partner_find_gender == "any" or partner_find_gender == user_gender     ) and (         user_find_gender == "any" or user_find_gender == partner_gender     )  # Текущий пользователь ищет партнера      # Проверка по возрасту     is_age_match = (         partner_age_from <= user_age <= partner_age_to     ) and (  # Возраст пользователя подходит партнеру         user_age_from <= partner_age <= user_age_to     )  # Возраст партнера подходит пользователю      # Возвращаем True, если оба условия выполнены     return is_gender_match and is_age_match 

Теперь, когда мы завершили работу с утилитами, давайте перейдем к следующему важному этапу — реализации API-методов в файле app/api/router.py.

Пишем API-методы

Теперь перейдем к реализации API-методов, которые будут использовать наши утилиты. Сначала выполним необходимые импорты:

import json from fastapi import APIRouter, Depends, HTTPException from sqlalchemy.ext.asyncio import AsyncSession from app.api.schemas import SPartner, SMessge from app.api.utils import (     send_msg,     create_new_room,     get_user_info,     get_all_rooms_gender,     add_user_to_room,     refund_partner,     is_match, ) from app.dao.fastapi_dao_dep import get_session_without_commit from app.redis_dao.custom_redis import CustomRedis from app.redis_dao.manager import get_redis 

Инициируем роутер:

router = APIRouter(prefix="/api", tags=["АПИ"]) 

Метод поиска партнера

Реализуем первый метод, который будет или создавать новую комнату или присоединять пользователя к существующей:

@router.post("/find-partner") async def find_partner(     user: SPartner,     session: AsyncSession = Depends(get_session_without_commit),     redis_client: CustomRedis = Depends(get_redis), ):     # Получаем полные данные пользователя     user_data = await get_user_info(session, user.id)      # Данные пользователя     user_nickname = user_data["nickname"]     user_gender = user_data["gender"]     user_age = user_data["age"]      # Данные для поиска     age_from = user.age_from     age_to = user.age_to     find_gender = user.gender      # Получаем все комнаты для искомого пола     all_rooms = await get_all_rooms_gender(redis_client)      if len(all_rooms) == 0:         # Если нет подходящих комнат, создаем новую         return await create_new_room(             user_id=user.id,             user_nickname=user_nickname,             user_gender=user_gender,             user_age=user_age,             find_gender=user.gender,             age_from=age_from,             age_to=age_to,             redis_client=redis_client,         )     else:         # Ищем подходящую комнату         for room in all_rooms:             partners = room.get("partners", [])             if len(partners) == 1:                 partner_data = partners[0]                 if partner_data["id"] != user.id:                     if is_match(                         user_gender=user_gender,                         user_find_gender=find_gender,                         user_age=user_age,                         user_age_from=age_from,                         user_age_to=age_to,                         partner_gender=partner_data.get("gender"),                         partner_find_gender=partner_data.get("find_gender"),                         partner_age=partner_data.get("age"),                         partner_age_from=partner_data.get("age_from"),                         partner_age_to=partner_data.get("age_to"),                     ):                         return await add_user_to_room(                             room,                             user.id,                             user_nickname,                             user_gender,                             user_age,                             find_gender,                             age_from,                             age_to,                             redis_client,                         )                 else:                     return await refund_partner(                         room.get("room_key"),                         user.id,                         user_nickname,                         status="waiting",                         message="Ожидаем подходящего партнера",                     )             elif len(partners) == 2:                 if partners[0]["id"] == user.id or partners[1]["id"] == user.id:                     return await refund_partner(                         room.get("room_key"), user.id, user_nickname                     )             continue         # Если подходящая комната не найдена, создаем новую         return await create_new_room(             user_id=user.id,             user_nickname=user_nickname,             user_gender=user_gender,             user_age=user_age,             find_gender=user.gender,             age_from=age_from,             age_to=age_to,             redis_client=redis_client,         ) 

В коде оставил комментарии, поэтому на разборе сильно зацикливаться не будем. Скажу только то, что данный метод будет вызван когда пользователь в MiniApp будет кликать на кнопку «Найти партнера». После клика фронтенд получит информацию о том нужно ли запускать окно ожидания, либо запустит пользователя в комнату.

Метод получения статуса комнаты

Следующий метод позволит вернуть статус комнаты:

@router.get("/room-status") async def room_status(     key: str, user_id: int, redis_client: CustomRedis = Depends(get_redis) ):     # Получаем данные о комнате из Redis     room_data = await redis_client.get(key)     if not room_data:         raise HTTPException(status_code=404, detail="Комната не найдена")      room_info = json.loads(room_data)     participants = room_info.get("partners", [])      # Если в комнате 2 участника, значит партнер найден     if len(participants) == 2:         # Находим партнера (не текущего пользователя)         partner = next(             (                 participant                 for participant in participants                 if participant["id"] != user_id             ),             None,         )         if not partner:             raise HTTPException(status_code=500, detail="Ошибка при поиске партнера")          return {             "status": "matched",             "room_key": key,             "partner": {"id": partner["id"], "nickname": partner["nickname"]},         }      # Если в комнате только один участник, значит ожидание     elif len(participants) == 1:         print(f"STATUS: waiting!")         return {             "room_key": key,             "status": "waiting",             "message": "Ожидаем подходящего партнера",         }      # Если комната пуста или участников больше 2, значит комната закрыта     else:         print(f"STATUS: closed!")         return {"room_key": key, "status": "closed", "message": "Комната закрыта"} 

И тут я хочу остановиться подробнее.

После того как пользователь нажмет на кнопку «Найти партнера» и после того как система определит, что пользователь находится в ожидании другого партнера, у нас появится необходимость как-то определить, что партнер найден, и затем перебросить его в чат с пользователем.

Тут может быть два подхода:

  1. Опрашивать статус комнаты например 1 раз в секунду на предмет изменения статуса (говоря нашим языком, ожидание статуса «matched»). То есть мы отправляем запрос с определенной периодичностью с фронтенда и когда меняется статус, выполняем действия. Это метод поллинга.

  2. Подписка пользователя на канал Centrifugo — когда пользователь получает уведомление о событии сразу, как только оно происходит.

Данную задачу я решил реализовать методом поллинга, просто для практической демонстрации отличия подходов. Саму Centrifugo мы протестируем и так в чате.

Метод удаления комнаты

Следующий простой метод позволит удалить комнату:

@router.post("/clear_room/{room_id}") async def clear_room(room_id: str, redis_client: CustomRedis = Depends(get_redis)):     # Асинхронно удаляем ключ, связанный с room_id     await redis_client.unlink(room_id)     return {"status": "ok", "message": f"Ключ для комнаты {room_id} удален"} 

Принимает айди комнаты и очищает значение.

Метод отправки сообщения

И опишем последний метод, который позволит выполнить публикацию сообщения в Centrifugo:

@router.post("/send-msg/{room_id}") async def vote(room_id: str, msg: SMessge):     data = msg.model_dump()     is_sent = await send_msg(data=data, channel_name=room_id)     return {"status": "ok" if is_sent else "failed"} 

В параметрах пути мы передаем айди комнаты (ключ в базе данных Redis) и передаем само сообщение. Тут мы уже подключили валидацию входящих данных через Pydantic.

В целом, это вся API-логика, которая необходима нашей системе. Теперь нам остается только описать логику в боте и после можно будет переходить к тестированию и деплою. Этим мы займемся уже в следующих разделах статьи.

Пишем логику Telegram бота

Начнем с создания структуры под разработку. Первым делом создаем папку app/bot и заполним ее следующим образом:

  • папка dialog: в ней опишем структуру анкетирования пользователя через Aiogram Dialog

  • папка user: тут мы опишем пользовательскую логику в боте (обработка профиля, старницы «о нас» и прочее)

  • create_bot.py: файл с настройками и инициализацией бота

  • schemas.py: Pydantic-схемы

  • kbs.py: клавиатуры бота

Начнем с основных файлов.

Файл schemas.py:

from pydantic import BaseModel, Field, ConfigDict from typing import Optional  class NickSchema(BaseModel):     nickname: str = Field(..., description="Никнейм пользователя")  class AgeSchema(BaseModel):     age: int = Field(..., ge=0, description="Возраст пользователя")  class UserIdSchema(BaseModel):     id: int = Field(..., description="Уникальный идентификатор пользователя")  class UserSchema(NickSchema, AgeSchema, UserIdSchema):     username: Optional[str] = Field(None, description="Имя пользователя")     first_name: Optional[str] = Field(None, description="Имя")     last_name: Optional[str] = Field(None, description="Фамилия")     gender: str = Field(..., description="Пол пользователя")     model_config = ConfigDict(from_attributes=True) 

Клавиатуры:

from aiogram.types import InlineKeyboardMarkup, InlineKeyboardButton, WebAppInfo from aiogram.utils.keyboard import InlineKeyboardBuilder from app.config import settings  def main_user_kb(user_id: int, sender: str) -> InlineKeyboardMarkup:     kb = InlineKeyboardBuilder()      kb.button(text="👤 Мой профиль", callback_data="my_profile")     kb.button(text="ℹ️ О нас", callback_data=f"about_us_{sender}")     url = f"{settings.FRONT_URL}?user_id={user_id}&sender={sender}"     kb.button(text="💬 Чат Тет-а-тет", web_app=WebAppInfo(url=url))      kb.adjust(1)     return kb.as_markup()  def profile_kb(user_id: int, sender: str):     kb = InlineKeyboardBuilder()      kb.button(text="Изменить никнейм", callback_data="edit_nickname")     kb.button(text="Изменить возраст", callback_data="edit_age")     kb.button(text="ℹ️ О нас", callback_data="about_us")     url = f"{settings.FRONT_URL}?user_id={user_id}&sender={sender}"     kb.button(text="💬 Чат Тет-а-тет", web_app=WebAppInfo(url=url))      kb.adjust(1)     return kb.as_markup() 

Тут обратите внимание как мы выбрасываем кнопку со ссылкой на MiniApp. Остальное — стандартные клавиатуры Aiogram 3.

Теперь поработаем с папкой dialog

Разработка диалога с пользователем через Aiogram Dialog

Подробно технологию Aiogram dialog я рассматривал в статье «Телеграм-бот для бронирования столов на вебхуках: FastAPI, Aiogram Dialog, FastStream и RabbitMQ в единой экосистеме». Далее я буду исходить из того, что вы ознакомились с данной статьей либо и так знакомы с Aiogram Dialog.

Сделаем в папке dialog следующую структуру файлов:

  • state.py: класс состояния дилога

  • getters.py: функции геттеры для окон Aiogram dialog

  • handlers.py: обработчики состояний

  • windows.py: окна диалога

  • dialog.py: инициализация диалога со сборщиком окон

Файл state.py:

from aiogram.fsm.state import StatesGroup, State  class FormState(StatesGroup):     nickname = State()     gender = State()     age = State()     confirmation = State() 

Тут мы описали состояния в рамках которых будет находится пользователь по ходу прохождения анкетирования (диалога).

Сразу опишем геттеры:

from aiogram_dialog import DialogManager  async def get_confirmed_data(dialog_manager: DialogManager, **kwargs):     """Получение заполненных пользователем данных."""     gender = dialog_manager.dialog_data["gender"]     dialog_manager.dialog_data["age"] = dialog_manager.find("age").get_value()     dialog_manager.dialog_data["nickname"] = dialog_manager.find("nickname").get_value()      gender_text = "мужской" if gender == "man" else "женский"      text = f""" <b>Пожалуйста, проверьте введенные данные:</b>  • 🏷 Никнейм: <i>{dialog_manager.dialog_data["nickname"]}</i> • 🎂 Возраст: <i>{dialog_manager.dialog_data["age"]} лет</i> • ⚧ Пол: <i>{gender_text}</i>  <b>Всё верно?</b> Если нет, вы можете вернуться назад и исправить информацию. """     return {"confirmed_text": text} 

Как видите, геттер у нас всего один будет и его суть в том, чтоб создать предфинальное сообщение, которое позволит пользователю подтвердить заполненные им данные.

Теперь опишем обработчики (файл handlers.py)

Начнем с импортов:

from typing import Any from aiogram.types import CallbackQuery, Message from aiogram_dialog import DialogManager from aiogram_dialog.widgets.kbd import Button from app.bot.kbs import main_user_kb from app.bot.schemas import UserSchema from app.dao.dao import UserDAO 

Сразу опишем обработчик, который будет вызываться при отмене заполнения анкеты:

async def cancel_logic(callback: CallbackQuery, button: Button, dialog_manager: DialogManager):     await callback.answer("Заполнение анкеты остановлено!")     await callback.message.answer("Вы отменили сценарий анкетирования. К сожалению, без этого действия вы не "                                   "сможете пользоваться ботом. Пожалуйста нажмите на /dialog и ответьте на пару "                                   "вопросов о себе.") 

Далее опишем ошибку, которая может возникнуть при указании возвраста:

async def error_age(message: Message, dialog_: Any, manager: DialogManager, error_: ValueError):     await message.answer("Возраст должен быть целым числом!") 

Тут мы применим подход о котором я не говорил в своей прошлой статье, а именно, подход с виджетом Aiogram Dialog – TextInput (ещё вернемся к методу на этапе описания окна).

Опишем функцию для установки в состоянии пола пользователя:

async def process_gender(callback: CallbackQuery, button: Button, dialog_manager: DialogManager):     await callback.answer("Пол установлен!")     dialog_manager.dialog_data["gender"] = button.widget_id     await dialog_manager.next() 

Опишем финальный обработчик для сохранения данных о пользователе из состояния:

async def on_confirmation(callback: CallbackQuery, button: Button, dialog_manager: DialogManager):     await callback.answer("Приступаю к сохранению")     session = dialog_manager.middleware_data.get("session_with_commit")     user_id = callback.from_user.id     user = UserSchema(id=user_id,                       username=callback.from_user.username,                       first_name=callback.from_user.first_name,                       last_name=callback.from_user.last_name,                       nickname=dialog_manager.dialog_data["nickname"],                       gender=dialog_manager.dialog_data["gender"],                       age=dialog_manager.dialog_data["age"])     await UserDAO(session).add(user)     text = "Спасибо, что ответили на все вопросы! Теперь вам доступен доступ к чату."     await callback.message.answer(text, reply_markup=main_user_kb(user_id, dialog_manager.dialog_data["nickname"]))     await dialog_manager.done() 

Тут обратите внимание на вызов мидлваря. К сожалению в формате работы с хендлерами Aiogram Dialog у нас нет возможность вызова мидлваря через атрибут функции. Поэтому приходится брать его через:

session = dialog_manager.middleware_data.get("session_with_commit") 

Теперь нам остается реализовать окна диалога и объединить их в одну сущность.

Работаем с файлом windows.py

Напоминаю, что окна в контексте Aiogram Dialog – это те сообщения с клавиатурой (или без), которые получает пользователь в рамках диалога. Окна состоят из виджетов.

Выполним импорты:

from aiogram_dialog import Window from aiogram_dialog.widgets.input import TextInput from aiogram_dialog.widgets.kbd import Next, Cancel, Group, Button, Back from aiogram_dialog.widgets.text import Const, Format from app.bot.dialog.getters import get_confirmed_data from app.bot.dialog.handlers import cancel_logic, error_age, process_gender, on_confirmation from app.bot.dialog.state import FormState 

Теперь опишем окно ввода никнейма пользователем:

def get_nickname_window() -> Window:     """Окно для ввода никнейма пользователя."""     return Window(         Const("Давайте знакомиться! Ответьте на пару вопросов о себе. Для начала укажите имя, "               "которое будут видеть пользователи чата Тет-А-Тет:"),         TextInput(             id="nickname",             on_success=Next(),             type_factory=str,         ),         Cancel(Const("Отмена"), on_click=cancel_logic),         state=FormState.nickname     ) 

Тут я продемонстрировал подход, который не рассматривал ранее. У нас получилось без конкретного хендлера сохранить введенные данные пользователем в состояние.

Далее рассмотрим более сложный пример использования этого виджета, но уже с валидацией:

def get_age_window():     return Window(         Const("Укажите свой возраст:"),         TextInput(             id="age",             on_error=error_age,             on_success=Next(),             type_factory=int,         ),         Back(Const("Назад")),         Cancel(Const("Отмена"), on_click=cancel_logic),         state=FormState.age,     ) 

Тут мы задействовали хендлер error_age. Согласитесь, это очень удобно, учитывая что нам не пришлось явно управлять состоянием, помещая в него возраст пользователя и его никнейм, это все за нас сделал виджет TextInput.

Обратите внимание на способ извлечения данных из состояния диалога:

gender = dialog_manager.dialog_data["gender"] age = dialog_manager.find("age").get_value() nickname = dialog_manager.find("nickname").get_value() 

То есть данные мы извлекаем не из dialog_data, как при явном помещении значения в состояние, а используя dialog_manager.find(КЛЮЧ).get_value()

Опишем окно выбора пола:

def get_gender_window():     return Window(         Const("Выберите пол"),         Group(             Button(text=Const("Мужской"), id="man", on_click=process_gender),             Button(text=Const("Женский"), id="woman", on_click=process_gender),             width=2         ),         Back(Const("Назад")),         Cancel(Const("Отмена"), on_click=cancel_logic),         state=FormState.gender     ) 

Тут мы задействовали клавиатуру (виджет Button) и к каждому клику привязали наш подготовленный хендлер захвата пола.

И последний обработчик, который будет отправлять окно с текстом подтвержения:

def get_confirmed_windows():     return Window(         Format("{confirmed_text}"),         Group(             Button(Const("Все верно"), id="confirm", on_click=on_confirmation),             Back(Const("Назад")),             Cancel(Const("Отмена"), on_click=cancel_logic),         ),         state=FormState.confirmation,         getter=get_confirmed_data     ) 

Тут мы уже задействовали наш подготовленный getter для того чтоб посчитать сообщение пользователю с его данными.

Теперь поработаем с файлом dialog.py.

Там нам необходимо будет объеденить все окна в одну сущность. Вот полный код:

from aiogram_dialog import Dialog from app.bot.dialog.windows import get_nickname_window, get_age_window, get_gender_window, get_confirmed_windows  form_dialog = Dialog(     get_nickname_window(),     get_age_window(),     get_gender_window(),     get_confirmed_windows() ) 

Теперь нам достаточно будет включить этот диалог как обычный роутер. Чем мы займемся в следующих разделах статьи.

Описание пользовательской логики

Теперь, когда мы подготовили наш диалог, нам необходимо включить его в общей пользовательской логике. Кроме того, далее мы опишем дополнительные методы, такие как работа с профилем пользователя или просмотр блока «О нас».

Работаем с папкой bot/user. Там мы создадим всего 2 файла:

  • state.py: состояния пользовательской части

  • router.py: хендлеры

Файл user/state.py:

from aiogram.fsm.state import StatesGroup, State  class AgeState(StatesGroup):     age = State()  class NickState(StatesGroup):     nickname = State() 

Два простых состояния, которые позволят пользователю изменить возраст или никнейм.

Теперь поработаем с файлом user/router.py

Выполним импорты:

from aiogram import Router, F from aiogram.filters import CommandStart from aiogram.fsm.context import FSMContext from aiogram.types import Message, CallbackQuery from aiogram_dialog import DialogManager, StartMode from sqlalchemy.ext.asyncio import AsyncSession from app.bot.dialog.state import FormState from app.bot.kbs import main_user_kb, profile_kb from app.bot.schemas import UserIdSchema, NickSchema, AgeSchema from app.dao.dao import UserDAO from app.bot.user.state import AgeState, NickState 

Инициируем роутер:

router = Router() 

Опишем обработчик, который будет вызываться при входе пользователя в бота (команда /start):

@router.message(CommandStart()) async def cmd_start(     message: Message,     dialog_manager: DialogManager,     session_without_commit: AsyncSession,     state: FSMContext, ):     await state.clear()     user_info = await UserDAO(session_without_commit).find_one_or_none_by_id(         message.from_user.id     )     if user_info is None:         await dialog_manager.start(FormState.nickname, mode=StartMode.RESET_STACK)     else:         await message.answer(             "Вам открыт доступ к чату! Для работы воспользуйтесь клавиатурой ниже.",             reply_markup=main_user_kb(                 user_id=message.from_user.id, sender=user_info.nickname             ),         ) 

Данный обработчик выполняет следующую логику:

  • очищает состояние (добавил специально, чтоб при клике на /start если пользователь был в состоянии заполнения никнейма или возраста состояние сбрасывалось)

  • проверяет есть ли пользователь в системе. Если пользователя нет, то мы выполняем запуск диалога, подготовленного ранее, иначе бот отправит пользователю заготовленное сообщение с клавиатурой.

Опишем обработчик «о нас»

@router.callback_query(F.data.startswith("about_us_")) async def cmd_about(call: CallbackQuery):     await call.answer()     user_id = call.from_user.id     nickname = call.data.replace("about_us_", "")     text = """ <b>Добро пожаловать в чат Тет-а-тет!</b>  и т.д. """     await call.message.edit_text(text, reply_markup=main_user_kb(user_id, nickname)) 

В полном исходнике кода можно будет посмотреть полный текст.

Теперь опишем логику при клике на мой профиль:

@router.callback_query(F.data == "my_profile") async def cmd_profile(call: CallbackQuery, session_without_commit: AsyncSession):     await call.answer()     user_info = await UserDAO(session_without_commit).find_one_or_none_by_id(         call.from_user.id     )      gender_text = "👨 Мужской" if user_info.gender == "man" else "👩 Женский"      profile_text = f""" <b>👤 Ваш профиль в Тет-а-тет:</b>  • <b>🏷 Никнейм:</b> {user_info.nickname} • <b>🎂 Возраст:</b> {user_info.age} лет / года • <b>⚧ Пол:</b> {gender_text} • <b>📛 Имя:</b> {user_info.first_name or 'Не указано'} • <b>👥 Фамилия:</b> {user_info.last_name or 'Не указана'} • <b>🆔 Имя пользователя:</b> {user_info.username or 'Не указано'}  ✏️ Чтобы изменить данные, воспользуйтесь клавиатурой ниже. """      await call.message.edit_text(         profile_text, reply_markup=profile_kb(call.from_user.id, user_info.nickname)     ) 

То есть мы выводим информацию о пользователе с клавиатурой смены данных. Далее я опишу только полную логику смены никнейма, так как смена возраста отличаться особо не будет.

@router.callback_query(F.data == "edit_nickname") async def cmd_about(call: CallbackQuery, state: FSMContext):     await call.answer()     await call.message.edit_text("Укажите новый никнейм: ")     await state.set_state(NickState.nickname)  @router.message(F.text, NickState.nickname) async def cmd_edit_nickname(     message: Message, state: FSMContext, session_with_commit: AsyncSession ):     user_dao = UserDAO(session_with_commit)     await user_dao.update(         filters=UserIdSchema(id=message.from_user.id),         values=NickSchema(nickname=message.text),     )     await state.clear()     await message.answer(         "Ваш никнейм изменен на: " + message.text,         reply_markup=main_user_kb(message.from_user.id, message.text),     ) 

Тут мы использовали классический подход к работе с машиной состояний от Aiogram 3.

В целом, на этом с логикой бота закончили. Теперь остается описать логику в файле create_bot.py и подготовить main-файл всего проекта, чем мы займемся далее.

Заполняем файл user/create_bot.py

Выполняем импорты:

from aiogram import Bot, Dispatcher from aiogram.client.default import DefaultBotProperties from aiogram.enums import ParseMode from aiogram.fsm.storage.memory import MemoryStorage from aiogram.types import BotCommand, BotCommandScopeDefault from aiogram_dialog import setup_dialogs from loguru import logger  from app.bot.dialog.dialog import form_dialog from app.bot.user.router import router as user_router from app.config import settings from app.dao.database_middleware import DatabaseMiddlewareWithoutCommit, DatabaseMiddlewareWithCommit from app.dao.create_db import create_users_table 

Инициализируем бота и диспетчер:

bot = Bot(token=settings.BOT_TOKEN, default=DefaultBotProperties(parse_mode=ParseMode.HTML)) dp = Dispatcher(storage=MemoryStorage()) 

Опишем командную клавиатуру:

async def set_commands():     commands = [BotCommand(command='start', description='Рестарт')]     await bot.set_my_commands(commands, BotCommandScopeDefault()) 

Опишем функцию, которая будет выполнена при запуске бота:

async def start_bot():     await create_users_table()     setup_dialogs(dp)     dp.update.middleware.register(DatabaseMiddlewareWithoutCommit())     dp.update.middleware.register(DatabaseMiddlewareWithCommit())     await set_commands()     dp.include_router(form_dialog)     dp.include_router(user_router)      for admin_id in settings.ADMIN_IDS:         try:             await bot.send_message(admin_id, f'Я запущен🥳.')         except:             pass     logger.info("Бот успешно запущен.") 

Остановимся подробнее на этой функции.

Эту функцию мы будем вызывать в жизненном цикле FastAPI приложения при его запуске. Сама функция выполняет следующие задачи:

  • Создает базу данных с таблицей, если они не существовали.

  • Инициализирует Aiogram Dialog.

  • Регистрирует мидлвари.

  • Устанавливает командную клавиатуру.

  • Подключает роутеры Aiogram Dialog и пользователя.

  • Рассылает всем администраторам сообщение о том, что бот запущен.

Теперь опишем логику, которая будет выполнена при завершении работы бота:

async def stop_bot():     try:         for admin_id in settings.ADMIN_IDS:             await bot.send_message(admin_id, 'Бот остановлен. За что?😔')     except:         pass     logger.error("Бот остановлен!") 

Эту функцию мы также зарегистрируем в жизненном цикле FastAPI, но уже при завершении работы приложения. Она просто рассылает сообщение всем администраторам о том, что бот остановлен.

На этом настройка бота завершена. Для завершения нашего бэкенда останется только настроить main-файл FastAPI приложения и запустить проект.

Работаем с main-файлом приложения

Создаем файл app/main.py. В нем мы опишем логику, которая позволит объединить весь разрозненный код (все микросервисы) в одну единую экосистему. В этом нам поможет жизненный цикл FastAPI.

Начнем с импортов:

from contextlib import asynccontextmanager from fastapi.middleware.cors import CORSMiddleware from app.bot.create_bot import dp, start_bot, bot, stop_bot from app.config import settings from aiogram.types import Update from fastapi import FastAPI, Request from loguru import logger from app.api.router import router as api_router from app.redis_dao.manager import redis_manager 

Теперь опишем саму функцию, которая будет управлять жизненным циклом нашего FastAPI приложения. Вот полный код:

@asynccontextmanager async def lifespan(app: FastAPI):     logger.info("Бот запущен...")     await redis_manager.connect()     await start_bot()     app.include_router(api_router)     webhook_url = settings.hook_url     await bot.set_webhook(url=webhook_url,                           allowed_updates=dp.resolve_used_update_types(),                           drop_pending_updates=True)     logger.success(f"Вебхук установлен: {webhook_url}")     yield     logger.info("Бот остановлен...")     await stop_bot()     await redis_manager.close() 

Жизненный цикл приложения

В рамках жизненного цикла приложения можно выделить три основных этапа:

  1. Запуск

  2. Работа приложения

  3. Завершение работы

1. Запуск

На этапе запуска мы выполняем следующую логику:

  • Подключение к Redis:
    Мы подключаемся к базе данных Redis, которая используется для хранения временных данных, кэширования или управления состоянием бота. Это важно для работы с сессиями пользователей или хранения промежуточных данных.

  • Запуск бота:
    Функция start_bot инициализирует Telegram-бота, активируя его основные функции. Это включает настройку обработчиков команд, сообщений и других событий, которые бот должен обрабатывать.

  • Подключение API-роутера:
    Мы подключаем API-роутер, который содержит все ранее описанные методы API. Это позволяет нашему приложению обрабатывать HTTP-запросы и взаимодействовать с другими частями системы.

  • Установка вебхука:
    Вебхук — это механизм, который позволяет Telegram отправлять обновления (например, новые сообщения или команды) на наш сервер. Мы указываем URL (webhook_url), на который Telegram будет отправлять эти обновления.
    Параметр allowed_updates определяет типы обновлений, которые бот будет получать (например, сообщения, команды, callback-запросы).
    Параметр drop_pending_updates=True указывает, что все обновления, которые произошли до установки вебхука, будут проигнорированы. Это полезно, чтобы избежать обработки старых данных при перезапуске бота.

2. Работа приложения

На этом этапе приложение активно обрабатывает запросы и обновления.ы

3. Завершение работы

На этапе завершения мы выполняем следующие действия:

  • Остановка бота:
    Функция stop_bot корректно завершает работу бота, освобождая ресурсы и завершая все активные процессы.

  • Отключение от Redis:
    Мы разрываем соединение с базой данных Redis, чтобы избежать утечек ресурсов и корректно завершить работу приложения.

Инициализация FastAPI приложения

Теперь инициализируем FastAPI приложение, передавая в него наш жизненный цикл:

app = FastAPI(lifespan=lifespan) 

Настройка CORS

Чтобы избежать проблем при отправке запросов к нашему API со стороны фронтенда, настраиваем CORS (Cross-Origin Resource Sharing):

app.add_middleware(     CORSMiddleware,     allow_origins=["*"],     allow_credentials=True,     allow_methods=["*"],     allow_headers=["*"] ) 

Обработка обновлений через вебхук

Теперь опишем эндпоинт /webhook, который будет обрабатывать обновления от Telegram-бота:

@app.post("/webhook") async def webhook(request: Request) -> None:     logger.info("Получен запрос с вебхука.")     try:         # Получаем данные обновления от Telegram         update_data = await request.json()          # Валидируем данные обновления с помощью модели Update         update = Update.model_validate(update_data, context={"bot": bot})          # Передаем обновление в диспетчер (dp) для дальнейшей обработки         await dp.feed_update(bot, update)          logger.info("Обновление успешно обработано.")     except Exception as e:         logger.error(f"Ошибка при обработке обновления с вебхука: {e}") 

Как это работает:

  1. Получение обновления:
    Telegram отправляет POST-запрос на наш эндпоинт /webhook с данными обновления (например, новое сообщение или команда). Эти данные приходят в формате JSON.

  2. Валидация данных:
    Мы используем модель Update из библиотеки aiogram, чтобы проверить корректность данных и преобразовать их в удобный для работы формат.

  3. Обработка обновления:
    Обновление передается в диспетчер (dp), который определяет, какой обработчик (handler) должен его обработать. Например, если пользователь отправил команду /start, диспетчер вызовет соответствующий обработчик для этой команды.

  4. Логирование:
    Мы логируем успешную обработку обновления или ошибку, если что-то пошло не так.

Запуск приложения

Остается только запустить приложение. Для этого в терминале выполним команду:

app.main:app –port 8000 –host 0.0.0.0 

Теперь наше приложение объединяет все микросервисы в единую экосистему. Оно корректно обрабатывает обновления от Telegram через вебхук, взаимодействует с базой данных Redis и предоставляет API для внешних запросов. Вебхук обеспечивает мгновенную доставку обновлений, что делает бота отзывчивым и эффективным в работе.

Завершающий этап: Деплой проекта на Amvera Cloud

Чтобы завершить разработку нашего проекта и сделать его доступным извне, выполним его деплой на облачный сервис Amvera Cloud. Этот процесс включает несколько шагов, которые мы подробно рассмотрим ниже.

Шаги для деплоя проекта

  1. Регистрация на Amvera Cloud
    Если у вас еще нет аккаунта на Amvera Cloud, зарегистрируйтесь на сайте. После регистрации перейдите в раздел «Приложения».

  2. Создание приложения
    Нажмите на кнопку «Создать приложение».

    • Укажите название вашего проекта.

    • Выберите подходящий тариф. Рекомендуется выбирать тариф не ниже «Начального».

    • Нажмите «Далее».

  3. Загрузка файлов проекта
    Вам будет предложено загрузить файлы проекта. Это можно сделать двумя способами:

    • Используя Git-команды (инструкция будет доступна на экране).

    • Загрузив файлы вручную через интерфейс. Важно: Виртуальное окружение Python загружать не нужно — Amvera создаст его автоматически. После загрузки нажмите «Далее».

  4. Настройка параметров
    Заполните необходимые поля в соответствии с инструкциями (см. скриншот ниже) и нажмите «Завершить».

Настройка доменного имени

Чтобы фронтенд мог взаимодействовать с бэкендом, необходимо привязать к проекту доменное имя с поддержкой HTTPS. Для этого:

  1. Перейдите в созданный проект и откройте вкладку «Домены».

  2. Нажмите «Добавить домен».

  3. Выберите опцию «HTTPS» и укажите бесплатное доменное имя Amvera (или привяжите собственное доменное имя, если оно у вас есть).

  4. Нажмите «Добавить».

Обновление локальных настроек

После привязки домена необходимо обновить файл .env на локальной машине:

  • Установите корректное доменное имя бэкенда в переменной BASE_URL.

  • Загрузите обновленный файл .env в репозиторий проекта на Amvera Cloud.

  • Нажмите «Пересобрать» и дождитесь завершения процесса (обычно это занимает несколько минут).

Завершение

Как только сборка завершится, вы получите уведомление от бота о том, что приложение запущено. На вкладке проекта появится зеленая плашка «Приложение запущено».

На этом процесс деплоя завершен. Ваш проект теперь доступен извне и готов к использованию!

Такой текст более структурирован, легко читается и содержит четкие инструкции. Если нужно что-то добавить или уточнить, дайте знать!

Что дальше?

На текущий момент мы прошли лишь часть пути: подготовили бэкенд нашего приложения. Теперь у нас есть функционал для генерации очередей и комнат в Redis, возможность публиковать сообщения и работать с каналами в Centrifugo, а также стартовая версия телеграм-бота. Но это только начало.

Впереди нас ждет реализация пользовательской части — фронтенда, или, иными словами, Telegram MiniApp. Кроме того, я обещал рассказать, как можно монетизировать подобные приложения с помощью сервиса RichAds. Этим мы займемся уже в следующей части. Постараюсь не затягивать и опубликовать ее не позднее конца следующей недели (на момент написания этого текста для меня это 23 марта 2025 года).

В следующей части мы:

  1. Разработаем дизайн страниц MiniApp и сверстаем их.

  2. Наполним их логикой фронтенда, используя возможности фреймворка Vue.js 3.

  3. Настроим работу Centrifugo на стороне фронтенда.

  4. Подключим к фронтенду API-методы, описанные сегодня.

  5. Интегрируем монетизацию проекта через сервис RichAds (это займет всего пару минут, и вам точно понравится).

  6. Завершим проект деплоем фронтенда на платформу Amvera Cloud.

В итоге мы получим полноценный FullStack-проект анонимного чата «Тет-а-Тет» в формате телеграм-бота с MiniApp.

Напоминаю, что полный исходный код бэкенда, который мы обсуждали сегодня, а также код фронтенда уже доступны в моем телеграм-канале «Легкий путь в Python» уже пару недель.

Если эта статья оказалась для вас полезной, не забудьте поддержать меня лайком, комментарием или подпиской. На разработку таких проектов и написание статей уходит очень много времени, а обратная связь от аудитории часто остается незаметной. Поэтому я искренне надеюсь на вашу поддержку.

Надеюсь, вы с нетерпением ждете продолжения. Если это так, примите участие в голосовании ниже.

На этом у меня все. До скорой встречи в следующей части!

Только зарегистрированные пользователи могут участвовать в опросе. Войдите, пожалуйста.

Будете ждать продолжение?

68.75% Конечно!11
0% Возможно…0
6.25% Ждать не буду, но почитаю1
25% Нет!4

Проголосовали 16 пользователей. Воздержался 1 пользователь.

ссылка на оригинал статьи https://habr.com/ru/articles/890976/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *