Следим за Telegram по-деловому

от автора

Приветствую!
Сегодня будем писать бота для хранения истории личных сообщений

Безусловно, идея не уникальная: часть людей уже использует неофициальные клиенты, другая – юзерботов (например, на pyrogram)

Что я предлагаю?

Официальный бизнес-режим бота, не нарушающий TOS, работающий 24/7 и за который не сносят аккаунты (!)

Минусы: Нужен Telegram Premium

Ближе к делу

Вот что мы получим по итогу

Вот что мы получим по итогу

Создание бота (Для новичков)

  1. Зайдите в бота @BotFather

  2. Напишите /newbot и как-нибудь обзовите бота, например «Уведомления»

  3. Придумайте ему @юзернейм, подходят только латинские буквы, цифры и нижнее подчеркивание, обязательно напишите «bot» в конце

  4. Бот готов – скопируйте и сохраните токен

  5. Настройте бота:

    1. Напишите /mybots и выберите своего бота

    2. Edit Bot -> Edit Botpic -> Отправьте аватарку для бота

    3. Back to Bot -> Bot Settings -> Business Mode -> Turn On

Написание бота

Стек: python 3.12, aiogram, redis, docker
(Ссылка на GitHub репозиторий внизу статьи)

Для начала придумываем файловую структуру, что-то типа:

. ├── main.py              # Запуск бота ├── pyproject.toml       # Управление зависимостями Poetry ├── Dockerfile           # Инструкции для сборки Docker образа ├── docker-compose.yml   # Конфигурация Docker контейнеров (бот + redis) ├── .env                 # Переменные окружения (токен, настройки) └── src/     ├── bot.py           # Основная логика бота, хендлеры     ├── keyboards.py     # Клавиатуры и коллбэки     └── settings.py      # Pydantic парсер .env файла

Теперь нужно установить библиотеки:

poetry add aiogram pydantic pydantic_settings loguru redis

pyproject.toml должен получится таким:

[tool.poetry] name = "business-bot" version = "0.1.0" description = "" authors = [] readme = "README.md"  [tool.poetry.dependencies] python = "^3.12" aiogram = "^3.13.1" loguru = "^0.7.2" redis = "^5.2.0" pydantic = "^2.9.2" pydantic-settings = "^2.6.0"  [build-system] requires = ["poetry-core"] build-backend = "poetry.core.masonry.api"

Далее прописываем конфиг в settings.py, используя pydantic_settings, который парсит .env:

  • REDIS_HOST, REDIS_PORT, REDIS_PASSWORD по умолчанию устанавливаются в docker-compose.yml, но они нужны, если Вы будете запускать бота вне контейнера, используя локальный/облачный редис

from typing import Optional  from pydantic import SecretStr from pydantic_settings import BaseSettings, SettingsConfigDict   class Environment(BaseSettings):     model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")   class Redis(Environment):     REDIS_HOST: str = "localhost"     REDIS_PORT: int = 6379     REDIS_PASSWORD: Optional[SecretStr] = None   class Bot(Environment):     TOKEN: SecretStr = SecretStr("7285627548:FaGPjgAJFU8G524HE38hubGrgTcQcrpmbyc")   class Settings(Bot, Redis):     USER_ID: int = 0   settings = Settings()

И соответственно настраиваем .env файл:

  • Токен бота ранее доставали из Bot Father

  • Свой ID можно получить с помощью @getmyid_bot

TOKEN="7285627548:FaGPjgAJFU8G524HE38hubGrgTcQcrpmbyc" USER_ID=123456789

Прописываем переменные проекта в bot.py:

from aiogram import Bot, Dispatcher from redis.asyncio import Redis  from .settings import settings  bot = Bot(token=settings.TOKEN.get_secret_value()) dp = Dispatcher()  redis = Redis(     host=settings.REDIS_HOST,     port=settings.REDIS_PORT,     password=(         settings.REDIS_PASSWORD.get_secret_value()          if settings.REDIS_PASSWORD else None     ), )

и используем их в main.py для запуска:

from loguru import logger from src.bot import dp, bot  if __name__ == "__main__":     logger.info("Starting...")     dp.run_polling(         bot,         allowed_updates=[             "callback_query",             "business_message",             "edited_business_message",             "deleted_business_messages",         ],     ) 

