Приветствую!
Сегодня будем писать бота для хранения истории личных сообщений
Безусловно, идея не уникальная: часть людей уже использует неофициальные клиенты, другая – юзерботов (например, на pyrogram)
Что я предлагаю?
Официальный бизнес-режим бота, не нарушающий TOS, работающий 24/7 и за который не сносят аккаунты (!)
Минусы: Нужен Telegram Premium
Ближе к делу
Создание бота (Для новичков)
-
Зайдите в бота @BotFather
-
Напишите /newbot и как-нибудь обзовите бота, например «Уведомления»
-
Придумайте ему @юзернейм, подходят только латинские буквы, цифры и нижнее подчеркивание, обязательно напишите «bot» в конце
-
Бот готов – скопируйте и сохраните токен
-
Настройте бота:
-
Напишите /mybots и выберите своего бота
-
Edit Bot -> Edit Botpic -> Отправьте аватарку для бота
-
Back to Bot -> Bot Settings -> Business Mode -> Turn On
-
Написание бота
Стек: python 3.12, aiogram, redis, docker
(Ссылка на GitHub репозиторий внизу статьи)
Для начала придумываем файловую структуру, что-то типа:
. ├── main.py # Запуск бота ├── pyproject.toml # Управление зависимостями Poetry ├── Dockerfile # Инструкции для сборки Docker образа ├── docker-compose.yml # Конфигурация Docker контейнеров (бот + redis) ├── .env # Переменные окружения (токен, настройки) └── src/ ├── bot.py # Основная логика бота, хендлеры ├── keyboards.py # Клавиатуры и коллбэки └── settings.py # Pydantic парсер .env файла
Теперь нужно установить библиотеки:
poetry add aiogram pydantic pydantic_settings loguru redis
pyproject.toml
должен получится таким:
[tool.poetry] name = "business-bot" version = "0.1.0" description = "" authors = [] readme = "README.md" [tool.poetry.dependencies] python = "^3.12" aiogram = "^3.13.1" loguru = "^0.7.2" redis = "^5.2.0" pydantic = "^2.9.2" pydantic-settings = "^2.6.0" [build-system] requires = ["poetry-core"] build-backend = "poetry.core.masonry.api"
Далее прописываем конфиг в settings.py
, используя pydantic_settings, который парсит .env
:
-
REDIS_HOST
,REDIS_PORT
,REDIS_PASSWORD
по умолчанию устанавливаются вdocker-compose.yml
, но они нужны, если Вы будете запускать бота вне контейнера, используя локальный/облачный редис
from typing import Optional from pydantic import SecretStr from pydantic_settings import BaseSettings, SettingsConfigDict class Environment(BaseSettings): model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8") class Redis(Environment): REDIS_HOST: str = "localhost" REDIS_PORT: int = 6379 REDIS_PASSWORD: Optional[SecretStr] = None class Bot(Environment): TOKEN: SecretStr = SecretStr("7285627548:FaGPjgAJFU8G524HE38hubGrgTcQcrpmbyc") class Settings(Bot, Redis): USER_ID: int = 0 settings = Settings()
И соответственно настраиваем .env
файл:
-
Токен бота ранее доставали из Bot Father
-
Свой ID можно получить с помощью @getmyid_bot
TOKEN="7285627548:FaGPjgAJFU8G524HE38hubGrgTcQcrpmbyc" USER_ID=123456789
Прописываем переменные проекта в bot.py
:
from aiogram import Bot, Dispatcher from redis.asyncio import Redis from .settings import settings bot = Bot(token=settings.TOKEN.get_secret_value()) dp = Dispatcher() redis = Redis( host=settings.REDIS_HOST, port=settings.REDIS_PORT, password=( settings.REDIS_PASSWORD.get_secret_value() if settings.REDIS_PASSWORD else None ), )
и используем их в main.py
для запуска:
from loguru import logger from src.bot import dp, bot if __name__ == "__main__": logger.info("Starting...") dp.run_polling( bot, allowed_updates=[ "callback_query", "business_message", "edited_business_message", "deleted_business_messages", ], )
Важно! aiogram по умолчанию слушает только следующие хэндлеры:
message
,callback_query
,errors
. Остальные он автоматически не парсит и их нужно добавлять вручную черезallowed_updates
.
В файле bot.py
сохраняем все новые сообщения в Redis с авто-удалением через 3 недели:
EX_TIME = 60 * 60 * 24 * 21 async def set_message(message: types.Message): await redis.set( f"{message.chat.id}:{message.message_id}", message.model_dump_json(), ex=EX_TIME, ) @dp.business_message() async def message(message: types.Message): await set_message(message)
Теперь нужно как-то красиво сообщать об изменении статуса сообщений. Лучшим вариантом я считаю добавление inline клавиатуры, так не будет редактироваться исходное сообщение.
Первой кнопкой будет статус сообщения – ✏️ или 🗑️
Второй – ссылка на профиль отправителя вида tg://user?id=0
И третьей – кнопка для закрытия уведомления
Для этого прописываем markup и callbacks в файле keyboards.py
:
from enum import StrEnum from aiogram.utils.keyboard import InlineKeyboardBuilder class Callbacks(StrEnum): EMPTY = "empty" CLOSE = "close" def link_markup(title: str, user_id: int): builder = InlineKeyboardBuilder() builder.button(text=title, callback_data=Callbacks.EMPTY) builder.button(text="👤", url=f"tg://user?id={user_id}") builder.button(text="❌", callback_data=Callbacks.CLOSE) return builder.adjust(3).as_markup()
В примере выше я использую
StrEnum
для константныхcallback_data
. На моей практике, импортировать класс с енамами и в следствии использовать их сF.data
гораздо удобнее, чем в остальных вариантах, например, с глобальными переменными.
Отправляем уведомление при изменении сообщения (здесь создается копия сообщения на основе json строки из Redis):
@dp.edited_business_message() async def edited_message(message: types.Message): model_dump = await redis.get(f"{message.chat.id}:{message.message_id}") await set_message(message) if not model_dump: return original_message = types.Message.model_validate_json(model_dump) if not original_message.from_user: return await original_message.send_copy( chat_id=settings.USER_ID, reply_markup=link_markup("✏️", original_message.from_user.id), ).as_(bot)
И аналогично отправляем уведомления при удалении сообщений:
-
Одним конвейером (pipeline) достаем все удаленные сообщения по их ID и отправляем сохраненные копии
-
Если сообщений слишком много, мы можем столкнуться с flood wait от телеграма, но все удаленные сообщения должны гарантировано доставиться, поэтому дожидаемся его окончания
-
И в конце очищаем хранилище от удаленных сообщений
async def copy_message(message: types.Message): await message.send_copy( chat_id=settings.USER_ID, ).as_(bot) @dp.deleted_business_messages() async def deleted_message(business_messages: types.BusinessMessagesDeleted): pipe = redis.pipeline() for message_id in business_messages.message_ids: pipe.get(f"{business_messages.chat.id}:{message_id}") messages_data = await pipe.execute() keys_to_delete = [] for message_id, model_dump in zip(business_messages.message_ids, messages_data): if not model_dump: continue original_message = types.Message.model_validate_json(model_dump) if not original_message.from_user: continue send_copy = original_message.send_copy( chat_id=settings.USER_ID, reply_markup=link_markup("🗑️", original_message.from_user.id), ).as_(bot) try: await send_copy except exceptions.TelegramRetryAfter as exp: logger.warning(f"Retry after {exp.retry_after} seconds") await asyncio.sleep(exp.retry_after + 0.1) await send_copy finally: await asyncio.sleep(0.1) keys_to_delete.append(f"{business_messages.chat.id}:{message_id}") if keys_to_delete: await redis.delete(*keys_to_delete)
Еще нужно добавить примитивную логику для inline кнопок:
@dp.callback_query(F.data == Callbacks.EMPTY) async def empty(query: types.CallbackQuery): await query.answer() @dp.callback_query(F.data == Callbacks.CLOSE) async def close(query: types.CallbackQuery): await query.answer() if isinstance(query.message, types.Message): await query.message.delete()
Полный файл bot.py
:
import asyncio from aiogram import F, Bot, Dispatcher, types, exceptions from loguru import logger from redis.asyncio import Redis from .settings import settings from .keyboards import link_markup, Callbacks bot = Bot(token=settings.TOKEN.get_secret_value()) dp = Dispatcher() redis = Redis( host=settings.REDIS_HOST, port=settings.REDIS_PORT, password=( settings.REDIS_PASSWORD.get_secret_value() if settings.REDIS_PASSWORD else None ), ) EX_TIME = 60 * 60 * 24 * 21 async def set_message(message: types.Message): await redis.set( f"{message.chat.id}:{message.message_id}", message.model_dump_json(), ex=EX_TIME, ) @dp.business_message() async def message(message: types.Message): await set_message(message) @dp.edited_business_message() async def edited_message(message: types.Message): model_dump = await redis.get(f"{message.chat.id}:{message.message_id}") await set_message(message) if not model_dump: return original_message = types.Message.model_validate_json(model_dump) if not original_message.from_user: return await original_message.send_copy( chat_id=settings.USER_ID, reply_markup=link_markup("✏️", original_message.from_user.id), ).as_(bot) async def copy_message(message: types.Message): await message.send_copy( chat_id=settings.USER_ID, ).as_(bot) @dp.deleted_business_messages() async def deleted_message(business_messages: types.BusinessMessagesDeleted): pipe = redis.pipeline() for message_id in business_messages.message_ids: pipe.get(f"{business_messages.chat.id}:{message_id}") messages_data = await pipe.execute() keys_to_delete = [] for message_id, model_dump in zip(business_messages.message_ids, messages_data): if not model_dump: continue original_message = types.Message.model_validate_json(model_dump) if not original_message.from_user: continue send_copy = original_message.send_copy( chat_id=settings.USER_ID, reply_markup=link_markup("🗑️", original_message.from_user.id), ).as_(bot) try: await send_copy except exceptions.TelegramRetryAfter as exp: logger.warning(f"Retry after {exp.retry_after} seconds") await asyncio.sleep(exp.retry_after + 0.1) await send_copy finally: await asyncio.sleep(0.1) keys_to_delete.append(f"{business_messages.chat.id}:{message_id}") if keys_to_delete: await redis.delete(*keys_to_delete) @dp.callback_query(F.data == Callbacks.EMPTY) async def empty(query: types.CallbackQuery): await query.answer() @dp.callback_query(F.data == Callbacks.CLOSE) async def close(query: types.CallbackQuery): await query.answer() if isinstance(query.message, types.Message): await query.message.delete()
Почти закончили!
Осталось обернуть все в контейнер:
Dockerfile
FROM python:3.12-slim RUN pip install poetry WORKDIR /app # Копируем и устанавливаем зависимости COPY pyproject.toml poetry.lock ./ RUN poetry config virtualenvs.create false \ && poetry install --no-interaction --no-ansi --no-root # Копируем файлы проекта и переменные окружения COPY . . COPY .env .env EXPOSE 80 CMD ["python3", "main.py"]
docker-compose.yml
services: redis: image: redis:alpine command: redis-server --requirepass ${REDIS_PASSWORD:-password} --save 60 1 healthcheck: test: ["CMD", "redis-cli", "ping"] interval: 5s timeout: 3s retries: 3 ports: - "6380:6379" volumes: - redis_data:/data web: build: . restart: unless-stopped ports: - "8081:80" depends_on: redis: condition: service_healthy environment: REDIS_HOST: redis REDIS_PORT: 6379 REDIS_PASSWORD: ${REDIS_PASSWORD:-password} volumes: redis_data:
Вот теперь готово!
Билдим бота и проверяем, что все работает:
docker-compose up --build
Это первая статья на Хабре, буду рад любой конструктивной критике!
ссылка на оригинал статьи https://habr.com/ru/articles/870868/
Добавить комментарий