Важно! aiogram по умолчанию слушает только следующие хэндлеры: message, callback_query, errors. Остальные он автоматически не парсит и их нужно добавлять вручную через allowed_updates.

В файле bot.py сохраняем все новые сообщения в Redis с авто-удалением через 3 недели:

EX_TIME = 60 * 60 * 24 * 21   async def set_message(message: types.Message):     await redis.set(         f"{message.chat.id}:{message.message_id}",         message.model_dump_json(),         ex=EX_TIME,     )   @dp.business_message() async def message(message: types.Message):     await set_message(message)

Теперь нужно как-то красиво сообщать об изменении статуса сообщений. Лучшим вариантом я считаю добавление inline клавиатуры, так не будет редактироваться исходное сообщение.
Первой кнопкой будет статус сообщения – ✏️ или 🗑️
Второй – ссылка на профиль отправителя вида tg://user?id=0
И третьей – кнопка для закрытия уведомления

Для этого прописываем markup и callbacks в файле keyboards.py:

from enum import StrEnum from aiogram.utils.keyboard import InlineKeyboardBuilder   class Callbacks(StrEnum):     EMPTY = "empty"     CLOSE = "close"   def link_markup(title: str, user_id: int):     builder = InlineKeyboardBuilder()     builder.button(text=title, callback_data=Callbacks.EMPTY)     builder.button(text="👤", url=f"tg://user?id={user_id}")     builder.button(text="❌", callback_data=Callbacks.CLOSE)     return builder.adjust(3).as_markup()

В примере выше я использую StrEnum для константных callback_data. На моей практике, импортировать класс с енамами и в следствии использовать их с F.data гораздо удобнее, чем в остальных вариантах, например, с глобальными переменными.

Отправляем уведомление при изменении сообщения (здесь создается копия сообщения на основе json строки из Redis):

@dp.edited_business_message() async def edited_message(message: types.Message):     model_dump = await redis.get(f"{message.chat.id}:{message.message_id}")     await set_message(message)      if not model_dump:         return      original_message = types.Message.model_validate_json(model_dump)     if not original_message.from_user:         return      await original_message.send_copy(         chat_id=settings.USER_ID,         reply_markup=link_markup("✏️", original_message.from_user.id),     ).as_(bot)

И аналогично отправляем уведомления при удалении сообщений:

  • Одним конвейером (pipeline) достаем все удаленные сообщения по их ID и отправляем сохраненные копии

  • Если сообщений слишком много, мы можем столкнуться с flood wait от телеграма, но все удаленные сообщения должны гарантировано доставиться, поэтому дожидаемся его окончания

  • И в конце очищаем хранилище от удаленных сообщений

async def copy_message(message: types.Message):     await message.send_copy(         chat_id=settings.USER_ID,     ).as_(bot)   @dp.deleted_business_messages() async def deleted_message(business_messages: types.BusinessMessagesDeleted):     pipe = redis.pipeline()     for message_id in business_messages.message_ids:         pipe.get(f"{business_messages.chat.id}:{message_id}")     messages_data = await pipe.execute()      keys_to_delete = []     for message_id, model_dump in zip(business_messages.message_ids, messages_data):         if not model_dump:             continue          original_message = types.Message.model_validate_json(model_dump)         if not original_message.from_user:             continue          send_copy = original_message.send_copy(             chat_id=settings.USER_ID,             reply_markup=link_markup("🗑️", original_message.from_user.id),         ).as_(bot)          try:             await send_copy         except exceptions.TelegramRetryAfter as exp:             logger.warning(f"Retry after {exp.retry_after} seconds")             await asyncio.sleep(exp.retry_after + 0.1)             await send_copy         finally:             await asyncio.sleep(0.1)          keys_to_delete.append(f"{business_messages.chat.id}:{message_id}")      if keys_to_delete:         await redis.delete(*keys_to_delete)

Еще нужно добавить примитивную логику для inline кнопок:

@dp.callback_query(F.data == Callbacks.EMPTY) async def empty(query: types.CallbackQuery):     await query.answer()   @dp.callback_query(F.data == Callbacks.CLOSE) async def close(query: types.CallbackQuery):     await query.answer()     if isinstance(query.message, types.Message):         await query.message.delete()

Полный файл bot.py:

import asyncio from aiogram import F, Bot, Dispatcher, types, exceptions from loguru import logger from redis.asyncio import Redis  from .settings import settings from .keyboards import link_markup, Callbacks  bot = Bot(token=settings.TOKEN.get_secret_value()) dp = Dispatcher()  redis = Redis(     host=settings.REDIS_HOST,     port=settings.REDIS_PORT,     password=(         settings.REDIS_PASSWORD.get_secret_value() if settings.REDIS_PASSWORD else None     ), )  EX_TIME = 60 * 60 * 24 * 21   async def set_message(message: types.Message):     await redis.set(         f"{message.chat.id}:{message.message_id}",         message.model_dump_json(),         ex=EX_TIME,     )   @dp.business_message() async def message(message: types.Message):     await set_message(message)   @dp.edited_business_message() async def edited_message(message: types.Message):     model_dump = await redis.get(f"{message.chat.id}:{message.message_id}")     await set_message(message)      if not model_dump:         return      original_message = types.Message.model_validate_json(model_dump)     if not original_message.from_user:         return      await original_message.send_copy(         chat_id=settings.USER_ID,         reply_markup=link_markup("✏️", original_message.from_user.id),     ).as_(bot)   async def copy_message(message: types.Message):     await message.send_copy(         chat_id=settings.USER_ID,     ).as_(bot)   @dp.deleted_business_messages() async def deleted_message(business_messages: types.BusinessMessagesDeleted):     pipe = redis.pipeline()     for message_id in business_messages.message_ids:         pipe.get(f"{business_messages.chat.id}:{message_id}")     messages_data = await pipe.execute()      keys_to_delete = []     for message_id, model_dump in zip(business_messages.message_ids, messages_data):         if not model_dump:             continue          original_message = types.Message.model_validate_json(model_dump)         if not original_message.from_user:             continue          send_copy = original_message.send_copy(             chat_id=settings.USER_ID,             reply_markup=link_markup("🗑️", original_message.from_user.id),         ).as_(bot)          try:             await send_copy         except exceptions.TelegramRetryAfter as exp:             logger.warning(f"Retry after {exp.retry_after} seconds")             await asyncio.sleep(exp.retry_after + 0.1)             await send_copy         finally:             await asyncio.sleep(0.1)          keys_to_delete.append(f"{business_messages.chat.id}:{message_id}")      if keys_to_delete:         await redis.delete(*keys_to_delete)   @dp.callback_query(F.data == Callbacks.EMPTY) async def empty(query: types.CallbackQuery):     await query.answer()   @dp.callback_query(F.data == Callbacks.CLOSE) async def close(query: types.CallbackQuery):     await query.answer()     if isinstance(query.message, types.Message):         await query.message.delete() 

Почти закончили!
Осталось обернуть все в контейнер:

Dockerfile

FROM python:3.12-slim  RUN pip install poetry  WORKDIR /app  # Копируем и устанавливаем зависимости COPY pyproject.toml poetry.lock ./ RUN poetry config virtualenvs.create false \   && poetry install --no-interaction --no-ansi --no-root  # Копируем файлы проекта и переменные окружения COPY . . COPY .env .env  EXPOSE 80  CMD ["python3", "main.py"]

docker-compose.yml

services:   redis:     image: redis:alpine     command: redis-server --requirepass ${REDIS_PASSWORD:-password} --save 60 1     healthcheck:       test: ["CMD", "redis-cli", "ping"]       interval: 5s       timeout: 3s       retries: 3     ports:       - "6380:6379"     volumes:       - redis_data:/data    web:     build: .     restart: unless-stopped     ports:       - "8081:80"     depends_on:       redis:         condition: service_healthy     environment:       REDIS_HOST: redis       REDIS_PORT: 6379       REDIS_PASSWORD: ${REDIS_PASSWORD:-password}  volumes:   redis_data:

Вот теперь готово!

Билдим бота и проверяем, что все работает:

docker-compose up --build

Исходный код

Это первая статья на Хабре, буду рад любой конструктивной критике!


ссылка на оригинал статьи https://habr.com/ru/articles/870868/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *